Commit af6377ce authored by John Lam's avatar John Lam

add kafka recieverconfig and kafkalistner to listen to topic

parent 8b421221
......@@ -25,11 +25,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.kafka</groupId>-->
<!-- <artifactId>spring-kafka</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......
package com.nisum.ascend.inventory.configuration;
import lombok.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConfig {
}
package com.nisum.ascend.inventory.configuration;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaReceiverConfig {
@Value("${kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean
public KafkaReceiver<String, String> kafkaOrderEventReceiver(
@Value("${kafka.topic.input}") String posLogTopic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(receiverConfig());
receiverOptions.maxCommitAttempts(3);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(posLogTopic)));
}
private Map<String, Object> receiverConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
}
\ No newline at end of file
package com.nisum.ascend.inventory.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import reactor.kafka.receiver.KafkaReceiver;
@Component
@Slf4j
public class KafkaListenerService {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderMessages() {
kafkaReceiver.receive()
.doOnNext(record -> System.out.println(record))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
}
server.port=8080
spring.data.mongodb.uri=mongodb+srv://admin:${db.password}@inventory-promotions.d4nfz.mongodb\
.net/${spring.data.mongodb.database}?retryWrites=true&w=majority
spring.data.mongodb.database=products-promotions-DB
kafka.consumer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=PRODUCT
kafka.topic.input=my-messages
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment