Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
I
inventory-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Ascend
inventory-service
Commits
f5798f0e
Commit
f5798f0e
authored
May 19, 2021
by
Ben Anderson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Deployment 58
parent
801065ba
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
84 additions
and
84 deletions
+84
-84
deployment.yaml
deployment.yaml
+1
-1
KafkaListenerService.java
.../nisum/ascend/inventory/service/KafkaListenerService.java
+83
-83
No files found.
deployment.yaml
View file @
f5798f0e
...
@@ -14,7 +14,7 @@ spec:
...
@@ -14,7 +14,7 @@ spec:
spec
:
spec
:
containers
:
containers
:
-
name
:
afp-prods-container
-
name
:
afp-prods-container
image
:
nexus.mynisum.com/afp-prods:5
8
image
:
nexus.mynisum.com/afp-prods:5
9
imagePullPolicy
:
Always
imagePullPolicy
:
Always
ports
:
ports
:
-
containerPort
:
8083
-
containerPort
:
8083
...
...
src/main/java/com/nisum/ascend/inventory/service/KafkaListenerService.java
View file @
f5798f0e
//
package com.nisum.ascend.inventory.service;
package
com
.
nisum
.
ascend
.
inventory
.
service
;
//
//
import com.fasterxml.jackson.databind.ObjectMapper;
import
com.fasterxml.jackson.databind.ObjectMapper
;
//
import com.nisum.ascend.inventory.dto.Item;
import
com.nisum.ascend.inventory.dto.Item
;
//
import com.nisum.ascend.inventory.dto.Order;
import
com.nisum.ascend.inventory.dto.Order
;
//
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import
com.nisum.ascend.inventory.dto.WareHouseOrder
;
//
import lombok.extern.slf4j.Slf4j;
import
lombok.extern.slf4j.Slf4j
;
//
import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.beans.factory.annotation.Autowired
;
//
import org.springframework.beans.factory.annotation.Qualifier;
import
org.springframework.beans.factory.annotation.Qualifier
;
//
import org.springframework.boot.context.event.ApplicationStartedEvent;
import
org.springframework.boot.context.event.ApplicationStartedEvent
;
//
import org.springframework.context.event.EventListener;
import
org.springframework.context.event.EventListener
;
//
import org.springframework.stereotype.Component;
import
org.springframework.stereotype.Component
;
//
import reactor.kafka.receiver.KafkaReceiver;
import
reactor.kafka.receiver.KafkaReceiver
;
//
//
import java.util.List;
import
java.util.List
;
//
//
@Component
@Component
//
@Slf4j
@Slf4j
//
public class KafkaListenerService {
public
class
KafkaListenerService
{
//
//
@Autowired
@Autowired
//
@Qualifier("kafkaWarehouseOrderReceiver")
@Qualifier
(
"kafkaWarehouseOrderReceiver"
)
//
private KafkaReceiver<String, String> kafkaWarehouseOrderReceiver;
private
KafkaReceiver
<
String
,
String
>
kafkaWarehouseOrderReceiver
;
//
//
@Autowired
@Autowired
//
@Qualifier("kafkaOrderReceiver")
@Qualifier
(
"kafkaOrderReceiver"
)
//
private KafkaReceiver<String, String> kafkaOrderReceiver;
private
KafkaReceiver
<
String
,
String
>
kafkaOrderReceiver
;
//
//
@Autowired
@Autowired
//
private ProductService productService;
private
ProductService
productService
;
//
//
@EventListener(ApplicationStartedEvent.class)
@EventListener
(
ApplicationStartedEvent
.
class
)
//
public void consumeWarehouseOrderStatus() {
public
void
consumeWarehouseOrderStatus
()
{
//
kafkaWarehouseOrderReceiver.receive()
kafkaWarehouseOrderReceiver
.
receive
()
//
.doOnNext(record -> log.info("record: {}", record))
.
doOnNext
(
record
->
log
.
info
(
"record: {}"
,
record
))
//
.doOnNext(record -> onWarehouseOrderStatusReceived(record.value()))
.
doOnNext
(
record
->
onWarehouseOrderStatusReceived
(
record
.
value
()))
//
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.
doOnError
(
throwable
->
System
.
out
.
println
(
throwable
.
getMessage
()))
//
.subscribe();
.
subscribe
();
//
}
}
//
//
private void onWarehouseOrderStatusReceived(String warehouseOrderString) {
private
void
onWarehouseOrderStatusReceived
(
String
warehouseOrderString
)
{
//
try {
try
{
//
ObjectMapper objectMapper = new ObjectMapper();
ObjectMapper
objectMapper
=
new
ObjectMapper
();
//
WareHouseOrder warehouseOrder = objectMapper.readValue(warehouseOrderString, WareHouseOrder.class);
WareHouseOrder
warehouseOrder
=
objectMapper
.
readValue
(
warehouseOrderString
,
WareHouseOrder
.
class
);
//
List<Item> itemList = warehouseOrder.getOrderItems();
List
<
Item
>
itemList
=
warehouseOrder
.
getOrderItems
();
//
String status = warehouseOrder.getStatus();
String
status
=
warehouseOrder
.
getStatus
();
//
log.info("Received this data: {}", warehouseOrder);
log
.
info
(
"Received this data: {}"
,
warehouseOrder
);
//
log.info("recieved this list: {}", itemList);
log
.
info
(
"recieved this list: {}"
,
itemList
);
//
for (Item item : itemList) {
for
(
Item
item
:
itemList
)
{
//
productService.updateProductInventoryBySkuWareHouse(item.getItemSku(), status, item.getItemQuantity()).subscribe();
productService
.
updateProductInventoryBySkuWareHouse
(
item
.
getItemSku
(),
status
,
item
.
getItemQuantity
()).
subscribe
();
//
}
}
//
} catch (Exception e) {
}
catch
(
Exception
e
)
{
//
log.error("error", e);
log
.
error
(
"error"
,
e
);
//
}
}
//
}
}
//
//
//
@EventListener(ApplicationStartedEvent.class)
@EventListener
(
ApplicationStartedEvent
.
class
)
//
public void consumeOrderStatus() {
public
void
consumeOrderStatus
()
{
//
kafkaOrderReceiver.receive()
kafkaOrderReceiver
.
receive
()
//
.doOnNext(record -> log.info("record: {}", record))
.
doOnNext
(
record
->
log
.
info
(
"record: {}"
,
record
))
//
.doOnNext(record -> onOrderStatusReceived(record.value()))
.
doOnNext
(
record
->
onOrderStatusReceived
(
record
.
value
()))
//
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.
doOnError
(
throwable
->
System
.
out
.
println
(
throwable
.
getMessage
()))
//
.subscribe();
.
subscribe
();
//
}
}
//
//
private void onOrderStatusReceived(String orderString) {
private
void
onOrderStatusReceived
(
String
orderString
)
{
//
try {
try
{
//
ObjectMapper objectMapper = new ObjectMapper();
ObjectMapper
objectMapper
=
new
ObjectMapper
();
//
Order order = objectMapper.readValue(orderString, Order.class);
Order
order
=
objectMapper
.
readValue
(
orderString
,
Order
.
class
);
//
List<Item> itemList = order.getOrderItems();
List
<
Item
>
itemList
=
order
.
getOrderItems
();
//
String status = order.getOrderStatus();
String
status
=
order
.
getOrderStatus
();
//
log.info("Received this data: {}", order);
log
.
info
(
"Received this data: {}"
,
order
);
//
log.info("recieved this list: {}", itemList);
log
.
info
(
"recieved this list: {}"
,
itemList
);
//
for (Item item : itemList) {
for
(
Item
item
:
itemList
)
{
//
productService.updateProductInventoryBySkuOrder(item.getItemSku(), status, item.getItemQuantity()).block();
productService
.
updateProductInventoryBySkuOrder
(
item
.
getItemSku
(),
status
,
item
.
getItemQuantity
()).
block
();
//
}
}
//
//
} catch (Exception e) {
}
catch
(
Exception
e
)
{
//
log.error("error", e);
log
.
error
(
"error"
,
e
);
//
}
}
//
}
}
//
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment