Commit 00ce5455 authored by Potharaju Peddi

for WebFlux reactive Kafka with MongoDB

parent cd6e981e
...@@ -2,11 +2,22 @@ package com.WebFluxReactiveMongo.Reactor.Controller;
import com.WebFluxReactiveMongo.Reactor.Dto.StudentDto;
import com.WebFluxReactiveMongo.Reactor.Service.StudentService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;
@RestController
...@@ -16,6 +27,22 @@ public class StudentController {
@Autowired
private StudentService studentService;
private static final String TOPIC = "TestTopic1";
// Reactive Kafka sender that publishes StudentDto payloads as JSON via spring-kafka's JsonSerializer.
public static KafkaSender<String, StudentDto> sampleProducer() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
SenderOptions<String, StudentDto> senderOptions = SenderOptions.create(props);
return KafkaSender.create(senderOptions);
}
@GetMapping
public Flux<StudentDto> getStudents(){
return studentService.getStudents();
...@@ -52,4 +79,23 @@ public class StudentController {
return studentService.deleteStudent(id);
}
@GetMapping("/publishReactorJson")
public Mono<StudentDto> sendMessages(@RequestBody StudentDto studentDto) throws InterruptedException, JsonProcessingException {
Mono<StudentDto> stockDTO =studentService.saveStudent(Mono.just(studentDto));
return stockDTO.flatMap(data -> sendToKafka(data, "create"));
}
// Publishes the saved student to TOPIC; the second argument (event type) is not used yet.
// NOTE: the record key is the literal string "dto.getSalt()", which looks like a leftover placeholder.
private Mono<StudentDto> sendToKafka(StudentDto studentDto, String eventType) {
return StudentController.sampleProducer().createOutbound()
.send(Mono.just(new ProducerRecord<>(TOPIC, "dto.getSalt()", studentDto)))
.then().log()
.doOnError(e -> log.error(
String.format("Failed to send topic: %s value: %s", TOPIC, studentDto), e))
.thenReturn(studentDto);
}
}
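Note that sendToKafka calls sampleProducer() on every request, so each publish opens a fresh underlying Kafka producer. Below is a minimal sketch of the more usual wiring, with a single sender registered as a Spring bean; the configuration class and bean names are illustrative and not part of this commit:

import com.WebFluxReactiveMongo.Reactor.Dto.StudentDto;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerializer;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;

// Illustrative configuration: one shared KafkaSender for the whole application.
@Configuration
public class ReactiveKafkaProducerConfig {

    @Bean
    public KafkaSender<String, StudentDto> studentKafkaSender() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Creating the sender once avoids opening a new producer connection per request.
        return KafkaSender.create(SenderOptions.create(props));
    }
}

The controller could then autowire this sender instead of calling the static factory method.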
package com.WebFluxReactiveMongo.Reactor.kafka;
import com.WebFluxReactiveMongo.Reactor.Dto.StudentDto;
import com.WebFluxReactiveMongo.Reactor.Entity.Student;
import com.WebFluxReactiveMongo.Reactor.Repository.StudentRepository;
import com.WebFluxReactiveMongo.Reactor.Service.StudentService;
import com.fasterxml.jackson.core.JsonProcessingException;
...@@ -55,15 +56,11 @@ public class SampleProducer {
public void sendMessages(String topic, int count, CountDownLatch latch) throws InterruptedException {
ObjectMapper objectMapper = new ObjectMapper();
//Flux<StudentDto> studentDto = Flux.just(new StudentDto("106","AADHI","DIYA",50000),
//new StudentDto("107","VASU","KIYA",45000));
sender.<Integer>send(Flux.range(1, 1)
//.map(i -> SenderRecord.create(new ProducerRecord<>(topic, i, "Message_" + i), i)))
.map(i -> {
JsonNode jsonValue = null;
try {
// was: objectMapper.writeValueAsString(Mono.just(new StudentDto("5855","Kafka Producer message","Nisum School",60000))),
// which serializes the Mono wrapper itself rather than the DTO it carries
String value = objectMapper.writeValueAsString(new Student("5855","Kafka Producer message","Nisum School",60000));
jsonValue = objectMapper.readTree(value);
} catch (JsonProcessingException e) {
...
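For reference, a sketch of how the new publish endpoint could be exercised end to end against a running instance. The /students base path and local port are assumptions, since the controller's @RequestMapping is not visible in this diff:

import com.WebFluxReactiveMongo.Reactor.Dto.StudentDto;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;

class PublishEndpointSketchTest {

    @Test
    void publishesSavedStudentToKafka() {
        // Illustrative client bound to a locally running application (assumed port).
        WebTestClient client = WebTestClient.bindToServer()
                .baseUrl("http://localhost:8080")
                .build();

        client.post()
                .uri("/students/publishReactorJson")   // assumed base path
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(new StudentDto("5855", "Kafka Producer message", "Nisum School", 60000))
                .exchange()
                // The handler saves the student, publishes it to Kafka, and echoes the saved DTO back.
                .expectStatus().isOk()
                .expectBody(StudentDto.class);
    }
}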