Commit 178f5e5b authored by Lokesh Singh's avatar Lokesh Singh

spring boot and kafka poc

parents
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
/mvnw.bat
/mvnw.cmd
/mvnw
/.mvn/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
This diff is collapsed.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lokesh.spring.kafka</groupId>
<artifactId>spring-boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-kafka</name>
<description>POC project for Spring Boot Kafka</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.lokesh.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApplication.class, args);
}
}
package com.lokesh.spring.kafka.config;
import com.lokesh.spring.kafka.model.SuperHero;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.kafka.consumer.bootstrap-servers: localhost:9092}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id: group_id}")
private String groupId;
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// Json Consumer
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
@Bean
public ConsumerFactory<String, SuperHero> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(SuperHero.class));
}
@Bean
public <T> ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerJsonFactory() {
ConcurrentKafkaListenerContainerFactory<String, SuperHero> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
factory.setBatchListener(true);
return factory;
}
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// String Consumer
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
@Bean
public ConsumerFactory<String, String> stringConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(stringConsumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean
public KafkaListenerErrorHandler myTopicErrorHandler() {
return (m, e) -> {
logger.error("❌Got an error {}", e.getMessage());
return "some info about the failure";
};
}
}
package com.lokesh.spring.kafka.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers: localhost:9092}")
private String bootstrapServers;
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// Json Producer
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
@Bean
public <T> ProducerFactory<String, T> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// **___ configure the following three settings for SSL Encryption ___**
//configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
//configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.truststore.jks");
//configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
// **___configure the following three settings for SSL Authentication ___**
//configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.keystore.jks");
//configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
//configProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Primary
@Bean
public <T> KafkaTemplate<String, T> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// String Producer
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
@Bean
public ProducerFactory<String, String> producerStringFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// **___ configure the following three settings for SSL Encryption ___**
//configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
//configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.truststore.jks");
//configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
// **___configure the following three settings for SSL Authentication ___**
//configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.keystore.jks");
//configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
//configProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaStringTemplate() {
return new KafkaTemplate<>(producerStringFactory());
}
}
package com.lokesh.spring.kafka.controllers;
import com.lokesh.spring.kafka.model.SuperHero;
import com.lokesh.spring.kafka.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
@Autowired
private ProducerService<SuperHero> producerService;
@PostMapping(value = "/publish/message")
public String sendMessageToKafkaTopic(@RequestParam("message") String message) {
producerService.sendMessage(message);
return "🍻🍻Successfully publisher message..!";
}
@PostMapping(value = "/publish")
public Map<String, Object> sendObjectToKafkaTopic(@RequestBody SuperHero superHero) {
producerService.sendSuperHeroMessage(superHero);
Map<String, Object> map = new HashMap<>();
map.put("message", "🍻🍻🍻Successfully publisher Super Hero..!");
map.put("payload", superHero);
return map;
}
}
package com.lokesh.spring.kafka.model;
public class Message {
private static final long serialVersionUID = 1L;
private String message;
public Message() {}
public Message(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
package com.lokesh.spring.kafka.model;
import java.io.Serializable;
public class SuperHero implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private String superName;
private String profession;
private int age;
private boolean canFly;
public SuperHero() { }
public SuperHero(String name, String superName, String profession, int age, boolean canFly) {
super();
this.name = name;
this.superName = superName;
this.profession = profession;
this.age = age;
this.canFly = canFly;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSuperName() {
return superName;
}
public void setSuperName(String superName) {
this.superName = superName;
}
public String getProfession() {
return profession;
}
public void setProfession(String profession) {
this.profession = profession;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public boolean isCanFly() {
return canFly;
}
public void setCanFly(boolean canFly) {
this.canFly = canFly;
}
@Override
public String toString() {
return "SuperHero [name=" + name + ", superName=" + superName
+ ", profession=" + profession + ", age=" + age + ", canFly="
+ canFly + "]";
}
}
\ No newline at end of file
package com.lokesh.spring.kafka.service;
import com.lokesh.spring.kafka.model.SuperHero;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class ConsumerService {
private final Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = {"${spring.kafka.topic}"}, containerFactory = "kafkaListenerStringFactory", groupId = "group_id")
public void consumeMessage(String message) {
logger.info("**** -> Consumed message -> {}", message);
}
@KafkaListener(topics = {"${spring.kafka.superhero-topic}"}, containerFactory = "kafkaListenerJsonFactory", groupId = "group_id")
public void consumeSuperHero(SuperHero superHero) {
logger.info("**** -> 😋😋 Consumed Super Hero :: {}", superHero);
}
}
package com.lokesh.spring.kafka.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class ProducerService<T> {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.kafka.topic}")
private String topic;
@Value("${spring.kafka.superhero-topic}")
private String superHeroTopic;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaTemplate<String, T> kafkaTemplateSuperHero;
public void sendMessage(String message) {
logger.info("#### -> Publishing message -> {}", message);
kafkaTemplate.send(topic, message);
}
public void sendSuperHeroMessage(T superHero) {
logger.info("#### -> Publishing SuperHero :: {}", superHero);
kafkaTemplateSuperHero.send(superHeroTopic, superHero);
}
}
server:
port: 8095
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
# auto-offset-reset: earliest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic: message-topic
superhero-topic: superhero-topic
\ No newline at end of file
package com.lokesh.spring.kafka;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {
@Test
public void contextLoads() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment