Commit 97fb7a23 authored by Ben Anderson's avatar Ben Anderson

Resolved merge conflict and updated deployment file

parents ddab3806 15e00df8
...@@ -6,6 +6,7 @@ spec: ...@@ -6,6 +6,7 @@ spec:
replicas: 2 replicas: 2
selector: selector:
matchLabels: matchLabels:
<<<<<<< HEAD
app: afp-products app: afp-products
template: template:
metadata: metadata:
...@@ -17,10 +18,10 @@ spec: ...@@ -17,10 +18,10 @@ spec:
image: nexus.mynisum.com/parameterbuild:0 image: nexus.mynisum.com/parameterbuild:0
imagePullPolicy: Always imagePullPolicy: Always
ports: ports:
- containerPort: 8080 - containerPort: 8083
env: env:
- name: PORT - name: PORT
value: "8080" value: "8083"
- name: MONGOCONNECTION - name: MONGOCONNECTION
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:
...@@ -36,7 +37,7 @@ metadata: ...@@ -36,7 +37,7 @@ metadata:
spec: spec:
type: LoadBalancer type: LoadBalancer
ports: ports:
- port: 8080 - port: 8083
targetPort: 8080 targetPort: 8083
selector: selector:
app: afp-products app: afp-products
...@@ -29,7 +29,10 @@ ...@@ -29,7 +29,10 @@
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
......
package com.nisum.ascend.inventory.configuration;
import com.nisum.ascend.inventory.dto.Order;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaReceiverConfig {
@Value("${kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean("kafkaWarehouseOrderReceiver")
public KafkaReceiver<String, String> kafkaWarehouseOrderEventReceiver(
@Value("${kafka.WAREHOUSETOPIC.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(WarehouseOrderEventReceiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> WarehouseOrderEventReceiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
@Bean("kafkaOrderReceiver")
public KafkaReceiver<String, String> kafkaOrderEventReceiver(
@Value("${kafka.ORDERTOPIC.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(OrderEventReceiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> OrderEventReceiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
}
\ No newline at end of file
package com.nisum.ascend.inventory.dto;
import lombok.Data;
@Data
public class CustomerAddress {
private String street;
private String city;
private String state;
private String zip;
}
package com.nisum.ascend.inventory.dto;
import lombok.Data;
@Data
public class Item {
private String itemId;
private String itemName;
private String itemSku;
private int itemQuantity;
private double itemPrice;
}
package com.nisum.ascend.inventory.dto;
import lombok.Data;
import java.time.LocalDate;
import java.util.List;
@Data
public class Order {
private String id;
private long orderUpdatedAt;
private long orderCreatedAt;
private String customerId;
private String customerEmailAddress;
private String orderStatus;
List<Item> orderItems;
private String orderTrackingCode;
private CustomerAddress customerAddress;
}
package com.nisum.ascend.inventory.dto;
import com.nisum.ascend.inventory.dto.Item;
import lombok.Data;
import java.util.Date;
import java.util.List;
@Data
public class WareHouseOrder {
private String id;
private String orderId;
private String status;
private Date createdAt;
private Date modifiedAt;
private List<Item> orderItems;
private String address;
}
package com.nisum.ascend.inventory.repository; package com.nisum.ascend.inventory.repository;
import com.nisum.ascend.inventory.model.Product; import com.nisum.ascend.inventory.model.Product;
import org.springframework.data.mongodb.repository.Query; import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
......
package com.nisum.ascend.inventory.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.dto.Order;
import com.nisum.ascend.inventory.dto.WareHouseOrder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.KafkaReceiver;
import java.util.List;
@Component
@Slf4j
public class KafkaListenerService {
@Autowired
@Qualifier("kafkaWarehouseOrderReceiver")
private KafkaReceiver<String, String> kafkaWarehouseOrderReceiver;
@Autowired
@Qualifier("kafkaOrderReceiver")
private KafkaReceiver<String, String> kafkaOrderReceiver;
@Autowired
private ProductService productService;
@EventListener(ApplicationStartedEvent.class)
public void consumeWarehouseOrderStatus() {
kafkaWarehouseOrderReceiver.receive()
.doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onWarehouseOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onWarehouseOrderStatusReceived(String warehouseOrderString) {
try {
ObjectMapper objectMapper = new ObjectMapper();
WareHouseOrder warehouseOrder = objectMapper.readValue(warehouseOrderString, WareHouseOrder.class);
List<Item> itemList = warehouseOrder.getOrderItems();
String status = warehouseOrder.getStatus();
log.info("Received this data: {}", warehouseOrder);
log.info("recieved this list: {}", itemList);
for (Item item : itemList) {
productService.updateProductInventoryBySkuWareHouse(item.getItemSku(), status, item.getItemQuantity()).subscribe();
}
} catch (Exception e) {
log.error("error", e);
}
}
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() {
kafkaOrderReceiver.receive()
.doOnNext(record -> log.info("record: {}", record))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onOrderStatusReceived(String orderString) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Order order = objectMapper.readValue(orderString, Order.class);
List<Item> itemList = order.getOrderItems();
String status = order.getOrderStatus();
log.info("Received this data: {}", order);
log.info("recieved this list: {}", itemList);
for (Item item : itemList) {
productService.updateProductInventoryBySkuOrder(item.getItemSku(), status, item.getItemQuantity()).block();
}
} catch (Exception e) {
log.error("error", e);
}
}
}
package com.nisum.ascend.inventory.service; package com.nisum.ascend.inventory.service;
import com.nisum.ascend.inventory.dto.Item;
import com.nisum.ascend.inventory.exception.ResourceNotFoundException; import com.nisum.ascend.inventory.exception.ResourceNotFoundException;
import com.nisum.ascend.inventory.model.Product; import com.nisum.ascend.inventory.model.Product;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -54,4 +56,39 @@ public class ProductService { ...@@ -54,4 +56,39 @@ public class ProductService {
return productRepository.save(product); return productRepository.save(product);
} }
public Mono<Product> updateProductInventoryBySkuWareHouse(String sku, String status, int itemQuantity) {
System.out.printf("sku = %s, status = %s, itemquanity = %d \n", sku, status, itemQuantity);
if (status.equals("CANCELLED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
dbProduct.setAvailableStock(dbProduct.getAvailableStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
else if (status.equals("FULFILLED")){
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
dbProduct.setFulfilledStock(dbProduct.getFulfilledStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
else return productRepository.findBySku(sku);
}
public Mono<Product> updateProductInventoryBySkuOrder(String sku, String status, int itemQuantity) {
System.out.printf("sku = %s, status = %s, itemquanity = %d \n", sku, status, itemQuantity);
// should only process RECEIVED
if (status.equals("RECEIVED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() - itemQuantity);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
else return productRepository.findBySku(sku);
}
} }
server.port=8081 server.port=8083
spring.data.mongodb.uri=${MONGOCONNECTION} spring.data.mongodb.uri=${MONGOCONNECTION}
spring.data.mongodb.database=products-promotions-DB
kafka.consumer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=PRODUCT
kafka.WAREHOUSETOPIC.input=WMOS_ORDER_UPDATE
kafka.ORDERTOPIC.input=OMS_ORDER_UPDATE
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment