Commit 7b47da44 authored by Ramakrushna Sahoo's avatar Ramakrushna Sahoo

update file with some changes

parent 4c7a2e14
...@@ -7,6 +7,7 @@ import com.main.producer.EmployeeProducer; ...@@ -7,6 +7,7 @@ import com.main.producer.EmployeeProducer;
import com.main.service.EmployeeService; import com.main.service.EmployeeService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
...@@ -27,7 +28,7 @@ public class EmployeeController { ...@@ -27,7 +28,7 @@ public class EmployeeController {
@GetMapping("/getEmpById/{id}") @GetMapping("/getEmpById/{id}")
public Mono<Employee> getEmployeeById(@PathVariable Integer id){ public Mono<ResponseEntity<Employee>> getEmployeeById(@PathVariable Integer id){
Mono<Employee> mono= service.getById(id) Mono<Employee> mono= service.getById(id)
.doOnNext(emp-> { .doOnNext(emp-> {
try { try {
...@@ -41,7 +42,8 @@ public class EmployeeController { ...@@ -41,7 +42,8 @@ public class EmployeeController {
return mono; return mono.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
} }
@GetMapping("/getAll") @GetMapping("/getAll")
...@@ -64,18 +66,18 @@ public class EmployeeController { ...@@ -64,18 +66,18 @@ public class EmployeeController {
.doOnNext(value -> log.info("Employee data fetched successfully: {}", value)) .doOnNext(value -> log.info("Employee data fetched successfully: {}", value))
.doOnError(error -> log.error("Data not found", error)); .doOnError(error -> log.error("Data not found", error));
// Collect employees into a list and send to producer // Collect employees into a list and send to producer
// flux.collectList() flux.collectList()
// .doOnNext(list -> { .doOnNext(list -> {
// try { try {
// producer.sendEmployee(list); producer.sendEmployee(list);
// } catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
// throw new RuntimeException(e); throw new RuntimeException(e);
// } }
// log.info("Employee list sent to producer: {}", list); log.info("Employee list sent to producer: {}", list);
// }) })
// .doOnError(error -> log.error("Error while sending employee list: {}", error)) .doOnError(error -> log.error("Error while sending employee list: {0}", error))
// .subscribe(); .subscribe();
return flux; // Return the Flux for further processing return flux; // Return the Flux for further processing
} }
......
...@@ -67,6 +67,8 @@ public class EmployeeProducer { ...@@ -67,6 +67,8 @@ public class EmployeeProducer {
} }
}); });
} }
public CompletableFuture<SendResult<Integer, String>> sendEmployee(Employee emp) throws JsonProcessingException { public CompletableFuture<SendResult<Integer, String>> sendEmployee(Employee emp) throws JsonProcessingException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment