Commit 548029e2 authored by Christopher Cottier's avatar Christopher Cottier

troubleshooting Kafka messaging

parent d9feaf6f
Pipeline #1748 failed with stage
in 38 seconds
......@@ -77,5 +77,11 @@ public class OrderController {
orderService.deleteOrderById(orderId);
}
@PostMapping("/orderStatus")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Order> getUpdatedOrderFromWarehouseTopic(@RequestBody Order order) {
sender.sendUpdatedOrderToWarehouseTopic(order);
return orderService.updateOrderByOrderId(order.getId(), order);
}
}
package com.afp.ordermanagement.model;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import java.util.Date;
import java.util.List;
@Getter
@Setter
@ToString
public class WarehouseOrder {
@Id
private String id;
private String orderId;
private String status;
private Date createdAt;
private Date modifiedAt;
private List<Item> orderItems;
private String address;
public WarehouseOrder() {
}
}
......@@ -4,6 +4,7 @@ import com.afp.ordermanagement.controller.OrderController;
import com.afp.ordermanagement.model.Item;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.model.WarehouseOrder;
import com.afp.ordermanagement.repository.OrderRepository;
import com.afp.ordermanagement.service.EmailService;
import com.afp.ordermanagement.service.OrderService;
......@@ -41,9 +42,9 @@ public class Receiver {
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() {
kafkaReceiver.receive()
// .doOnNext(record -> System.out.println(record))
// .doOnNext(record -> log.info("record.value(): {} ", record.value()))
// .doOnNext(record -> updateOrderStatus(record.value()))
.doOnNext(record -> System.out.println(record))
.doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> updateOrderStatus(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
......@@ -54,15 +55,19 @@ public class Receiver {
private void updateOrderStatus(String orderStr) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Order order = objectMapper.readValue(orderStr, Order.class);
WarehouseOrder order = objectMapper.readValue(orderStr, WarehouseOrder.class);
log.info("ORDER objectMapper {}", order);
String orderId = order.getId();
String orderId = order.getOrderId();
System.out.println("About to try sending an email.");
emailService.emailCreator(order);
Mono<Order> updated = orderService.updateOrderByOrderId(orderId, order);
updated.block();
Mono<Order> convertedOrder = orderService.getOrderById(order.getOrderId());
convertedOrder.flatMap(o -> {
o.setOrderStatus(OrderStatus.valueOf(order.getStatus()));
emailService.emailCreator(o);
orderService.updateOrderByOrderId(orderId, o);
System.out.println("ayayayayayayayaya receiver" + o);
return Mono.just(o);
}).block();
} catch (Exception e) {
log.error("Caught error on UpdateOrderStatus method", e);
......
......@@ -12,6 +12,8 @@ import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import java.util.UUID;
@Service
@Slf4j
......@@ -25,13 +27,14 @@ public class Sender {
private static final String ORDER_TOPIC = "OMS_ORDER_UPDATE";
private static final String WAREHOUSE_TOPIC = "WMOS_ORDER_UPDATE";
public void sendOrderToTopic(Order orderObject) {
log.info(String.format("##### -> Sender sending message: %s ", orderObject));
ProducerRecord<String, Order> record = new ProducerRecord<>(ORDER_TOPIC, orderObject);
System.out.println("In sendOrderToWarehouse");
emailService.emailCreator(orderObject);
Flux<SenderResult<Order>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, orderObject)))
Flux<SenderResult<UUID>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, UUID.randomUUID())))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) {
......@@ -41,5 +44,18 @@ public class Sender {
working.doOnError(throwable -> log.error("some error")).subscribe();
}
public void sendUpdatedOrderToWarehouseTopic(Order order) {
ProducerRecord<String, Order> record = new ProducerRecord<>(WAREHOUSE_TOPIC, order);
System.out.println("In sendUpdatedOrderToWarehouseTopic");
Flux<SenderResult<Order>> updated = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, order)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) {
System.out.println("send order update");
}
});
updated.doOnError(throwable -> log.error("error on sendUpdatedOrderToWarehouse method")).subscribe();
}
}
......@@ -49,12 +49,13 @@ public class OrderService {
return orderRepository.findById(orderId)
.flatMap(existingOrder -> {
existingOrder.setCustomerAddress(newOrder.getCustomerAddress());
existingOrder.setCustomerEmailAddress(newOrder.getCustomerEmailAddress());
existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode());
existingOrder.setOrderItems(newOrder.getOrderItems());
//existingOrder.setCustomerAddress(newOrder.getCustomerAddress());
//existingOrder.setCustomerEmailAddress(newOrder.getCustomerEmailAddress());
//existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode());
//existingOrder.setOrderItems(newOrder.getOrderItems());
existingOrder.setOrderStatus(newOrder.getOrderStatus());
existingOrder.setOrderItems(newOrder.getOrderItems());
//existingOrder.setOrderItems(newOrder.getOrderItems());
System.out.println("serviceeee" + existingOrder);
return orderRepository.save(existingOrder);
});
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment