Commit 197d0369 authored by Shanelle Valencia's avatar Shanelle Valencia

[AFP-53] Fix rebase issue -delete backend directory dupe of src directory [@svalencia]

parent 455a328b
Pipeline #1687 canceled with stage
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="" vcs="Git" />
</component>
</project>
\ No newline at end of file
package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Service
@Slf4j
public class Receiver {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private OrderService orderService;
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() {
kafkaReceiver.receive()
.doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value())))
.doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onOrderStatusReceived(String orderStatusStr) {
try {
//deserialize JSON string into java object using ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
String id = orderStatus.get("id");
String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT);
updateOrderStatus(id, status);
log.info("orderStatus: {}", orderStatus);
} catch (Exception e) {
log.error("Caught error", e);
}
}
private void updateOrderStatus(String orderId, String orderStatus) {
if (checkExistingOrder(orderId)) {
log.info("Updating {} with status {}", orderId, orderStatus);
Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
updateOrder.subscribe();
// updateOrder.block(); //subscribe vs block?
}
}
private boolean checkExistingOrder(String orderId) {
Flux<Order> orders = orderService.getAllOrders();
List<Order> orderList = orders.collectList().block();
Order res = orderList.stream()
.filter(order -> orderId.equals(order.getId()))
.findAny()
.orElse(null);
if (res == null) {
log.error("Order {} not found", orderId);
return false;
}
log.info("Order exists on the database");
return true;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment