Commit 4c7a2e14 authored by Ramakrushna Sahoo's avatar Ramakrushna Sahoo

Initial commit

parent 6c0abe6d
commit 6c0abe6d77f394ecf996d89673fb5052675a0463
Author: Ramakrushna Sahoo <rasahoo@nisum.com>
Date: Wed Oct 9 16:26:34 2024 +0530
initial commit
......@@ -28,7 +28,20 @@ public class EmployeeController {
@GetMapping("/getEmpById/{id}")
public Mono<Employee> getEmployeeById(@PathVariable Integer id){
return service.getById(id);
Mono<Employee> mono= service.getById(id)
.doOnNext(emp-> {
try {
producer.sendEmployee(emp);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.doOnNext(value -> log.info("Employee data get by id successfully: {}", value))
.doOnError(error -> log.error("Data not found", error));
return mono;
}
@GetMapping("/getAll")
......@@ -52,19 +65,22 @@ public class EmployeeController {
.doOnError(error -> log.error("Data not found", error));
// Collect employees into a list and send to producer
flux.collectList()
.doOnNext(list -> {
try {
producer.sendEmployee(list);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
log.info("Employee list sent to producer: {}", list);
})
.doOnError(error -> log.error("Error while sending employee list: {}", error))
.subscribe();
// flux.collectList()
// .doOnNext(list -> {
// try {
// producer.sendEmployee(list);
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
// log.info("Employee list sent to producer: {}", list);
// })
// .doOnError(error -> log.error("Error while sending employee list: {}", error))
// .subscribe();
return flux; // Return the Flux for further processing
}
}
......@@ -28,6 +28,9 @@ public class EmployeeProducer {
this.objectMapper = objectMapper;
}
public CompletableFuture<SendResult<Integer, String>> sendEmployee(List<Employee> emp) throws JsonProcessingException {
if (emp.isEmpty()) {
throw new IllegalArgumentException("Employee list cannot be empty");
......@@ -39,6 +42,9 @@ public class EmployeeProducer {
return sendWithRetry(key, value, 3, 1000); // Retry up to 3 times with a backoff of 1000 ms
}
private CompletableFuture<SendResult<Integer, String>> sendWithRetry(Integer key, String value, int retries, long backoff) {
return kafkaTemplate.send(topic, key, value).handle((sendResult, throwable) -> {
if (throwable != null) {
......@@ -61,6 +67,23 @@ public class EmployeeProducer {
}
});
}
public CompletableFuture<SendResult<Integer, String>> sendEmployee(Employee emp) throws JsonProcessingException {
var key = emp.getId();
var value = objectMapper.writeValueAsString(emp);
return kafkaTemplate.send(topic, key , value);// Retry up to 3 times with a backoff of 1000 ms
}
private void handleSuccess(Integer key, String value ) {
log.info("Message sent successfully for the key: {}, value: {} ", key, value);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment