Commit 36c04e79 authored by Philippe Fonzin's avatar Philippe Fonzin

update order and send kafka message

parent 8233462e
...@@ -63,7 +63,7 @@ public class KafkaConfig { ...@@ -63,7 +63,7 @@ public class KafkaConfig {
@Bean @Bean
public KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer() { public KafkaSender<String, WarehouseOrderRequest> kafkaOMSProducer() {
SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory()); SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions); return KafkaSender.create(senderOptions);
} }
......
...@@ -17,17 +17,19 @@ import reactor.kafka.sender.SenderResult; ...@@ -17,17 +17,19 @@ import reactor.kafka.sender.SenderResult;
public class Sender { public class Sender {
@Autowired @Autowired
private KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer; private KafkaSender<String, WarehouseOrderRequest> kafkaOMSProducer;
@Autowired @Autowired
private KafkaSender<String, WarehouseOrderResponse> kafkaUpdateEventProducer; private KafkaSender<String, WarehouseOrderResponse> kafkaUpdateEventProducer;
private static final String TOPIC = "test_topic"; private static final String TOPIC = "warehouse_management";
private static final String OMS = "order_management";
public void sendOrder(WarehouseOrderRequest currentOrder) { public void sendOrder(WarehouseOrderRequest currentOrder) {
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder); ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(OMS, currentOrder);
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder))) Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaOMSProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable)) .doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> { .doOnNext(t -> {
if (null != t.exception()) { if (null != t.exception()) {
......
...@@ -5,4 +5,4 @@ spring.data.mongodb.database=test ...@@ -5,4 +5,4 @@ spring.data.mongodb.database=test
kafka.producer.bootstrap-servers: localhost:9092 kafka.producer.bootstrap-servers: localhost:9092
kafka.producer.acks: all kafka.producer.acks: all
kafka.consumer.group-id: WAREHOUSE_MANAGEMENT kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
kafka.topic.input: test_topic kafka.topic.input: order_management
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment