Commit 394fa93b authored by Sarika Sama's avatar Sarika Sama

integrating kafka

parent 940b786c
...@@ -22,6 +22,7 @@ public class KafkaConsumerConfig { ...@@ -22,6 +22,7 @@ public class KafkaConsumerConfig {
@Bean @Bean
public ConsumerFactory<String, String> consumerFatory() { public ConsumerFactory<String, String> consumerFatory() {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
......
...@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component; ...@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
@Component @Component
@Slf4j @Slf4j
public class KafkaListeners { public class KafkaListeners {
@Value("${order_topic}") @Value("${order_topic}")
private String topicName; private String topicName;
......
...@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component; ...@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
@Component @Component
@Slf4j @Slf4j
public class KafkaProducer { public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate; private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired @Autowired
......
...@@ -18,6 +18,7 @@ public class KafkaProducerConfig { ...@@ -18,6 +18,7 @@ public class KafkaProducerConfig {
@Bean @Bean
ProducerFactory<String, String> producerFactory() { ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
......
...@@ -28,6 +28,7 @@ public class OrdersService { ...@@ -28,6 +28,7 @@ public class OrdersService {
private String topicName; private String topicName;
public Flux<Order> fetchOrdersAndPublish(){ public Flux<Order> fetchOrdersAndPublish(){
Flux<Order> orders = orderRepository.findAll(); Flux<Order> orders = orderRepository.findAll();
Mono<List<Order>> ordersMono = orders.collectList() Mono<List<Order>> ordersMono = orders.collectList()
......
...@@ -18,6 +18,7 @@ public class UserOrdersService { ...@@ -18,6 +18,7 @@ public class UserOrdersService {
ReactiveMongoTemplate reactiveMongoTemplate; ReactiveMongoTemplate reactiveMongoTemplate;
public Flux<UserOrders> getAllUsersWithOrders() { public Flux<UserOrders> getAllUsersWithOrders() {
LookupOperation lookupOperation = LookupOperation.newLookup(). LookupOperation lookupOperation = LookupOperation.newLookup().
from("orders"). from("orders").
localField("userId"). localField("userId").
......
...@@ -14,6 +14,7 @@ public class UserService { ...@@ -14,6 +14,7 @@ public class UserService {
private UserRepository userRepository; private UserRepository userRepository;
public Flux<User> getUsers(){ public Flux<User> getUsers(){
return userRepository.findAll(); return userRepository.findAll();
} }
......
server.port=9090 server.port=9090
spring.data.mongodb.host=localhost spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017 spring.data.mongodb.port=27017
spring.data.mongodb.database=user-orders-db spring.data.mongodb.database=user-orders-db
order_topic=order_topic order_topic=order_topic
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment