Commit 96b18f03 authored by Khai Yuan ​Liew's avatar Khai Yuan ​Liew

Merge branch 'AFP-66-kafka-consumers' into 'dev'

Afp 66 kafka consumers

See merge request !12
parents b3ee5e26 b96ca2f5
apiVersion: apps/v1
kind: Deployment
metadata:
name: inventory-backend-deployment
spec:
replicas: 2
selector:
matchLabels:
app: inventory
template:
metadata:
labels:
app: inventory
spec:
containers:
- name: inventory-container
image: rahulwagh17/jhooq-docker-demo:jhooq-docker-demo
ports:
- containerPort: 8080
env:
- name: PORT
value: "8080"
---
apiVersion: v1
kind: Service
metadata:
name: inventory-backend-service
spec:
type: LoadBalancer
ports:
- port: 8080
targetPort: 8080
selector:
app: inventory
\ No newline at end of file
......@@ -29,7 +29,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......
package com.nisum.ascend.inventory.configuration;
import com.nisum.ascend.inventory.dto.Order;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaReceiverConfig {
@Value("${kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean("kafkaWarehouseOrderReceiver")
public KafkaReceiver<String, String> kafkaWarehouseOrderEventReceiver(
@Value("${kafka.WAREHOUSETOPIC.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(WarehouseOrderEventReceiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> WarehouseOrderEventReceiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
@Bean("kafkaOrderReceiver")
public KafkaReceiver<String, String> kafkaOrderEventReceiver(
@Value("${kafka.ORDERTOPIC.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(OrderEventReceiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> OrderEventReceiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
}
\ No newline at end of file
package com.nisum.ascend.inventory.dto;
import lombok.Data;
@Data
public class CustomerAddress {
private String street;
private String city;
private String state;
private String zip;
}
package com.nisum.ascend.inventory.dto;
import lombok.Data;
@Data
public class Item {
private String itemId;
private String itemName;
private String itemSku;
private int itemQuantity;
private double itemPrice;
}
package com.nisum.ascend.inventory.dto;
import lombok.Data;
import java.time.LocalDate;
import java.util.List;
@Data
public class Order {
private String id;
private long orderUpdatedAt;
private long orderCreatedAt;
private String customerId;
private String customerEmailAddress;
private String orderStatus;
List<Item> orderItems;
private String orderTrackingCode;
private CustomerAddress customerAddress;
}
package com.nisum.ascend.inventory.dto;
import com.nisum.ascend.inventory.dto.Item;
import lombok.Data;
import java.util.Date;
import java.util.List;
@Data
public class WareHouseOrder {
private String id;
private String orderId;
private String status;
private Date createdAt;
private Date modifiedAt;
private List<Item> orderItems;
private String address;
}
package com.nisum.ascend.inventory.repository;
import com.nisum.ascend.inventory.model.Product;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
......
package com.nisum.ascend.inventory.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.dto.Order;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.KafkaReceiver;
import java.util.List;
@Component
@Slf4j
public class KafkaListenerService {
@Autowired
@Qualifier("kafkaWarehouseOrderReceiver")
private KafkaReceiver<String, String> kafkaWarehouseOrderReceiver;
@Autowired
@Qualifier("kafkaOrderReceiver")
private KafkaReceiver<String, String> kafkaOrderReceiver;
@Autowired
private ProductService productService;
@EventListener(ApplicationStartedEvent.class)
public void consumeWarehouseOrderStatus() {
kafkaWarehouseOrderReceiver.receive()
.doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onWarehouseOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onWarehouseOrderStatusReceived(String warehouseOrderString) {
try {
ObjectMapper objectMapper = new ObjectMapper();
WareHouseOrder warehouseOrder = objectMapper.readValue(warehouseOrderString, WareHouseOrder.class);
List<Item> itemList = warehouseOrder.getOrderItems();
String status = warehouseOrder.getStatus();
log.info("Received this data: {}", warehouseOrder);
log.info("recieved this list: {}", itemList);
for (Item item : itemList) {
productService.updateProductInventoryBySkuWareHouse(item.getItemSku(), status, item.getItemQuantity()).subscribe();
}
} catch (Exception e) {
log.error("error", e);
}
}
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() {
kafkaOrderReceiver.receive()
.doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onOrderStatusReceived(String orderString) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Order order = objectMapper.readValue(orderString, Order.class);
List<Item> itemList = order.getOrderItems();
String status = order.getOrderStatus();
log.info("Received this data: {}", order);
log.info("recieved this list: {}", itemList);
for (Item item : itemList) {
productService.updateProductInventoryBySkuOrder(item.getItemSku(), status, item.getItemQuantity()).block();
}
} catch (Exception e) {
log.error("error", e);
}
}
}
package com.nisum.ascend.inventory.service;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.exception.ResourceNotFoundException;
import com.nisum.ascend.inventory.model.Product;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -54,4 +56,58 @@ public class ProductService {
return productRepository.save(product);
}
public Mono<Product> updateProductInventoryBySkuWareHouse(String sku, String status, int itemQuantity) {
System.out.printf("sku = %s, status = %s, itemquanity = %d \n", sku, status, itemQuantity);
if (status.equals("CANCELLED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() + itemQuantity);
return productRepository.save(dbProduct);
});
} else if (status.equals("RECEIVED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
} else { // if status.equals("FULFILLED")
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() - itemQuantity);
dbProduct.setFulfilledStock(dbProduct.getFulfilledStock() + itemQuantity);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
return productRepository.save(dbProduct);
});
}
}
public Mono<Product> updateProductInventoryBySkuOrder(String sku, String status, int itemQuantity) {
System.out.printf("sku = %s, status = %s, itemquanity = %d \n", sku, status, itemQuantity);
if (status.equals("CANCELED")){
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() + itemQuantity);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
return productRepository.save(dbProduct);
});
}
else if (status.equals("RECEIVED")){
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
else { // if status.equals("FULFILLED")
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
return productRepository.save(dbProduct);
});
}
}
}
server.port=8080
spring.data.mongodb.uri=mongodb+srv://admin:${db.password}@inventory-promotions.d4nfz.mongodb\
.net/${spring.data.mongodb.database}?retryWrites=true&w=majority
spring.data.mongodb.database=products-promotions-DB
kafka.consumer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=PRODUCT
kafka.WAREHOUSETOPIC.input=WMOS_ORDER_UPDATE
kafka.ORDERTOPIC.input=OMS_ORDER_UPDATE
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment