Commit 56b34f30 authored by Shanelle Valencia's avatar Shanelle Valencia

[AFP-53] Added endpoint testing for kafka update [@svalencia]

parent 78aa876f
...@@ -66,7 +66,7 @@ public class KafkaConfig { ...@@ -66,7 +66,7 @@ public class KafkaConfig {
} }
@Bean @Bean
public KafkaReceiver<String, String> kafkaEventReceiver(@Value("${kafka.topic.input}") String posLogTopic) { public KafkaReceiver<String, String> kafkaEventReceiver(@Value("${kafka.topic.output}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerFactory()); ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerFactory());
receiverOptions.maxCommitAttempts(3); receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator).subscription(Collections.singleton(posLogTopic))); return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator).subscription(Collections.singleton(posLogTopic)));
......
...@@ -42,6 +42,15 @@ public class OrderController { ...@@ -42,6 +42,15 @@ public class OrderController {
} }
@PostMapping("/orderStatus")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Order> getUpdatedOrderFromWarehouseTopic(@RequestBody Order order) {
sender.sendUpdatedOrderToWarehouseTopic(order);
return orderService.updateOrderByOrderId(order.getId(), order);
}
@GetMapping("/orders") @GetMapping("/orders")
@CrossOrigin @CrossOrigin
public Flux<Order> getAllOrders(){ public Flux<Order> getAllOrders(){
......
...@@ -41,9 +41,16 @@ public class Receiver { ...@@ -41,9 +41,16 @@ public class Receiver {
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() { public void consumeOrderStatus() {
kafkaReceiver.receive() kafkaReceiver.receive()
// .doOnNext(record -> System.out.println(record)) .doOnNext(record -> System.out.println(record))
// .doOnNext(record -> log.info("record.value(): {} ", record.value())) .doOnNext(record -> log.info("record.value(): {} ", record.value()))
// .doOnNext(record -> updateOrderStatus(record.value())) // .doOnNext(record -> {
// if (record.key() != null) {
// updateOrderStatus(record.value());
// } else {
// log.error("{} is :::::: ", record.value());
// }
// })
.doOnNext(record -> updateOrderStatus(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe(); .subscribe();
} }
...@@ -53,20 +60,25 @@ public class Receiver { ...@@ -53,20 +60,25 @@ public class Receiver {
private void updateOrderStatus(String orderStr) { private void updateOrderStatus(String orderStr) {
try { try {
ObjectMapper objectMapper = new ObjectMapper(); // if (orderStr.isEmpty()) {
Order order = objectMapper.readValue(orderStr, Order.class); // log.info("ORDERSTR IS EMPTYYYYYY");
log.info("ORDER objectMapper {}", order); // return;
String orderId = order.getId(); // } else {
ObjectMapper objectMapper = new ObjectMapper();
System.out.println("About to try sending an email."); Order order = objectMapper.readValue(orderStr, Order.class);
emailService.emailCreator(order); log.info("ORDER objectMapper {}", order);
String orderId = order.getId();
Mono<Order> updated = orderService.updateOrderByOrderId(orderId, order);
updated.block(); System.out.println("About to try sending an email.");
emailService.emailCreator(order);
Mono<Order> updated = orderService.updateOrderByOrderId(orderId, order);
updated.subscribe();
// }
} catch (Exception e) { } catch (Exception e) {
log.error("Caught error on UpdateOrderStatus method", e); log.error("Caught error on UpdateOrderStatus method", e);
e.printStackTrace(); // e.printStackTrace();
} }
} }
......
...@@ -25,6 +25,8 @@ public class Sender { ...@@ -25,6 +25,8 @@ public class Sender {
private static final String ORDER_TOPIC = "orders"; private static final String ORDER_TOPIC = "orders";
private static final String WAREHOUSE_TOPIC = "warehouseManagement";
public void sendOrderToWarehouse(Order orderObject) { public void sendOrderToWarehouse(Order orderObject) {
log.info(String.format("##### -> Sender sending message: %s ", orderObject)); log.info(String.format("##### -> Sender sending message: %s ", orderObject));
...@@ -42,4 +44,17 @@ public class Sender { ...@@ -42,4 +44,17 @@ public class Sender {
} }
public void sendUpdatedOrderToWarehouseTopic(Order order) {
ProducerRecord<String, Order> record = new ProducerRecord<>(WAREHOUSE_TOPIC, order);
System.out.println("In sendUpdatedOrderToWarehouseTopic");
Flux<SenderResult<Order>> updated = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, order)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) {
System.out.println("send order update");
}
});
updated.doOnError(throwable -> log.error("error on sendUpdatedOrderToWarehouse method")).subscribe();
}
} }
...@@ -4,6 +4,7 @@ kafka.producer.acks=all ...@@ -4,6 +4,7 @@ kafka.producer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092 kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=group_id kafka.consumer.group-id=group_id
kafka.topic.input=orders kafka.topic.input=orders
kafka.topic.output=warehouseManagement
# Config for MailTrap SMTP Mail testing service # Config for MailTrap SMTP Mail testing service
spring.mail.protocol=smtp spring.mail.protocol=smtp
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment