Commit e70dd459 authored by Sarika Sama's avatar Sarika Sama

integrating kafka

parent 940b786c
......@@ -22,6 +22,7 @@ public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFatory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
......
......@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaListeners {
@Value("${order_topic}")
private String topicName;
......
......@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
......
......@@ -18,6 +18,7 @@ public class KafkaProducerConfig {
@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
......
......@@ -17,9 +17,10 @@ public class UserController {
@Autowired
UserService userService;
@GetMapping("/users")
@GetMapping("/publish-users")
public Flux<User> getUsers() {
return userService.getUsers();
Flux<User> user= userService.fetchUsersAndPublish();
return user;
}
}
......
package com.nisum.webfluxmongodbordermanagement.controller;
import com.nisum.webfluxmongodbordermanagement.config.KafkaProducer;
import com.nisum.webfluxmongodbordermanagement.entity.User;
import com.nisum.webfluxmongodbordermanagement.entity.UserOrders;
import com.nisum.webfluxmongodbordermanagement.service.UserOrdersService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
@RestController
@Slf4j
public class UserOrdersController {
@Autowired
private UserOrdersService userOrdersService;
@Autowired
private KafkaProducer kafkaProducer;
@Value("${order_topic}")
private String topicName;
@GetMapping("/user-orders")
public Flux<UserOrders> getAllUsersWithOrders(){
public Flux<UserOrders> fetchUserOrdersAndPublish(){
Flux<UserOrders> userOrdersFlux = userOrdersService.getAllUsersWithOrders();
Mono<List<UserOrders>> usersMono = userOrdersFlux.collectList()
.subscribeOn(Schedulers.boundedElastic());
usersMono
.subscribe(
resultList -> {
log.info("sending userOrders to kafka");
for (UserOrders user : resultList) {
kafkaProducer.sendMessage( topicName, "userOrders-"+user.getUserId(), user.toString());
}
},
throwable -> log.error("Error occurred: {}", throwable.getMessage())
);
return userOrdersFlux;
}
......
......@@ -28,6 +28,7 @@ public class OrdersService {
private String topicName;
public Flux<Order> fetchOrdersAndPublish(){
Flux<Order> orders = orderRepository.findAll();
Mono<List<Order>> ordersMono = orders.collectList()
......
......@@ -18,12 +18,12 @@ public class UserOrdersService {
ReactiveMongoTemplate reactiveMongoTemplate;
public Flux<UserOrders> getAllUsersWithOrders() {
LookupOperation lookupOperation = LookupOperation.newLookup().
from("orders").
localField("userId").
foreignField("userId").
as("order");
Aggregation aggregation = Aggregation.newAggregation(lookupOperation);
return reactiveMongoTemplate.aggregate(aggregation, "user", UserOrders.class);
}
......
package com.nisum.webfluxmongodbordermanagement.service;
import com.nisum.webfluxmongodbordermanagement.config.KafkaProducer;
import com.nisum.webfluxmongodbordermanagement.entity.User;
import com.nisum.webfluxmongodbordermanagement.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
@Service
@Slf4j
public class UserService {
@Autowired
private UserRepository userRepository;
public Flux<User> getUsers(){
return userRepository.findAll();
}
// public Flux<User> getUsers(){
//
// return userRepository.findAll();
// }
@Autowired
private KafkaProducer kafkaProducer;
@Value("${order_topic}")
private String topicName;
public Flux<User> fetchUsersAndPublish(){
Flux<User> users = userRepository.findAll();
Mono<List<User>> usersMono = users.collectList()
.subscribeOn(Schedulers.boundedElastic());
usersMono
.subscribe(
resultList -> {
log.info("sending users to kafka");
for (User user : resultList) {
kafkaProducer.sendMessage( topicName, "user-"+user.getUserId(), user.toString());
}
},
throwable -> log.error("Error occurred: {}", throwable.getMessage())
);
return users;
}
}
\ No newline at end of file
server.port=9090
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=user-orders-db
order_topic=order_topic
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment