Commit 56a711d1 authored by Vishal Vaddadhi's avatar Vishal Vaddadhi

AFP[120] & AFP[121] Ecom orders to kafka and DB

parent b7954c7f
Pipeline #1694 failed with stage
in 40 seconds
No preview for this file type
...@@ -26,6 +26,13 @@ ...@@ -26,6 +26,13 @@
<artifactId>spring-boot-starter-webflux</artifactId> <artifactId>spring-boot-starter-webflux</artifactId>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-json-serializer -->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist --> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist -->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
...@@ -44,12 +51,12 @@ ...@@ -44,12 +51,12 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency> <!--<dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.16</version> <version>1.18.16</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>-->
<!-- https://mvnrepository.com/artifact/com.github.javafaker/javafaker --> <!-- https://mvnrepository.com/artifact/com.github.javafaker/javafaker -->
<dependency> <dependency>
<groupId>com.github.javafaker</groupId> <groupId>com.github.javafaker</groupId>
...@@ -89,6 +96,11 @@ ...@@ -89,6 +96,11 @@
<groupId>io.projectreactor.kafka</groupId> <groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId> <artifactId>reactor-kafka</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
package com.afp.ordermanagement.config; package com.afp.ordermanagement.config;
import com.afp.ordermanagement.reactivekafkaservice.Sender; import com.afp.ordermanagement.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
...@@ -8,16 +8,14 @@ import org.apache.kafka.common.serialization.StringSerializer; ...@@ -8,16 +8,14 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
/*import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;*/
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderOptions;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
...@@ -42,14 +40,14 @@ public class KafkaConfig { ...@@ -42,14 +40,14 @@ public class KafkaConfig {
senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks); senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks);
senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return senderConfigProps; return senderConfigProps;
//return new DefaultKafkaProducerFactory<>(configProps); //return new DefaultKafkaProducerFactory<>(senderConfigProps);
} }
@Bean @Bean
public KafkaSender<String, String> kafkaEventProducer() { public KafkaSender<String, Order> kafkaEventProducer() {
SenderOptions<String, String> senderOptions = SenderOptions.create(producerFactoryString()); SenderOptions<String, Order> senderOptions = SenderOptions.create(producerFactoryString());
return KafkaSender.create(senderOptions); return KafkaSender.create(senderOptions);
} }
......
...@@ -12,6 +12,7 @@ import reactor.core.publisher.Mono; ...@@ -12,6 +12,7 @@ import reactor.core.publisher.Mono;
@RestController @RestController
@RequestMapping("/api") @RequestMapping("/api")
@CrossOrigin
public class OrderController { public class OrderController {
@Autowired @Autowired
...@@ -22,12 +23,17 @@ public class OrderController { ...@@ -22,12 +23,17 @@ public class OrderController {
/** /**
* DESC - This route will let order manager get order status from warehouse * DESC - Persisting order information in the database and sending the
* @param orderId * order object via orders kafka topic so that warehouse can
* handle that
* @param orderObject
* @return
*/ */
@GetMapping("/orderStatus/{orderId}") @PostMapping("/ordersFromEcom")
public void getOrderStatusFromWarehouse(@PathVariable String orderId) { @ResponseStatus(HttpStatus.CREATED)
sender.sendOrderIdToWarehouse(orderId); public Mono<Order> getOrderFromEcom(@RequestBody Order orderObject) {
sender.sendOrderToWarehouse(orderObject);
return orderService.createOrder(orderObject);
} }
@GetMapping("/orders") @GetMapping("/orders")
...@@ -46,14 +52,7 @@ public class OrderController { ...@@ -46,14 +52,7 @@ public class OrderController {
public Flux<Order> getAllOrdersByCustomerId(@PathVariable("customerId") String customerId) { public Flux<Order> getAllOrdersByCustomerId(@PathVariable("customerId") String customerId) {
return orderService.getAllOrdersByCustomerId(customerId); return orderService.getAllOrdersByCustomerId(customerId);
} }
@PostMapping("/orders")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Order> saveOrder(@RequestBody Order order){
return orderService.createOrder(order);
}
@PutMapping("/order/{orderId}") @PutMapping("/order/{orderId}")
public Mono<ResponseEntity<Order>> updateOrder(@PathVariable(value = "orderId") String orderId, @RequestBody Order order){ public Mono<ResponseEntity<Order>> updateOrder(@PathVariable(value = "orderId") String orderId, @RequestBody Order order){
return orderService.updateOrderByOrderId(orderId, order) return orderService.updateOrderByOrderId(orderId, order)
......
...@@ -34,7 +34,7 @@ public class Receiver { ...@@ -34,7 +34,7 @@ public class Receiver {
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() { public void consumeOrderStatus() {
kafkaReceiver.receive() kafkaReceiver.receive()
.doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value()))) .doOnNext(record -> System.out.println(record))
.doOnNext(record -> log.info("record.value(): {} ", record.value())) .doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value())) .doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnError(throwable -> System.out.println(throwable.getMessage()))
......
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.service.OrderService; import com.afp.ordermanagement.model.Order;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord; import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult; import reactor.kafka.sender.SenderResult;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Service @Service
@Slf4j @Slf4j
...@@ -28,17 +19,16 @@ public class Sender { ...@@ -28,17 +19,16 @@ public class Sender {
@Autowired @Autowired
private KafkaSender<String, String> kafkaEventProducer; private KafkaSender<String, Order> kafkaEventProducer;
private static final String ORDER_TOPIC = "orders"; private static final String ORDER_TOPIC = "orders";
public void sendOrderIdToWarehouse(String id) { public void sendOrderToWarehouse(Order orderObject) {
log.info(String.format("##### -> Sender sending message: %s ", id)); log.info(String.format("##### -> Sender sending message: %s ", orderObject));
// sendMessage(ORDER_TOPIC, id, id); ProducerRecord<String, Order> record = new ProducerRecord<>(ORDER_TOPIC, orderObject);
ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id); Flux<SenderResult<Order>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, orderObject)))
Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id)))
.doOnError(throwable -> System.out.println(throwable)) .doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> { .doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) { if (null != uuidSenderResult.exception()) {
...@@ -49,11 +39,11 @@ public class Sender { ...@@ -49,11 +39,11 @@ public class Sender {
} }
public void sendUpdatedStatus(String id, String status) { // public void sendUpdatedStatus(String id, String status) {
log.info(String.format("Sender sending updated status for ordernumber: %s", id)); // log.info(String.format("Sender sending updated status for ordernumber: %s", id));
ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status); // ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
//
} // }
} }
...@@ -15,6 +15,7 @@ public class OrderService { ...@@ -15,6 +15,7 @@ public class OrderService {
OrderRepository orderRepository; OrderRepository orderRepository;
public Mono<Order> createOrder(Order newOrder){ public Mono<Order> createOrder(Order newOrder){
System.out.println("here");
String defaultOrderTrackingCode = "N/A"; String defaultOrderTrackingCode = "N/A";
OrderStatus defaultOrderStatus = OrderStatus.RECEIVED; OrderStatus defaultOrderStatus = OrderStatus.RECEIVED;
long serviceSystemTime = System.currentTimeMillis(); long serviceSystemTime = System.currentTimeMillis();
...@@ -22,6 +23,7 @@ public class OrderService { ...@@ -22,6 +23,7 @@ public class OrderService {
newOrder.setOrderTrackingCode(defaultOrderTrackingCode); newOrder.setOrderTrackingCode(defaultOrderTrackingCode);
newOrder.setOrderCreatedAt(serviceSystemTime); newOrder.setOrderCreatedAt(serviceSystemTime);
newOrder.setOrderUpdatedAt(serviceSystemTime); newOrder.setOrderUpdatedAt(serviceSystemTime);
//System.out.println(newOrder);
return orderRepository.save(newOrder); return orderRepository.save(newOrder);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment