Commit 92de104d authored by Vishal Vaddadhi

Merge branch 'make_kafka_reactive' into 'dev'

[AFP-100] Transition Kafka setup to reactive Kafka

See merge request !13
parents a275ea9d d644e306
Pipeline #1674 failed in 38 seconds
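In short, this MR swaps spring-kafka's blocking KafkaTemplate/@KafkaListener pair for reactor-kafka's KafkaSender/KafkaReceiver, so publishing and consuming become non-blocking reactive pipelines. A minimal sketch of the difference in the send path (topic name and logging are illustrative, not taken from this diff):

// Before: fire-and-forget through the blocking template API
kafkaTemplate.send("orders", orderId);

// After: the send is described as a reactive pipeline and nothing happens until subscribe()
kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>("orders", orderId), orderId)))
        .doOnError(e -> log.error("send failed", e))
        .subscribe();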
@@ -26,19 +26,7 @@
         <artifactId>spring-boot-starter-webflux</artifactId>
     </dependency>
-    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
-    <dependency>
-        <groupId>org.springframework.kafka</groupId>
-        <artifactId>spring-kafka</artifactId>
-        <version>2.7.0</version>
-    </dependency>
-    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist -->
-    <dependency>
-        <groupId>org.springframework.kafka</groupId>
-        <artifactId>spring-kafka-dist</artifactId>
-        <version>2.7.0</version>
-        <type>pom</type>
-    </dependency>
     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
@@ -97,6 +85,11 @@
         <artifactId>velocity</artifactId>
         <version>1.5</version>
     </dependency>
+    <dependency>
+        <groupId>io.projectreactor.kafka</groupId>
+        <artifactId>reactor-kafka</artifactId>
+    </dependency>
 </dependencies>
 <build>
     <plugins>
......
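Note that reactor-kafka is added without an explicit <version>: assuming the project inherits from spring-boot-starter-parent, the version should be resolved through Spring Boot's dependency management, which imports the Reactor BOM that ships reactor-kafka.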
 package com.afp.ordermanagement.config;
 
+import com.afp.ordermanagement.reactivekafkaservice.Sender;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+/*import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;*/
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 @Configuration
 public class KafkaConfig {
 
+    @Value("${kafka.producer.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${kafka.producer.acks}")
+    private String acks;
+
+    @Value("${kafka.consumer.group-id}")
+    private String groupId;
+
     @Bean
-    public ProducerFactory<String, String> producerFactoryString() {
-        Map<String, Object> configProps = new HashMap<>();
-        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        return new DefaultKafkaProducerFactory<>(configProps);
+    public Map<String, Object> producerFactoryString() {
+        Map<String, Object> senderConfigProps = new HashMap<>();
+        senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks);
+        senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return senderConfigProps;
+        //return new DefaultKafkaProducerFactory<>(configProps);
     }
 
     @Bean
-    public KafkaTemplate<String, String> kafkaTemplateString() {
-        return new KafkaTemplate<>(producerFactoryString());
+    public KafkaSender<String, String> kafkaEventProducer() {
+        SenderOptions<String, String> senderOptions = SenderOptions.create(producerFactoryString());
+        return KafkaSender.create(senderOptions);
     }
 
+//    @Bean
+//    public KafkaTemplate<String, Object> kafkaTemplateString() {
+//        return new KafkaTemplate<>(producerFactoryString());
+//    }
+
     @Bean
-    public ConsumerFactory<String, String> consumerFactory() {
-        Map<String, Object> configProps = new HashMap<>();
-        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
-        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        return new DefaultKafkaConsumerFactory<>(configProps);
+    public Map<String, Object> consumerFactory() {
+        Map<String, Object> receiverConfigProps = new HashMap<>();
+        receiverConfigProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        receiverConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        receiverConfigProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        receiverConfigProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return receiverConfigProps;
     }
 
     @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactory());
-        return factory;
+    public KafkaReceiver<String, String> kafkaEventReceiver(@Value("${kafka.topic.input}") String posLogTopic) {
+        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerFactory());
+        receiverOptions.maxCommitAttempts(3);
+        return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator).subscription(Collections.singleton(posLogTopic)));
     }
 
+//    @Bean
+//    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+//        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+//        factory.setConsumerFactory(consumerFactory());
+//        return factory;
+//    }
 }
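Broker settings are no longer hardcoded; the beans above read them from configuration. A minimal application.properties sketch matching the @Value keys used by KafkaConfig (the values are placeholders, not taken from this MR):

kafka.producer.bootstrap-servers=localhost:9092
kafka.producer.acks=all
kafka.consumer.group-id=group_id
kafka.topic.input=orders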
//package com.afp.ordermanagement.config;
//
//import org.springframework.beans.factory.annotation.Qualifier;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.beans.factory.config.ConfigurableBeanFactory;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
//import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
//import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
//
//import com.mongodb.reactivestreams.client.MongoClient;
//import com.mongodb.reactivestreams.client.MongoClients;
//
//@Configuration
//@EnableReactiveMongoRepositories(basePackages = "com.afp.ordermanagement.repository")
//public class ManagerMongoConfig extends AbstractReactiveMongoConfiguration
//{
// @Value("${port}")
// private String port;
//
// @Value("${dbname}")
// private String dbName;
//
// @Override
// public MongoClient reactiveMongoClient() {
// return MongoClients.create();
// }
//
// @Override
// protected String getDatabaseName() {
// return dbName;
// }
//
// @Qualifier(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// public ReactiveMongoTemplate reactiveMongoTemplate() {
// return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
// }
//}
\ No newline at end of file
//package com.afp.ordermanagement.config;
//
//import org.springframework.beans.factory.annotation.Qualifier;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.beans.factory.config.ConfigurableBeanFactory;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
//import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
//import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
//
//import com.mongodb.reactivestreams.client.MongoClient;
//import com.mongodb.reactivestreams.client.MongoClients;
//
//@Configuration
//@EnableReactiveMongoRepositories(basePackages = "com.afp.ordermanagement.repository")
//public class OrderMongoConfig extends AbstractReactiveMongoConfiguration
//{
// @Value("${port}")
// private String port;
//
// @Value("${dbname}")
// private String dbName;
//
// @Override
// public MongoClient reactiveMongoClient() {
// return MongoClients.create();
// }
//
// @Override
// protected String getDatabaseName() {
// return dbName;
// }
//
// @Qualifier(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// public ReactiveMongoTemplate reactiveMongoTemplate() {
// return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
// }
//}
\ No newline at end of file
package com.afp.ordermanagement.controller;

import com.afp.ordermanagement.service.Producer;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    public KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish/{id}")
    public void sendMessageToKafkaTopic(@PathVariable String id) {
        producer.sendOrderId(id);
    }

    @PostMapping(value = "/inventory/{quantity}")
    public void sendInventoryQuantity(@PathVariable String quantity) {
        producer.sendInventoryQuantity(quantity);
    }
}
@@ -2,7 +2,6 @@ package com.afp.ordermanagement.controller;
import com.afp.ordermanagement.model.Manager;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.repository.ManagerRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
......
 package com.afp.ordermanagement.controller;
 
 import com.afp.ordermanagement.model.Order;
+import com.afp.ordermanagement.reactivekafkaservice.Sender;
 import com.afp.ordermanagement.service.OrderService;
-import com.afp.ordermanagement.service.Producer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
@@ -19,7 +18,7 @@ public class OrderController {
     private OrderService orderService;
 
     @Autowired
-    Producer producer;
+    Sender sender;
 
     /**
@@ -28,7 +27,7 @@
      */
     @GetMapping("/orderStatus/{orderId}")
     public void getOrderStatusFromWarehouse(@PathVariable String orderId) {
-        producer.sendOrderId(orderId);
+        sender.sendOrderIdToWarehouse(orderId);
     }
@GetMapping("/orders")
......
package com.afp.ordermanagement.reactivekafkaservice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.kafka.receiver.KafkaReceiver;

@Service
@Slf4j
public class Receiver {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;

    // Subscribe to the reactive receiver as soon as the application has started
    @EventListener(ApplicationStartedEvent.class)
    public void consumeOrderStatus() {
        kafkaReceiver.receive()
                .doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value())))
                .doOnError(throwable -> log.error(throwable.getMessage()))
                .subscribe();
    }
}
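Note that the pipeline above never acknowledges what it consumes: KafkaReceiver.receive() emits ReceiverRecords whose offsets must be acknowledged before they are committed, which is also where the maxCommitAttempts(3) setting in KafkaConfig applies. A hedged sketch of how this subscription could acknowledge offsets after processing:

kafkaReceiver.receive()
        .doOnNext(record -> {
            log.info(String.format("##### -> Receiver receiving message: %s ", record.value()));
            record.receiverOffset().acknowledge(); // mark the offset so the periodic commit can pick it up
        })
        .doOnError(throwable -> log.error(throwable.getMessage()))
        .subscribe();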
package com.afp.ordermanagement.reactivekafkaservice;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

@Service
@Slf4j
public class Sender {

    @Autowired
    private KafkaSender<String, String> kafkaEventProducer;

    private static final String ORDER_TOPIC = "orders";

    public void sendOrderIdToWarehouse(String id) {
        log.info(String.format("##### -> Sender sending message: %s ", id));
        ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id);
        // The id doubles as correlation metadata, so each SenderResult can be tied back to its record
        kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id)))
                .doOnError(throwable -> log.error("Send to {} failed", ORDER_TOPIC, throwable))
                .doOnNext(senderResult -> {
                    if (senderResult.exception() != null) {
                        log.error("Delivery of order id {} failed", senderResult.correlationMetadata(), senderResult.exception());
                    }
                })
                .subscribe();
    }
}
package com.afp.ordermanagement.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "managers", groupId = "group_id")
    public void consumerManager(String message) {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }

    @KafkaListener(topics = "orders")
    public void getOrderStatusFromWarehouse(String status) {
        logger.info(String.format("#### -> Consumed order status: %s", status));
    }

    @KafkaListener(topics = "inventory")
    public void getInventoryQuantityUpdate(String quantity) {
        logger.info(String.format("#### -> Consumed inventory quantity update: %s", quantity));
    }
}
package com.afp.ordermanagement.service;

import org.apache.velocity.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    private static final String MANAGER_TOPIC = "managers";
    private static final String ORDER_TOPIC = "orders";
    private static final String INVENTORY_TOPIC = "inventory";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * DESC - Sends the orderId as a message on ORDER_TOPIC for the warehouse to consume.
     * @param id
     * @throws ResourceNotFoundException
     * @throws IllegalArgumentException
     */
    public void sendOrderId(String id) {
        try {
            logger.info(String.format("#### -> Order id sent to warehouse: %s", id));
            kafkaTemplate.send(ORDER_TOPIC, id);
        } catch (ResourceNotFoundException e) {
            logger.error("Order with that id does not exist, exception caught: " + e);
        } catch (IllegalArgumentException e) {
            logger.error("Not a valid input, exception caught: " + e);
        }
    }

    public void sendInventoryQuantity(String quantity) {
        logger.info(String.format("#### -> Sending inventory quantity update: %s", quantity));
        kafkaTemplate.send(INVENTORY_TOPIC, quantity);
    }
}