Commit 6859bdf4 authored by John Lam's avatar John Lam

working kafka consumers for warehouse and ordermanagement

parent 5034fe71
package com.nisum.ascend.inventory.configuration;
import com.nisum.ascend.inventory.dto.Order;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......@@ -26,16 +27,35 @@ public class KafkaReceiverConfig {
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean
@Bean("kafkaWarehouseOrderReceiver")
public KafkaReceiver<String, String> kafkaWarehouseOrderEventReceiver(
@Value("${kafka.WAREHOUSETOPIC.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(WarehouseOrderEventReceiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> WarehouseOrderEventReceiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
@Bean("kafkaOrderReceiver")
public KafkaReceiver<String, String> kafkaOrderEventReceiver(
@Value("${kafka.topic.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(receiverConfig());
@Value("${kafka.ORDERTOPIC.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(OrderEventReceiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> receiverConfig() {
private Map<String, Object> OrderEventReceiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
......@@ -43,4 +63,5 @@ public class KafkaReceiverConfig {
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
}
\ No newline at end of file
......@@ -6,7 +6,7 @@ import lombok.Data;
public class Item {
private String itemId;
private String itemName;
private int itemSku;
private String itemSku;
private int itemQuantity;
private double itemPrice;
}
......
package com.nisum.ascend.inventory.dto;
import lombok.Data;
import java.time.LocalDate;
import java.util.List;
@Data
public class Order {
private String id;
private String orderUpdatedAt;
private String orderCreatedAt;
private String customerId;
private String customerEmailAddress;
private String orderStatus;
List<Item> orderItems;
private String orderTrackingCode;
private String customerAddress;
}
......@@ -2,10 +2,11 @@ package com.nisum.ascend.inventory.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.dto.Order;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import com.sun.tools.javac.jvm.Items;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
......@@ -18,14 +19,19 @@ import java.util.List;
public class KafkaListenerService {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Qualifier("kafkaWarehouseOrderReceiver")
private KafkaReceiver<String, String> kafkaWarehouseOrderReceiver;
@Autowired
@Qualifier("kafkaOrderReceiver")
private KafkaReceiver<String, String> kafkaOrderReceiver;
@Autowired
private ProductService productService;
@EventListener(ApplicationStartedEvent.class)
public void consumeWarehouseOrderStatus() {
kafkaReceiver.receive()
kafkaWarehouseOrderReceiver.receive()
.doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onWarehouseOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
......@@ -34,14 +40,43 @@ public class KafkaListenerService {
private void onWarehouseOrderStatusReceived(String warehouseOrderString) {
try {
System.out.print(warehouseOrderString);
ObjectMapper objectMapper = new ObjectMapper();
WareHouseOrder warehouseOrder = objectMapper.readValue(warehouseOrderString, WareHouseOrder.class);
List<Item> itemList = warehouseOrder.getOrderItems();
String status = warehouseOrder.getStatus();
log.info("Received this data: {}", warehouseOrder);
log.info("recieved this list: {}", itemList);
for (Item item : itemList) {
productService.updateProductInventoryBySku(item.getItemSku(), status, item.getItemQuantity()).block();
}
} catch (Exception e) {
log.error("error", e);
}
}
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() {
kafkaOrderReceiver.receive()
.doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onOrderStatusReceived(String orderString) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Order order = objectMapper.readValue(orderString, Order.class);
List<Item> itemList = order.getOrderItems();
String status = order.getOrderStatus();
log.info("Received this data: {}", order);
log.info("recieved this list: {}", itemList);
for (Item item : itemList) {
productService.updateProductInventoryBySku(String.valueOf(item.getItemSku()),warehouseOrder.getStatus(), item.getItemQuantity());
productService.updateProductInventoryBySku(item.getItemSku(), status, item.getItemQuantity()).block();
}
// log.info("Received this data: {}", warehouseOrder);
} catch (Exception e) {
log.error("error", e);
}
......
......@@ -40,10 +40,17 @@ public class ProductService {
public Mono<Product> updateProductInventoryBySku(String sku, String status, int itemQuantity) {
int stock = itemQuantity;
if (status == "CANCELLED") {
if (status.equals("CANCELLED")) {
stock *= -1;
}
} else if (status.equals("RECEIVED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
int finalStock = stock;
System.out.printf("sku = %s, status = %s, itemquanity = %d, stock = %d, finalStock = %d \n", sku, status, itemQuantity, stock, finalStock);
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() - finalStock);
......
......@@ -6,4 +6,5 @@ spring.data.mongodb.database=products-promotions-DB
kafka.consumer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=PRODUCT
kafka.topic.input=my-messages
kafka.WAREHOUSETOPIC.input=my-messages
kafka.ORDERTOPIC.input=order-topic
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment