Commit b0f0551c authored by Shanelle Valencia's avatar Shanelle Valencia

[AFP-53] Receive updated status from warehouse and update database [@svalencia]

parent 3a3d92f4
...@@ -12,6 +12,7 @@ import org.springframework.context.annotation.Configuration; ...@@ -12,6 +12,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;*/ import org.springframework.kafka.core.KafkaTemplate;*/
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.KafkaSender;
...@@ -44,7 +45,6 @@ public class KafkaConfig { ...@@ -44,7 +45,6 @@ public class KafkaConfig {
senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return senderConfigProps; return senderConfigProps;
//return new DefaultKafkaProducerFactory<>(configProps);
} }
@Bean @Bean
...@@ -53,6 +53,7 @@ public class KafkaConfig { ...@@ -53,6 +53,7 @@ public class KafkaConfig {
return KafkaSender.create(senderOptions); return KafkaSender.create(senderOptions);
} }
// @Bean // @Bean
// public KafkaTemplate<String, Object> kafkaTemplateString() { // public KafkaTemplate<String, Object> kafkaTemplateString() {
// return new KafkaTemplate<>(producerFactoryString()); // return new KafkaTemplate<>(producerFactoryString());
......
package com.afp.ordermanagement.controller; package com.afp.ordermanagement.controller;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.reactivekafkaservice.Receiver;
import com.afp.ordermanagement.reactivekafkaservice.Sender; import com.afp.ordermanagement.reactivekafkaservice.Sender;
import com.afp.ordermanagement.service.OrderService; import com.afp.ordermanagement.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -20,6 +21,9 @@ public class OrderController { ...@@ -20,6 +21,9 @@ public class OrderController {
@Autowired @Autowired
Sender sender; Sender sender;
@Autowired
Receiver receiver;
/** /**
* DESC - This route will let order manager get order status from warehouse * DESC - This route will let order manager get order status from warehouse
...@@ -30,6 +34,8 @@ public class OrderController { ...@@ -30,6 +34,8 @@ public class OrderController {
sender.sendOrderIdToWarehouse(orderId); sender.sendOrderIdToWarehouse(orderId);
} }
@GetMapping("/orders") @GetMapping("/orders")
@CrossOrigin @CrossOrigin
public Flux<Order> getAllOrders(){ public Flux<Order> getAllOrders(){
......
...@@ -14,6 +14,7 @@ import java.util.Objects; ...@@ -14,6 +14,7 @@ import java.util.Objects;
@Document(collection = "orders") @Document(collection = "orders")
public class Order { public class Order {
@Id @Id
private String id; private String id;
...@@ -32,4 +33,8 @@ public class Order { ...@@ -32,4 +33,8 @@ public class Order {
public Order(){ public Order(){
} }
public Order(OrderStatus status) {
this.orderStatus = status;
}
} }
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Service @Service
@Slf4j @Slf4j
public class Receiver { public class Receiver {
...@@ -14,11 +26,65 @@ public class Receiver { ...@@ -14,11 +26,65 @@ public class Receiver {
@Autowired @Autowired
private KafkaReceiver<String, String> kafkaReceiver; private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private OrderService orderService;
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() { public void consumeOrderStatus() {
kafkaReceiver.receive() kafkaReceiver.receive()
.doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value()))) .doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value())))
.doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe(); .subscribe();
} }
private void onOrderStatusReceived(String orderStatusStr) {
try {
//deserialize JSON string into java object using ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
String id = orderStatus.get("id");
String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT);
updateOrderStatus(id, status);
log.info("orderStatus: {}", orderStatus);
} catch (Exception e) {
log.error("Caught error", e);
}
}
private void updateOrderStatus(String orderId, String orderStatus) {
if (checkExistingOrder(orderId)) {
log.info("Updating {} with status {}", orderId, orderStatus);
Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
updateOrder.subscribe();
// updateOrder.block(); //subscribe vs block?
}
}
private boolean checkExistingOrder(String orderId) {
Flux<Order> orders = orderService.getAllOrders();
List<Order> orderList = orders.collectList().block();
Order res = orderList.stream()
.filter(order -> orderId.equals(order.getId()))
.findAny()
.orElse(null);
if (res == null) {
log.error("Order {} not found", orderId);
return false;
}
log.info("Order exists on the database");
return true;
}
} }
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord; import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult; import reactor.kafka.sender.SenderResult;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Service @Service
@Slf4j @Slf4j
public class Sender { public class Sender {
@Autowired @Autowired
private KafkaSender<String, String> kafkaEventProducer; private KafkaSender<String, String> kafkaEventProducer;
private static final String ORDER_TOPIC = "orders"; private static final String ORDER_TOPIC = "orders";
public void sendOrderIdToWarehouse(String id) { public void sendOrderIdToWarehouse(String id) {
log.info(String.format("##### -> Sender sending message: %s ", id)); log.info(String.format("##### -> Sender sending message: %s ", id));
// sendMessage(ORDER_TOPIC, id, id);
ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id); ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id);
Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id))) Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id)))
.doOnError(throwable -> System.out.println(throwable)) .doOnError(throwable -> System.out.println(throwable))
...@@ -33,4 +47,13 @@ public class Sender { ...@@ -33,4 +47,13 @@ public class Sender {
}); });
working.doOnError(throwable -> log.error("some error")).subscribe(); working.doOnError(throwable -> log.error("some error")).subscribe();
} }
public void sendUpdatedStatus(String id, String status) {
log.info(String.format("Sender sending updated status for ordernumber: %s", id));
ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
}
} }
...@@ -37,7 +37,7 @@ public class OrderService { ...@@ -37,7 +37,7 @@ public class OrderService {
return orderRepository.findByCustomerId(customerId); return orderRepository.findByCustomerId(customerId);
} }
public Mono<Order> updateOrderByOrderId(String orderId, Order newOrder){ public Mono<Order> updateOrderByOrderId(String orderId, Order newOrder) {
return orderRepository.findById(orderId) return orderRepository.findById(orderId)
.flatMap(existingOrder -> { .flatMap(existingOrder -> {
existingOrder.setCustomerAddress(newOrder.getCustomerAddress()); existingOrder.setCustomerAddress(newOrder.getCustomerAddress());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment