Commit b253dadb authored by Philippe Fonzin

reactive kafka integration

parent b40629b8
@@ -74,6 +74,19 @@
<artifactId>httpclient</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
<build>
......
package com.ascendfinalproject.warehouse.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.producer.acks}")
private String acks;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean
public Map<String, Object> consumerFactory() {
Map<String, Object> receiverConfigProps = new HashMap<>();
receiverConfigProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
receiverConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
receiverConfigProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
receiverConfigProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return receiverConfigProps;
}
@Bean
public KafkaReceiver<String, String> kafkaEventReceiver(@Value("${kafka.topic.input}") String topic) {
// builds the receiver options from consumerFactory(); each configuration call returns
// the options to use, so the calls are chained rather than discarding their result
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerFactory())
        .maxCommitAttempts(5)
        .subscription(Collections.singleton(topic));
return KafkaReceiver.create(receiverOptions);
}
@Bean
public Map<String, Object> producerFactory() {
Map<String, Object> senderConfigProps = new HashMap<>();
senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks);
senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return senderConfigProps;
}
@Bean
public KafkaSender<String, String> kafkaEventProducer() {
// builds the sender options for the KafkaSender from producerFactory()
SenderOptions<String, String> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions);
}
}
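Since the pom change above pulls in reactor-test, the two beans can be smoke-tested end to end with StepVerifier. A minimal round-trip sketch, assuming a broker is actually reachable at the configured bootstrap servers and that kafka.topic.input resolves to test_topic; the test class itself is illustrative, not part of this commit:

package com.ascendfinalproject.warehouse.config;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.test.StepVerifier;

@SpringBootTest
class KafkaConfigRoundTripTest {

    @Autowired
    private KafkaSender<String, String> kafkaEventProducer;

    @Autowired
    private KafkaReceiver<String, String> kafkaEventReceiver;

    @Test
    void sendThenReceive() {
        ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "hello");

        // one record in, one SenderResult out
        StepVerifier.create(kafkaEventProducer.send(Mono.just(SenderRecord.create(record, "corr-1"))))
                .expectNextCount(1)
                .verifyComplete();

        // the receiver bean is subscribed to kafka.topic.input, so the value comes back
        StepVerifier.create(kafkaEventReceiver.receive().take(1).map(r -> r.value()))
                .expectNext("hello")
                .verifyComplete();
    }
}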
package com.ascendfinalproject.warehouse.controllers;
import com.ascendfinalproject.warehouse.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/fulfilled")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.orderFulfilled(message);
}
}
package com.ascendfinalproject.warehouse.controllers;
import com.ascendfinalproject.warehouse.kafkaservice.Sender;
import com.ascendfinalproject.warehouse.models.OrderResponse;
import com.ascendfinalproject.warehouse.models.WarehouseOrder;
import com.ascendfinalproject.warehouse.services.WarehouseOrderService;
@@ -16,6 +17,15 @@ public class WarehouseController {
@Autowired
WarehouseOrderService orderService;
@Autowired
Sender sender;
@PostMapping("/fulfilled")
public void publishOrderStatus() {
sender.sendOrderStatus("fulfilled");
}
@CrossOrigin
@GetMapping(value = "/orders")
public Flux<WarehouseOrder> getOrders() {
......
package com.ascendfinalproject.warehouse.kafkaservice;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.kafka.receiver.KafkaReceiver;
@Service
@Slf4j
public class Receiver {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@EventListener(ApplicationStartedEvent.class)
public void consumeNewOrder() {
kafkaReceiver.receive()
        .doOnNext(record -> {
            log.info(String.format("Received message: %s ", record.value()));
            // acknowledge so the offset becomes eligible for commit
            record.receiverOffset().acknowledge();
        })
        .doOnError(throwable -> log.error("Receiver error", throwable))
        .subscribe();
}
}
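One caveat with the bare subscribe() above: if the receive() Flux terminates with an error, consumption stops until the application restarts. A sketch of a retrying variant, meant as a drop-in replacement for Receiver rather than a second bean alongside it; the backoff limits are arbitrary assumptions:

package com.ascendfinalproject.warehouse.kafkaservice;

import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.util.retry.Retry;

@Service
@Slf4j
public class ResilientReceiver {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;

    @EventListener(ApplicationStartedEvent.class)
    public void consumeNewOrder() {
        kafkaReceiver.receive()
                .doOnNext(record -> {
                    log.info(String.format("Received message: %s ", record.value()));
                    record.receiverOffset().acknowledge();
                })
                // resubscribe after transient failures; attempts and delay are assumptions
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)))
                .doOnError(throwable -> log.error("Receiver gave up", throwable))
                .subscribe();
    }
}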
package com.ascendfinalproject.warehouse.kafkaservice;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
@Service
@Slf4j
public class Sender {
@Autowired
private KafkaSender<String, String> kafkaEventProducer;
private static final String TOPIC = "order";
public void sendOrderStatus(String status) {
    log.info(String.format("Sender message: %s ", status));
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, status);
    // the second argument to SenderRecord.create is correlation metadata,
    // echoed back in each SenderResult so results can be matched to records
    Flux<SenderResult<String>> results = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, status)))
            .doOnNext(senderResult -> {
                if (senderResult.exception() != null) {
                    log.error("Send failed", senderResult.exception());
                } else {
                    log.info(String.format("Sent %s to partition %d", status, senderResult.recordMetadata().partition()));
                }
            });
    results.doOnError(throwable -> log.error("Sender error", throwable)).subscribe();
}
}
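Because KafkaSender.send() accepts any Publisher of SenderRecords, a whole batch can be pushed through one pipeline instead of calling sendOrderStatus once per status. A sketch of a hypothetical companion method inside Sender, reusing the imports already present in the class:

// hypothetical batch variant of sendOrderStatus: one reactive send for many statuses
public void sendOrderStatuses(java.util.List<String> statuses) {
    kafkaEventProducer.send(
            Flux.fromIterable(statuses)
                    .map(status -> SenderRecord.create(new ProducerRecord<>(TOPIC, status), status)))
            .doOnNext(result -> {
                if (result.exception() != null) {
                    // correlationMetadata() carries the status passed to SenderRecord.create
                    log.error("Send failed for {}", result.correlationMetadata(), result.exception());
                }
            })
            .subscribe();
}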
package com.ascendfinalproject.warehouse.services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
// this is a placeholder listener for the spring-kafka consumer path
@KafkaListener(topics = "fulfilled", groupId = "WAREHOUSE_MANAGEMENT")
public void consume(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
}
package com.ascendfinalproject.warehouse.services;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
// placeholder config for the @KafkaListener path; not yet exposed as a bean (see the sketch after this class)
private Map<String, Object> consumerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, "WAREHOUSE_MANAGEMENT");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
}
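As it stands, consumerConfig() is private and unused, so the @KafkaListener in Consumer still relies on Spring Boot's autoconfiguration. If this class is meant to take over, the usual wiring is a kafkaListenerContainerFactory bean; a sketch to place inside KafkaConsumerConfig (the @Bean, ConcurrentKafkaListenerContainerFactory, and DefaultKafkaConsumerFactory imports are assumed additions):

// hypothetical wiring so @KafkaListener actually picks up consumerConfig()
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
    return factory;
}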
package com.ascendfinalproject.warehouse.services;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
// placeholder config for the KafkaTemplate path; not yet exposed as a bean (see the sketch after this class)
private Map<String, Object> producerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return config;
}
}
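Likewise, producerConfig() is never read, so Producer's KafkaTemplate still comes from autoconfiguration. A sketch of the bean that would make this class effective, placed inside KafkaProducerConfig; it uses the Bean and KafkaTemplate imports already declared above, while DefaultKafkaProducerFactory is an assumed extra import:

// hypothetical wiring so Producer's KafkaTemplate is built from producerConfig()
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
}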
package com.ascendfinalproject.warehouse.services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String FULFILLED = "fulfilled";
private static final String CANCELLED = "cancelled";
// KafkaTemplate publishes messages to the target topic
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void orderFulfilled(String message) {
logger.info(String.format("#### -> this order is fulfilled -> %s", message));
this.kafkaTemplate.send(FULFILLED, message);
}
public void orderCancelled(String message) {
logger.info(String.format("#### -> this order is cancelled -> %s", message));
this.kafkaTemplate.send(CANCELLED, message);
}
}
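kafkaTemplate.send() is fire-and-forget here, so the caller never observes a failed delivery. In the spring-kafka 2.x API, send() returns a ListenableFuture, so a callback can report the broker ack or the failure. A hypothetical variant of orderFulfilled, inside Producer:

// hypothetical variant of orderFulfilled that logs the broker ack or the failure
public void orderFulfilledWithCallback(String message) {
    this.kafkaTemplate.send(FULFILLED, message).addCallback(
            result -> logger.info(String.format("#### -> acked at offset %d", result.getRecordMetadata().offset())),
            ex -> logger.error("#### -> send failed", ex));
}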
spring.data.mongodb.uri=mongodb+srv://warehouse1:ascendWarehouseProject@warehouse-cluster.xopll.mongodb.net/myFirstDatabase?retryWrites=true&w=majority
spring.data.mongodb.database=test
#server:
# port: 9000
#spring:
# kafka:
# consumer:
# bootstrap-servers: localhost:9092
# group-id: WAREHOUSE_MANAGEMENT
# auto-offset-reset: earliest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# bootstrap-servers: localhost:9092
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
kafka.producer.bootstrap-servers: localhost:9092
kafka.producer.acks: all
kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
kafka.topic.input: test_topic
\ No newline at end of file