Commit 59e261bd authored by John Lam's avatar John Lam

add kafka warehouse order status

parent 39693637
...@@ -25,10 +25,10 @@ ...@@ -25,10 +25,10 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId> <artifactId>spring-boot-starter-webflux</artifactId>
</dependency> </dependency>
<!-- <dependency>--> <dependency>
<!-- <groupId>org.springframework.kafka</groupId>--> <groupId>org.springframework.kafka</groupId>
<!-- <artifactId>spring-kafka</artifactId>--> <artifactId>spring-kafka</artifactId>
<!-- </dependency>--> </dependency>
<dependency> <dependency>
<groupId>io.projectreactor.kafka</groupId> <groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId> <artifactId>reactor-kafka</artifactId>
......
package com.nisum.ascend.inventory.configuration; package com.nisum.ascend.inventory.configuration;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
...@@ -8,6 +9,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; ...@@ -8,6 +9,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverOptions;
...@@ -32,6 +34,7 @@ public class KafkaReceiverConfig { ...@@ -32,6 +34,7 @@ public class KafkaReceiverConfig {
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator) return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic))); .subscription(Collections.singleton(posLogTopic)));
} }
private Map<String, Object> receiverConfig() { private Map<String, Object> receiverConfig() {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
......
package com.nisum.ascend.inventory.dto;
import lombok.Data;
@Data
public class Item {
private String itemId;
private String itemName;
private int itemSku;
private int itemQuantity;
private double itemPrice;
}
package com.nisum.ascend.inventory.dto;
import com.nisum.ascend.inventory.dto.Item;
import lombok.Data;
import java.util.List;
@Data
public class WareHouseOrder {
private String id;
private String orderId;
private String status;
//LocalDateTime
private String createdAt;
private String modifiedAt;
private List<Item> orderItems;
private String address;
}
package com.nisum.ascend.inventory.repository; package com.nisum.ascend.inventory.repository;
import com.nisum.ascend.inventory.dto.ProductDto;
import com.nisum.ascend.inventory.model.Product; import com.nisum.ascend.inventory.model.Product;
import org.springframework.data.mongodb.repository.Query; import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
......
package com.nisum.ascend.inventory.service; package com.nisum.ascend.inventory.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import com.sun.tools.javac.jvm.Items;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import java.util.List;
@Component @Component
@Slf4j @Slf4j
public class KafkaListenerService { public class KafkaListenerService {
...@@ -16,11 +21,31 @@ public class KafkaListenerService { ...@@ -16,11 +21,31 @@ public class KafkaListenerService {
@Autowired @Autowired
private KafkaReceiver<String, String> kafkaReceiver; private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private ProductService productService;
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
public void consumeOrderMessages() { public void consumeWarehouseOrderStatus() {
kafkaReceiver.receive() kafkaReceiver.receive()
.doOnNext(record -> System.out.println(record)) .doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onWarehouseOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe(); .subscribe();
} }
private void onWarehouseOrderStatusReceived(String warehouseOrderString) {
try {
System.out.print(warehouseOrderString);
ObjectMapper objectMapper = new ObjectMapper();
WareHouseOrder warehouseOrder = objectMapper.readValue(warehouseOrderString, WareHouseOrder.class);
List<Item> itemList = warehouseOrder.getOrderItems();
for (Item item : itemList) {
productService.updateProductInventoryBySku(String.valueOf(item.getItemSku()),warehouseOrder.getStatus(), item.getItemQuantity());
}
// log.info("Received this data: {}", warehouseOrder);
} catch (Exception e) {
log.error("error", e);
}
}
} }
package com.nisum.ascend.inventory.service; package com.nisum.ascend.inventory.service;
import com.nisum.ascend.inventory.dto.ProductDto;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.exception.ResourceNotFoundException; import com.nisum.ascend.inventory.exception.ResourceNotFoundException;
import com.nisum.ascend.inventory.model.Product; import com.nisum.ascend.inventory.model.Product;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -36,6 +37,22 @@ public class ProductService { ...@@ -36,6 +37,22 @@ public class ProductService {
return productRepository.findAll(); return productRepository.findAll();
} }
public Mono<Product> updateProductInventoryBySku(String sku, String status, int itemQuantity) {
int stock = itemQuantity;
if (status == "CANCELLED") {
stock *= -1;
}
int finalStock = stock;
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() - finalStock);
dbProduct.setFulfilledStock(dbProduct.getFulfilledStock() + finalStock);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
return productRepository.save(dbProduct);
});
}
public Mono<Product> updateProductBySku(String sku, Product product){ public Mono<Product> updateProductBySku(String sku, Product product){
return productRepository.findBySku(sku) return productRepository.findBySku(sku)
.flatMap(dbProduct -> { .flatMap(dbProduct -> {
......
package com.nisum.ascend.inventory.controller; package com.nisum.ascend.inventory.controller;
import com.nisum.ascend.inventory.dto.ProductDto;
import com.nisum.ascend.inventory.model.Product; import com.nisum.ascend.inventory.model.Product;
import com.nisum.ascend.inventory.repository.ProductRepository; import com.nisum.ascend.inventory.repository.ProductRepository;
import com.nisum.ascend.inventory.service.ProductService; import com.nisum.ascend.inventory.service.ProductService;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment