Commit 0ec76acb authored by NareshKodumoori's avatar NareshKodumoori

Adding two consumers

parent bda2b214
...@@ -18,9 +18,11 @@ import java.util.Map; ...@@ -18,9 +18,11 @@ import java.util.Map;
@Configuration @Configuration
public class KafkaConfiguration { public class KafkaConfiguration {
private Map<String, Object> config;
@Bean @Bean
public ConsumerFactory<String, String> consumerFactory(){ public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> config = new HashMap<>(); config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...@@ -37,12 +39,14 @@ public class KafkaConfiguration { ...@@ -37,12 +39,14 @@ public class KafkaConfiguration {
@Bean @Bean
public ConsumerFactory<String, User> userConsumerFactory() { public ConsumerFactory<String, User> userConsumerFactory() {
Map<String, Object> config = new HashMap<>(); config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(User.class)); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(User.class, false));
} }
@Bean @Bean
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment