Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
I
inventory-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Ascend
inventory-service
Commits
5061bffd
Commit
5061bffd
authored
May 19, 2021
by
Ben Anderson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Deployment 57
parent
2062559a
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
151 additions
and
151 deletions
+151
-151
deployment.yaml
deployment.yaml
+1
-1
KafkaReceiverConfig.java
...m/ascend/inventory/configuration/KafkaReceiverConfig.java
+67
-67
KafkaListenerService.java
.../nisum/ascend/inventory/service/KafkaListenerService.java
+83
-83
No files found.
deployment.yaml
View file @
5061bffd
...
...
@@ -14,7 +14,7 @@ spec:
spec
:
containers
:
-
name
:
afp-prods-container
image
:
nexus.mynisum.com/afp-prods:5
6
image
:
nexus.mynisum.com/afp-prods:5
7
imagePullPolicy
:
Always
ports
:
-
containerPort
:
8083
...
...
src/main/java/com/nisum/ascend/inventory/configuration/KafkaReceiverConfig.java
View file @
5061bffd
package
com
.
nisum
.
ascend
.
inventory
.
configuration
;
import
com.nisum.ascend.inventory.dto.Order
;
import
com.nisum.ascend.inventory.dto.WareHouseOrder
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.support.serializer.JsonDeserializer
;
import
reactor.kafka.receiver.KafkaReceiver
;
import
reactor.kafka.receiver.ReceiverOptions
;
import
java.util.Collection
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Map
;
@Configuration
@Slf4j
public
class
KafkaReceiverConfig
{
@Value
(
"${kafka.consumer.bootstrap-servers}"
)
private
String
bootstrapServers
;
@Value
(
"${kafka.consumer.group-id}"
)
private
String
groupId
;
@Bean
(
"kafkaWarehouseOrderReceiver"
)
public
KafkaReceiver
<
String
,
String
>
kafkaWarehouseOrderEventReceiver
(
@Value
(
"${kafka.WAREHOUSETOPIC.input}"
)
String
posLogTopic
)
{
ReceiverOptions
<
String
,
String
>
receiverOptions
=
ReceiverOptions
.
create
(
WarehouseOrderEventReceiverConfig
());
receiverOptions
.
maxCommitAttempts
(
3
);
return
KafkaReceiver
.
create
(
receiverOptions
.
addAssignListener
(
Collection:
:
iterator
)
.
subscription
(
Collections
.
singleton
(
posLogTopic
)));
}
private
Map
<
String
,
Object
>
WarehouseOrderEventReceiverConfig
()
{
Map
<
String
,
Object
>
config
=
new
HashMap
<>();
config
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
groupId
);
config
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
config
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
config
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
return
config
;
}
@Bean
(
"kafkaOrderReceiver"
)
public
KafkaReceiver
<
String
,
String
>
kafkaOrderEventReceiver
(
@Value
(
"${kafka.ORDERTOPIC.input}"
)
String
posLogTopic
)
{
ReceiverOptions
<
String
,
String
>
receiverOptions
=
ReceiverOptions
.
create
(
OrderEventReceiverConfig
());
receiverOptions
.
maxCommitAttempts
(
3
);
return
KafkaReceiver
.
create
(
receiverOptions
.
addAssignListener
(
Collection:
:
iterator
)
.
subscription
(
Collections
.
singleton
(
posLogTopic
)));
}
private
Map
<
String
,
Object
>
OrderEventReceiverConfig
()
{
Map
<
String
,
Object
>
config
=
new
HashMap
<>();
config
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
groupId
);
config
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
config
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
config
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
return
config
;
}
}
\ No newline at end of file
//package com.nisum.ascend.inventory.configuration;
//
//
//import com.nisum.ascend.inventory.dto.Order;
//import com.nisum.ascend.inventory.dto.WareHouseOrder;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.common.serialization.StringDeserializer;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.support.serializer.JsonDeserializer;
//import reactor.kafka.receiver.KafkaReceiver;
//import reactor.kafka.receiver.ReceiverOptions;
//
//import java.util.Collection;
//import java.util.Collections;
//import java.util.HashMap;
//import java.util.Map;
//
//@Configuration
//@Slf4j
//public class KafkaReceiverConfig {
//
// @Value("${kafka.consumer.bootstrap-servers}")
// private String bootstrapServers;
// @Value("${kafka.consumer.group-id}")
// private String groupId;
//
// @Bean("kafkaWarehouseOrderReceiver")
// public KafkaReceiver<String, String> kafkaWarehouseOrderEventReceiver(
// @Value("${kafka.WAREHOUSETOPIC.input}") String posLogTopic) {
// ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(WarehouseOrderEventReceiverConfig());
// receiverOptions.maxCommitAttempts(3);
// return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
// .subscription(Collections.singleton(posLogTopic)));
// }
//
// private Map<String, Object> WarehouseOrderEventReceiverConfig() {
// Map<String, Object> config = new HashMap<>();
// config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// return config;
// }
//
// @Bean("kafkaOrderReceiver")
// public KafkaReceiver<String, String> kafkaOrderEventReceiver(
// @Value("${kafka.ORDERTOPIC.input}") String posLogTopic) {
// ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(OrderEventReceiverConfig());
// receiverOptions.maxCommitAttempts(3);
// return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
// .subscription(Collections.singleton(posLogTopic)));
// }
//
// private Map<String, Object> OrderEventReceiverConfig() {
// Map<String, Object> config = new HashMap<>();
// config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// return config;
// }
//
//}
\ No newline at end of file
src/main/java/com/nisum/ascend/inventory/service/KafkaListenerService.java
View file @
5061bffd
package
com
.
nisum
.
ascend
.
inventory
.
service
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.nisum.ascend.inventory.dto.Item
;
import
com.nisum.ascend.inventory.dto.Order
;
import
com.nisum.ascend.inventory.dto.WareHouseOrder
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Qualifier
;
import
org.springframework.boot.context.event.ApplicationStartedEvent
;
import
org.springframework.context.event.EventListener
;
import
org.springframework.stereotype.Component
;
import
reactor.kafka.receiver.KafkaReceiver
;
import
java.util.List
;
@Component
@Slf4j
public
class
KafkaListenerService
{
@Autowired
@Qualifier
(
"kafkaWarehouseOrderReceiver"
)
private
KafkaReceiver
<
String
,
String
>
kafkaWarehouseOrderReceiver
;
@Autowired
@Qualifier
(
"kafkaOrderReceiver"
)
private
KafkaReceiver
<
String
,
String
>
kafkaOrderReceiver
;
@Autowired
private
ProductService
productService
;
@EventListener
(
ApplicationStartedEvent
.
class
)
public
void
consumeWarehouseOrderStatus
()
{
kafkaWarehouseOrderReceiver
.
receive
()
.
doOnNext
(
record
->
log
.
info
(
"record: {}"
,
record
))
.
doOnNext
(
record
->
onWarehouseOrderStatusReceived
(
record
.
value
()))
.
doOnError
(
throwable
->
System
.
out
.
println
(
throwable
.
getMessage
()))
.
subscribe
();
}
private
void
onWarehouseOrderStatusReceived
(
String
warehouseOrderString
)
{
try
{
ObjectMapper
objectMapper
=
new
ObjectMapper
();
WareHouseOrder
warehouseOrder
=
objectMapper
.
readValue
(
warehouseOrderString
,
WareHouseOrder
.
class
);
List
<
Item
>
itemList
=
warehouseOrder
.
getOrderItems
();
String
status
=
warehouseOrder
.
getStatus
();
log
.
info
(
"Received this data: {}"
,
warehouseOrder
);
log
.
info
(
"recieved this list: {}"
,
itemList
);
for
(
Item
item
:
itemList
)
{
productService
.
updateProductInventoryBySkuWareHouse
(
item
.
getItemSku
(),
status
,
item
.
getItemQuantity
()).
subscribe
();
}
}
catch
(
Exception
e
)
{
log
.
error
(
"error"
,
e
);
}
}
@EventListener
(
ApplicationStartedEvent
.
class
)
public
void
consumeOrderStatus
()
{
kafkaOrderReceiver
.
receive
()
.
doOnNext
(
record
->
log
.
info
(
"record: {}"
,
record
))
.
doOnNext
(
record
->
onOrderStatusReceived
(
record
.
value
()))
.
doOnError
(
throwable
->
System
.
out
.
println
(
throwable
.
getMessage
()))
.
subscribe
();
}
private
void
onOrderStatusReceived
(
String
orderString
)
{
try
{
ObjectMapper
objectMapper
=
new
ObjectMapper
();
Order
order
=
objectMapper
.
readValue
(
orderString
,
Order
.
class
);
List
<
Item
>
itemList
=
order
.
getOrderItems
();
String
status
=
order
.
getOrderStatus
();
log
.
info
(
"Received this data: {}"
,
order
);
log
.
info
(
"recieved this list: {}"
,
itemList
);
for
(
Item
item
:
itemList
)
{
productService
.
updateProductInventoryBySkuOrder
(
item
.
getItemSku
(),
status
,
item
.
getItemQuantity
()).
block
();
}
}
catch
(
Exception
e
)
{
log
.
error
(
"error"
,
e
);
}
}
}
//
package com.nisum.ascend.inventory.service;
//
//
import com.fasterxml.jackson.databind.ObjectMapper;
//
import com.nisum.ascend.inventory.dto.Item;
//
import com.nisum.ascend.inventory.dto.Order;
//
import com.nisum.ascend.inventory.dto.WareHouseOrder;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.beans.factory.annotation.Qualifier;
//
import org.springframework.boot.context.event.ApplicationStartedEvent;
//
import org.springframework.context.event.EventListener;
//
import org.springframework.stereotype.Component;
//
import reactor.kafka.receiver.KafkaReceiver;
//
//
import java.util.List;
//
//
@Component
//
@Slf4j
//
public class KafkaListenerService {
//
//
@Autowired
//
@Qualifier("kafkaWarehouseOrderReceiver")
//
private KafkaReceiver<String, String> kafkaWarehouseOrderReceiver;
//
//
@Autowired
//
@Qualifier("kafkaOrderReceiver")
//
private KafkaReceiver<String, String> kafkaOrderReceiver;
//
//
@Autowired
//
private ProductService productService;
//
//
@EventListener(ApplicationStartedEvent.class)
//
public void consumeWarehouseOrderStatus() {
//
kafkaWarehouseOrderReceiver.receive()
//
.doOnNext(record -> log.info("record: {}", record))
//
.doOnNext(record -> onWarehouseOrderStatusReceived(record.value()))
//
.doOnError(throwable -> System.out.println(throwable.getMessage()))
//
.subscribe();
//
}
//
//
private void onWarehouseOrderStatusReceived(String warehouseOrderString) {
//
try {
//
ObjectMapper objectMapper = new ObjectMapper();
//
WareHouseOrder warehouseOrder = objectMapper.readValue(warehouseOrderString, WareHouseOrder.class);
//
List<Item> itemList = warehouseOrder.getOrderItems();
//
String status = warehouseOrder.getStatus();
//
log.info("Received this data: {}", warehouseOrder);
//
log.info("recieved this list: {}", itemList);
//
for (Item item : itemList) {
//
productService.updateProductInventoryBySkuWareHouse(item.getItemSku(), status, item.getItemQuantity()).subscribe();
//
}
//
} catch (Exception e) {
//
log.error("error", e);
//
}
//
}
//
//
//
@EventListener(ApplicationStartedEvent.class)
//
public void consumeOrderStatus() {
//
kafkaOrderReceiver.receive()
//
.doOnNext(record -> log.info("record: {}", record))
//
.doOnNext(record -> onOrderStatusReceived(record.value()))
//
.doOnError(throwable -> System.out.println(throwable.getMessage()))
//
.subscribe();
//
}
//
//
private void onOrderStatusReceived(String orderString) {
//
try {
//
ObjectMapper objectMapper = new ObjectMapper();
//
Order order = objectMapper.readValue(orderString, Order.class);
//
List<Item> itemList = order.getOrderItems();
//
String status = order.getOrderStatus();
//
log.info("Received this data: {}", order);
//
log.info("recieved this list: {}", itemList);
//
for (Item item : itemList) {
//
productService.updateProductInventoryBySkuOrder(item.getItemSku(), status, item.getItemQuantity()).block();
//
}
//
//
} catch (Exception e) {
//
log.error("error", e);
//
}
//
}
//
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment