package com.example.controller; import avro.Product; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderRecord; import java.text.SimpleDateFormat; @RestController public class SampleController { private static final Logger log = LoggerFactory.getLogger(SampleController.class.getName()); private static final String TOPIC = "ReactiveProduct"; KafkaSender kafkaSender; @Autowired public SampleController(KafkaSender kafkaSender) { this.kafkaSender = kafkaSender; } @PostMapping("/produce") public String sendMessages(@RequestBody Product product) { SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy"); SenderRecord msg = SenderRecord.create(new ProducerRecord<>(TOPIC, 100L, product), 1); Disposable value = kafkaSender.send(Mono.just(msg)).doOnError(e -> log.error("Send failed", e)) .subscribe(objectSenderResult -> { RecordMetadata recordMetadata = objectSenderResult.recordMetadata(); System.out.printf("Message sent successfully, topic-partition=%s-%d offset=%d \n", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); }); /* use createOutbound without record metadata*/ /* kafkaSender.createOutbound() .send(Flux.range(1, 10) .map(i -> new ProducerRecord("demo", "key", "Message_" + i))) .then() .doOnError(e -> e.printStackTrace()) .doOnSuccess(s -> System.out.println("Sends succeeded")) .subscribe(); */ return value.toString(); } }