package com.nisum.orderprocessingapi.controller; import com.google.gson.Gson; import com.nisum.orderprocessingapi.dto.IdAndOrderIdMapper; import com.nisum.orderprocessingapi.dto.OrderCollectionDto; import com.nisum.orderprocessingapi.dto.OrderDto; import com.nisum.orderprocessingapi.dto.OrderResponseDto; import com.nisum.orderprocessingapi.producer.OrderEventGenerator; import com.nisum.orderprocessingapi.service.OrderProcessingService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverRecord; @Slf4j @RestController @RequestMapping(path = "/order") public class OrderController { @Autowired private KafkaReceiver<String,String> kafkaReceiver; @Autowired private OrderEventGenerator orderEventGenerator; @Autowired private OrderProcessingService orderProcessingService; @Autowired private Gson gson; @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux getAllOrderEvents(){ Flux<ReceiverRecord<String,String>> kafkaFlux = kafkaReceiver.receive(); return kafkaFlux.checkpoint("Messages are started being consumed").log() .doOnNext(order-> { String key = order.key(); OrderDto orderDto = gson.fromJson(order.value(), OrderDto.class); orderProcessingService.save(orderDto,key).onErrorResume(o->{ log.info("Unable to save document in mongo DB {} ",o); order.receiverOffset().acknowledge(); return Mono.empty(); }).subscribe(orderStatus -> { log.info("Mongo DB : Order Key : {} saved status {}", key,orderStatus); order.receiverOffset().acknowledge(); }); }).map(ReceiverRecord::value) .checkpoint("Messages are done consumed"); } @PostMapping(produces = MediaType.APPLICATION_JSON_VALUE) public Flux<OrderResponseDto> createOrderEventInKafka(@RequestBody OrderCollectionDto orderCollectionDto) throws InterruptedException { log.info("orderCollection Received : {}",orderCollectionDto); return orderEventGenerator.publishOrderEvent("test-order",orderCollectionDto); } @PostMapping(path = "/create",produces = MediaType.APPLICATION_JSON_VALUE) public Flux<IdAndOrderIdMapper> createOrderEvent(@RequestBody OrderCollectionDto orderCollectionDto) throws InterruptedException { log.info("orderCollection Received : {}",orderCollectionDto); return orderProcessingService.saveAll(orderCollectionDto); } @PutMapping(path = "/update/{id}",produces = MediaType.APPLICATION_JSON_VALUE) public Mono<OrderDto> updateOrderEvent(@PathVariable String id, @RequestBody OrderDto orderDto) throws InterruptedException { log.info("orderCollection Received for update : {}",orderDto); return orderProcessingService.update(orderDto); } @DeleteMapping("/delete/{id}") public Mono<Boolean> deleteOrderEvent(@PathVariable String id) throws InterruptedException { log.info("Order id Received for deletion : {}",id); return orderProcessingService.delete(id); } @GetMapping("/get/{id}") public Mono<OrderDto> readOrderEvent(@PathVariable String id) throws InterruptedException { log.info("Order id Received for retrieval : {}",id); return orderProcessingService.read(id); } }