Commit 1be04ca2 authored by Shanelle Valencia's avatar Shanelle Valencia

Merge branch 'new-fetch-status-warehouse' into 'dev'

Clean up code

See merge request !39
parents 26bec4c7 bf9d99cb
Pipeline #1731 failed with stage
in 38 seconds
package com.afp.ordermanagement.controller; package com.afp.ordermanagement.controller;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.reactivekafkaservice.Receiver;
import com.afp.ordermanagement.reactivekafkaservice.Sender; import com.afp.ordermanagement.reactivekafkaservice.Sender;
import com.afp.ordermanagement.service.OrderService; import com.afp.ordermanagement.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -21,6 +22,9 @@ public class OrderController { ...@@ -21,6 +22,9 @@ public class OrderController {
@Autowired @Autowired
Sender sender; Sender sender;
@Autowired
Receiver receiver;
/** /**
* DESC - Persisting order information in the database and sending the * DESC - Persisting order information in the database and sending the
...@@ -36,12 +40,14 @@ public class OrderController { ...@@ -36,12 +40,14 @@ public class OrderController {
return orderService.createOrder(orderObject); return orderService.createOrder(orderObject);
} }
@GetMapping("/orders") @GetMapping("/orders")
@CrossOrigin @CrossOrigin
public Flux<Order> getAllOrders(){ public Flux<Order> getAllOrders(){
return orderService.getAllOrders(); return orderService.getAllOrders();
} }
@GetMapping("/orders/{orderId}") @GetMapping("/orders/{orderId}")
@CrossOrigin @CrossOrigin
public Mono<Order> getOrderById(@PathVariable("orderId") String orderId) { public Mono<Order> getOrderById(@PathVariable("orderId") String orderId) {
...@@ -65,4 +71,5 @@ public class OrderController { ...@@ -65,4 +71,5 @@ public class OrderController {
orderService.deleteOrderById(orderId); orderService.deleteOrderById(orderId);
} }
} }
package com.afp.ordermanagement.model; package com.afp.ordermanagement.model;
import lombok.Data; import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.util.Objects; import java.util.Objects;
@Data @Data
@Getter
@Setter
public class Item { public class Item {
private String itemId, itemName, itemSku; private String itemId, itemName, itemSku;
private int itemQuantity; private int itemQuantity;
......
...@@ -8,6 +8,7 @@ import org.springframework.data.mongodb.core.mapping.Document; ...@@ -8,6 +8,7 @@ import org.springframework.data.mongodb.core.mapping.Document;
import java.util.List; import java.util.List;
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
...@@ -17,6 +18,7 @@ public class Order { ...@@ -17,6 +18,7 @@ public class Order {
this.orderStatus = status; this.orderStatus = status;
} }
@Id @Id
private String id; private String id;
...@@ -32,3 +34,4 @@ public class Order { ...@@ -32,3 +34,4 @@ public class Order {
private CustomerAddress customerAddress; private CustomerAddress customerAddress;
} }
...@@ -3,5 +3,5 @@ package com.afp.ordermanagement.model; ...@@ -3,5 +3,5 @@ package com.afp.ordermanagement.model;
public enum OrderStatus { public enum OrderStatus {
RECEIVED, RECEIVED,
FULFILLED, FULFILLED,
CANCELLED CANCELLED,
} }
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.controller.OrderController;
import com.afp.ordermanagement.model.Item;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus; import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.repository.OrderRepository;
import com.afp.ordermanagement.service.OrderService; import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -13,11 +17,9 @@ import org.springframework.stereotype.Service; ...@@ -13,11 +17,9 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Service @Service
@Slf4j @Slf4j
...@@ -29,6 +31,8 @@ public class Receiver { ...@@ -29,6 +31,8 @@ public class Receiver {
@Autowired @Autowired
private OrderService orderService; private OrderService orderService;
@Autowired
private OrderRepository orderRepository;
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
...@@ -36,55 +40,31 @@ public class Receiver { ...@@ -36,55 +40,31 @@ public class Receiver {
kafkaReceiver.receive() kafkaReceiver.receive()
.doOnNext(record -> System.out.println(record)) .doOnNext(record -> System.out.println(record))
.doOnNext(record -> log.info("record.value(): {} ", record.value())) .doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value())) .doOnNext(record -> updateOrderStatus(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe(); .subscribe();
} }
private void onOrderStatusReceived(String orderStatusStr) {
private void updateOrderStatus(String orderStr) {
try { try {
//deserialize string into java object using ObjectMapper
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class); Order order = objectMapper.readValue(orderStr, Order.class);
String id = orderStatus.get("id"); log.info("ORDER objectMapper {}", order);
String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT); String orderId = order.getId();
updateOrderStatus(id, status);
log.info("orderStatus: {}", orderStatus);
} catch (Exception e) {
log.error("Caught error", e);
}
}
Mono<Order> updated = orderService.updateOrderByOrderId(orderId, order);
updated.block();
private void updateOrderStatus(String orderId, String orderStatus) { } catch (Exception e) {
if (checkExistingOrder(orderId)) { log.error("Caught error on UpdateOrderStatus method", e);
log.info("Updating {} with status {}", orderId, orderStatus); e.printStackTrace();
Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
updateOrder.subscribe();
// updateOrder.block(); //subscribe vs block?
} }
} }
private boolean checkExistingOrder(String orderId) {
Flux<Order> orders = orderService.getAllOrders();
List<Order> orderList = orders.collectList().block();
Order res = orderList.stream()
.filter(order -> orderId.equals(order.getId()))
.findAny()
.orElse(null);
if (res == null) {
log.error("Order {} not found", orderId);
return false;
}
log.info("Order exists on the database");
return true;
}
} }
...@@ -39,11 +39,4 @@ public class Sender { ...@@ -39,11 +39,4 @@ public class Sender {
} }
// public void sendUpdatedStatus(String id, String status) {
// log.info(String.format("Sender sending updated status for ordernumber: %s", id));
// ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
//
// }
} }
...@@ -4,9 +4,11 @@ import com.afp.ordermanagement.model.Order; ...@@ -4,9 +4,11 @@ import com.afp.ordermanagement.model.Order;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Repository @Repository
public interface OrderRepository extends ReactiveMongoRepository<Order, String> { public interface OrderRepository extends ReactiveMongoRepository<Order, String> {
Flux<Order> findByCustomerId(String customerId); Flux<Order> findByCustomerId(String customerId);
} }
package com.afp.ordermanagement.service; package com.afp.ordermanagement.service;
import com.afp.ordermanagement.model.Item;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus; import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.repository.OrderRepository; import com.afp.ordermanagement.repository.OrderRepository;
...@@ -8,6 +9,10 @@ import org.springframework.stereotype.Service; ...@@ -8,6 +9,10 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Service @Service
public class OrderService { public class OrderService {
...@@ -40,6 +45,7 @@ public class OrderService { ...@@ -40,6 +45,7 @@ public class OrderService {
} }
public Mono<Order> updateOrderByOrderId(String orderId, Order newOrder){ public Mono<Order> updateOrderByOrderId(String orderId, Order newOrder){
return orderRepository.findById(orderId) return orderRepository.findById(orderId)
.flatMap(existingOrder -> { .flatMap(existingOrder -> {
existingOrder.setCustomerAddress(newOrder.getCustomerAddress()); existingOrder.setCustomerAddress(newOrder.getCustomerAddress());
...@@ -47,6 +53,7 @@ public class OrderService { ...@@ -47,6 +53,7 @@ public class OrderService {
existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode()); existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode());
existingOrder.setOrderItems(newOrder.getOrderItems()); existingOrder.setOrderItems(newOrder.getOrderItems());
existingOrder.setOrderStatus(newOrder.getOrderStatus()); existingOrder.setOrderStatus(newOrder.getOrderStatus());
existingOrder.setOrderItems(newOrder.getOrderItems());
return orderRepository.save(existingOrder); return orderRepository.save(existingOrder);
}); });
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment