Commit bc3b1318 authored by John Lam's avatar John Lam

split kafka consumer methods into order and warehouse

parent 385a1be5
package com.nisum.ascend.inventory.dto;
import lombok.Data;
@Data
public class CustomerAddress {
private String street;
private String city;
private String state;
private String zip;
}
...@@ -8,12 +8,12 @@ import java.util.List; ...@@ -8,12 +8,12 @@ import java.util.List;
@Data @Data
public class Order { public class Order {
private String id; private String id;
private String orderUpdatedAt; private long orderUpdatedAt;
private String orderCreatedAt; private long orderCreatedAt;
private String customerId; private String customerId;
private String customerEmailAddress; private String customerEmailAddress;
private String orderStatus; private String orderStatus;
List<Item> orderItems; List<Item> orderItems;
private String orderTrackingCode; private String orderTrackingCode;
private String customerAddress; private CustomerAddress customerAddress;
} }
...@@ -3,6 +3,7 @@ package com.nisum.ascend.inventory.dto; ...@@ -3,6 +3,7 @@ package com.nisum.ascend.inventory.dto;
import com.nisum.ascend.inventory.dto.Item; import com.nisum.ascend.inventory.dto.Item;
import lombok.Data; import lombok.Data;
import java.util.Date;
import java.util.List; import java.util.List;
@Data @Data
...@@ -10,11 +11,9 @@ public class WareHouseOrder { ...@@ -10,11 +11,9 @@ public class WareHouseOrder {
private String id; private String id;
private String orderId; private String orderId;
private String status; private String status;
//LocalDateTime private Date createdAt;
private String createdAt; private Date modifiedAt;
private String modifiedAt;
private List<Item> orderItems; private List<Item> orderItems;
private String address; private String address;
} }
...@@ -47,9 +47,8 @@ public class KafkaListenerService { ...@@ -47,9 +47,8 @@ public class KafkaListenerService {
log.info("Received this data: {}", warehouseOrder); log.info("Received this data: {}", warehouseOrder);
log.info("recieved this list: {}", itemList); log.info("recieved this list: {}", itemList);
for (Item item : itemList) { for (Item item : itemList) {
productService.updateProductInventoryBySku(item.getItemSku(), status, item.getItemQuantity()).block(); productService.updateProductInventoryBySkuWareHouse(item.getItemSku(), status, item.getItemQuantity()).subscribe();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("error", e); log.error("error", e);
} }
...@@ -74,7 +73,7 @@ public class KafkaListenerService { ...@@ -74,7 +73,7 @@ public class KafkaListenerService {
log.info("Received this data: {}", order); log.info("Received this data: {}", order);
log.info("recieved this list: {}", itemList); log.info("recieved this list: {}", itemList);
for (Item item : itemList) { for (Item item : itemList) {
productService.updateProductInventoryBySku(item.getItemSku(), status, item.getItemQuantity()).block(); productService.updateProductInventoryBySkuOrder(item.getItemSku(), status, item.getItemQuantity()).block();
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -38,29 +38,6 @@ public class ProductService { ...@@ -38,29 +38,6 @@ public class ProductService {
} }
public Mono<Product> updateProductInventoryBySku(String sku, String status, int itemQuantity) {
int stock = itemQuantity;
if (status.equals("CANCELLED")) {
stock *= -1;
} else if (status.equals("RECEIVED")) {
System.out.printf("sku = %s, status = %s, itemquanity = %d, stock = %d, finalStock = %d \n", sku, status, itemQuantity);
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
int finalStock = stock;
System.out.printf("sku = %s, status = %s, itemquanity = %d, stock = %d, finalStock = %d \n", sku, status, itemQuantity, stock, finalStock);
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() - finalStock);
dbProduct.setFulfilledStock(dbProduct.getFulfilledStock() + itemQuantity);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
return productRepository.save(dbProduct);
});
}
public Mono<Product> updateProductBySku(String sku, Product product){ public Mono<Product> updateProductBySku(String sku, Product product){
return productRepository.findBySku(sku) return productRepository.findBySku(sku)
.flatMap(dbProduct -> { .flatMap(dbProduct -> {
...@@ -80,4 +57,58 @@ public class ProductService { ...@@ -80,4 +57,58 @@ public class ProductService {
return productRepository.save(product); return productRepository.save(product);
} }
public Mono<Product> updateProductInventoryBySkuWareHouse(String sku, String status, int itemQuantity) {
System.out.printf("sku = %s, status = %s, itemquanity = %d \n", sku, status, itemQuantity);
if (status.equals("CANCELED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() + itemQuantity);
return productRepository.save(dbProduct);
});
} else if (status.equals("RECEIVED")) {
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
} else { // if status.equals("FULFILLED")
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() - itemQuantity);
dbProduct.setFulfilledStock(dbProduct.getFulfilledStock() + itemQuantity);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
return productRepository.save(dbProduct);
});
}
}
public Mono<Product> updateProductInventoryBySkuOrder(String sku, String status, int itemQuantity) {
System.out.printf("sku = %s, status = %s, itemquanity = %d \n", sku, status, itemQuantity);
if (status.equals("CANCELED")){
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setAvailableStock(dbProduct.getAvailableStock() + itemQuantity);
dbProduct.setBlockedStock(dbProduct.getBlockedStock() - itemQuantity);
return productRepository.save(dbProduct);
});
}
else if (status.equals("RECEIVED")){
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
dbProduct.setBlockedStock(dbProduct.getBlockedStock() + itemQuantity);
return productRepository.save(dbProduct);
});
}
else { // if status.equals("FULFILLED")
return productRepository.findBySku(sku)
.flatMap(dbProduct -> {
return productRepository.save(dbProduct);
});
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment