package SpringMongoDBKafka.kafka.producer; import SpringMongoDBKafka.entity.Book; import SpringMongoDBKafka.entity.Post; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; @Configuration public class ProducerKafkaConfig { @Value("${kafka.bootstrapAddress}") private String bootstrapAddress; @Value("${kafka.groupId}") private String groupId; @Bean public ProducerFactory<String, Book> bookProducerFactory() { Map<String, Object> map = new HashMap<>(); map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(map); } @Bean public KafkaTemplate<String,Book> kafkaTemplate() { return new KafkaTemplate<>(bookProducerFactory()); } @Bean public ProducerFactory<String, Post> postProducerFactory() { Map<String, Object> map = new HashMap<>(); map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(map); } @Bean public KafkaTemplate<String,Post> postKafkaTemplate() { return new KafkaTemplate<>(postProducerFactory()); } }