Commit cbe6c7cf authored by Shanelle Valencia's avatar Shanelle Valencia

[AFP-53] Persist order info from warehouse to db [@svalencia]

parent 1095d6fa
package com.afp.ordermanagement.controller;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.reactivekafkaservice.Receiver;
import com.afp.ordermanagement.reactivekafkaservice.Sender;
import com.afp.ordermanagement.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -21,6 +22,9 @@ public class OrderController {
@Autowired
Sender sender;
@Autowired
Receiver receiver;
/**
* DESC - Persisting order information in the database and sending the
......
package com.afp.ordermanagement.model;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.util.Objects;
@Data
@Getter
@Setter
public class Item {
private String itemId;
......
......@@ -3,17 +3,21 @@ package com.afp.ordermanagement.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.ToString;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.List;
import java.util.Objects;
@Data
@AllArgsConstructor
@Document(collection = "orders")
public class Order {
@Id
private String id;
......@@ -32,9 +36,6 @@ public class Order {
public Order(){
}
public Order(OrderStatus status) {
this.orderStatus = status;
}
}
package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.controller.OrderController;
import com.afp.ordermanagement.model.Item;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.repository.OrderRepository;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -14,11 +17,9 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.*;
@Service
@Slf4j
......@@ -40,70 +41,49 @@ public class Receiver {
kafkaReceiver.receive()
.doOnNext(record -> System.out.println(record))
.doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnNext(record -> updateOrderStatus(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onOrderStatusReceived(String orderStatusStr) {
private void updateOrderStatus(String orderStatusStr) {
try {
//deserialize string into java object using ObjectMapper
//deserialize kafka message into java object using ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
String id = orderStatus.get("id");
String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT);
// updateOrderStatus(id, status);
Order newOrder = new Order(OrderStatus.valueOf(status));
Mono<Order> updateOrder = orderService.updateOrderByOrderId(id, newOrder);
updateOrder.subscribe();
log.info("orderStatus: {}", orderStatus);
//Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
Order order = objectMapper.readValue(orderStatusStr, Order.class);
log.info("ORDER objectMapper {}", order);
String orderId = order.getId();
Mono<Order> updated = orderService.updateOrderByOrderId(orderId, order);
updated.block();
} catch (Exception e) {
log.error("Caught error", e);
}
}
// private void updateOrderStatus(String orderId, String orderStatus) {
// if (checkExistingOrder(orderId)) {
// log.info("Updating {} with status {}", orderId, orderStatus);
// Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
// Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
// updateOrder.subscribe();
//// updateOrder.block(); //subscribe vs block?
// }
// }
private boolean checkExistingOrder(String orderId) {
// Flux<Order> orders = orderService.getAllOrders();
// List<Order> orderList = orders.collectList().block();
// Order res = orderList.stream()
// .filter(order -> orderId.equals(order.getId()))
// .findAny()
// .orElse(null);
Mono<Order> order = orderRepository.findById(orderId);
// if (res == null) {
// log.error("Order {} not found", orderId);
// return false;
// }
log.info("ORDER MONO");
log.info(String.valueOf(order));
if (order == null) {
Flux<Order> orders = orderService.getAllOrders();
List<Order> orderList = orders.collectList().block();
Order res = orderList.stream()
.filter(order -> orderId.equals(order.getId()))
.findAny()
.orElse(null);
if (res == null) {
log.error("Order {} not found", orderId);
return false;
} else {
log.info("Order exists on the database");
return true;
}
log.info("Order exists on the database");
return true;
}
}
......@@ -39,11 +39,4 @@ public class Sender {
}
// public void sendUpdatedStatus(String id, String status) {
// log.info(String.format("Sender sending updated status for ordernumber: %s", id));
// ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
//
// }
}
package com.afp.ordermanagement.service;
import com.afp.ordermanagement.model.Item;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.repository.OrderRepository;
......@@ -8,6 +9,10 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class OrderService {
......@@ -40,6 +45,7 @@ public class OrderService {
}
public Mono<Order> updateOrderByOrderId(String orderId, Order newOrder){
return orderRepository.findById(orderId)
.flatMap(existingOrder -> {
existingOrder.setCustomerAddress(newOrder.getCustomerAddress());
......@@ -47,8 +53,10 @@ public class OrderService {
existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode());
existingOrder.setOrderItems(newOrder.getOrderItems());
existingOrder.setOrderStatus(newOrder.getOrderStatus());
existingOrder.setOrderItems(newOrder.getOrderItems());
return orderRepository.save(existingOrder);
});
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment