Commit 952a69e5 authored by ppapili's avatar ppapili

Adding changes related to Kafka-mongo poc

parent 0ab45416
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent BOM: manages the versions of all starters and of
     kafka-streams / spring-kafka / gson declared without <version> below. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.poc.kafka.mongo</groupId>
<artifactId>kafka-mongo-poc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-mongo-poc</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- REST endpoints (EmployeeController) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<!-- KafkaTemplate / @KafkaListener support -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- JSON (de)serialization of Emp payloads on the Kafka topic -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<scope>compile</scope>
<!-- NOTE(review): tomcat-embed-el is the EL implementation used for
     bean-validation message interpolation; confirm constraint messages
     still render with it excluded (the web starter may provide EL). -->
<exclusions>
<exclusion>
<artifactId>tomcat-embed-el</artifactId>
<groupId>org.apache.tomcat.embed</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- MongoTemplate + MongoRepository support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.poc.kafka.mongo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaPocApplication {
/**
 * Boots the Spring application context; component scanning starts from this
 * package, picking up the Kafka configuration, controller, and services.
 *
 * @param args command-line arguments forwarded to Spring Boot
 */
public static void main(String[] args) {
SpringApplication.run(KafkaPocApplication.class, args);
}
}
package com.poc.kafka.mongo.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaConfiguration {

    /** Kafka broker list, e.g. "127.0.0.1:9092" (from application.properties). */
    @Value("${spring.kafka.bootstrap-servers}")
    private String server;

    /** Consumer group id shared by the @KafkaListener containers. */
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    /** Producer factory using plain String serialization for keys and values. */
    @Bean
    ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    /** Template used by EmployeeServiceImpl to publish JSON payloads. */
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // NOTE: the method keeps the original misspelled name "consumerFatory"
    // because the bean name is derived from it; renaming could break
    // by-name injection elsewhere in the project.
    @Bean
    public ConsumerFactory<String, String> consumerFatory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        // Offsets are committed by the listener container, not by the client.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Fix: the previous code also passed StringDeserializer instances to the
        // constructor, which silently overrode the (identical) config entries
        // above; a single source of truth avoids the redundancy.
        return new DefaultKafkaConsumerFactory<>(config);
    }

    /** Listener container factory for @KafkaListener; tolerates missing topics at startup. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFatory());
        factory.setMissingTopicsFatal(false);
        return factory;
    }
}
package com.poc.kafka.mongo.controller;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.validation.Valid;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.poc.kafka.mongo.model.Emp;
import com.poc.kafka.mongo.service.EmployeeService;
@RestController
@Validated
@RequestMapping(path = "/employees")
public class EmployeeController {

    @Autowired
    private EmployeeService employeeService;

    /**
     * Publishes the employee to the Kafka topic; persistence to Mongo happens
     * asynchronously when the listener consumes the record.
     */
    @PostMapping(produces = "application/json", consumes = "application/json", path = "/produce")
    public void publishEmployee(@Valid @RequestBody Emp emp) throws InterruptedException, ExecutionException {
        employeeService.send(emp);
    }

    // Fix: GET and DELETE requests carry no body, so declaring
    // `consumes = "application/json"` forced clients to send a Content-Type
    // header or receive 415 Unsupported Media Type; it is removed from all
    // GET/DELETE mappings below.

    /** Looks up one employee by business id (emp_id). */
    @GetMapping(produces = "application/json", path = "/empId/{empId}")
    public ResponseEntity<Emp> findByEmpId(@Valid @PathVariable Long empId) {
        return new ResponseEntity<>(employeeService.findByEmpId(empId), HttpStatus.OK);
    }

    /** Returns every employee. */
    @GetMapping(produces = "application/json", path = "/")
    public ResponseEntity<List<Emp>> findAll() {
        return new ResponseEntity<>(employeeService.findAll(), HttpStatus.OK);
    }

    /** Looks up one employee by Mongo document id. */
    @GetMapping(produces = "application/json", path = "/id/{id}")
    public ResponseEntity<Emp> findById(@Valid @PathVariable String id) {
        return new ResponseEntity<>(employeeService.findById(id), HttpStatus.OK);
    }

    /** Returns employees with salary greater than or equal to {@code sal}. */
    @GetMapping(produces = "application/json", path = "/sal/{sal}")
    public ResponseEntity<List<Emp>> findBySal(@Valid @PathVariable Double sal) {
        return new ResponseEntity<>(employeeService.findBySalary(sal), HttpStatus.OK);
    }

    /** Inserts or updates the employee keyed by emp_id (response has no body). */
    @PostMapping(produces = "application/json", consumes = "application/json", path = "/upsert")
    public ResponseEntity<Emp> saveOrUpdateEmployee(@Valid @RequestBody Emp emp) {
        employeeService.saveOrUpdate(emp);
        return new ResponseEntity<>(HttpStatus.OK);
    }

    /** Saves the employee and returns the persisted representation. */
    @PostMapping(produces = "application/json", consumes = "application/json", path = "/save")
    public ResponseEntity<Emp> saveEmployee(@Valid @RequestBody Emp emp) {
        return new ResponseEntity<>(employeeService.save(emp), HttpStatus.OK);
    }

    /** Deletes by business id and returns the deleted-document count. */
    @DeleteMapping(produces = "application/json", path = "/delete/{empId}")
    public ResponseEntity<Long> deleteByEmpId(@PathVariable Long empId) {
        return new ResponseEntity<>(employeeService.deleteByEmpId(empId), HttpStatus.OK);
    }
}
package com.poc.kafka.mongo.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
@Document(collection = "employee")
public class Employee {

    // Mongo document id; generated by the database when absent on insert.
    @Id
    private String _id;

    // Business employee identifier, stored under "emp_id".
    @Field(name = "emp_id")
    private Long empId;

    // Employee display name, stored under "emp_name".
    @Field(name = "emp_name")
    private String empName;

    // Salary, stored under "emp_sal".
    @Field(name = "emp_sal")
    private Double empSal;

    // Department identifier, stored under "dept_id".
    @Field(name = "dept_id")
    private String deptId;

    // Department display name, stored under "dept_name".
    @Field(name = "dept_name")
    private String deptName;

    /** No-arg constructor required by the Spring Data mapping layer. */
    public Employee() {
        super();
    }

    public String get_id() { return this._id; }

    public void set_id(String _id) { this._id = _id; }

    public Long getEmpId() { return this.empId; }

    public void setEmpId(Long empId) { this.empId = empId; }

    public String getEmpName() { return this.empName; }

    public void setEmpName(String empName) { this.empName = empName; }

    public Double getEmpSal() { return this.empSal; }

    public void setEmpSal(Double empSal) { this.empSal = empSal; }

    public String getDeptId() { return this.deptId; }

    public void setDeptId(String deptId) { this.deptId = deptId; }

    public String getDeptName() { return this.deptName; }

    public void setDeptName(String deptName) { this.deptName = deptName; }
}
package com.poc.kafka.mongo.mapper;
import java.util.Optional;
import org.springframework.stereotype.Component;
import com.poc.kafka.mongo.entity.Employee;
import com.poc.kafka.mongo.model.Emp;
@Component
public class EmployeeMapper implements EntityMapper {

    /**
     * Copies an {@link Emp} API model into a new {@link Employee} entity.
     * Returns {@code null} when the input is {@code null}.
     */
    @Override
    public Object mapPojoToEntity(Object pojo) {
        if (pojo == null) {
            return null;
        }
        Emp source = (Emp) pojo;
        Employee target = new Employee();
        target.setDeptId(source.getDeptId());
        target.setDeptName(source.getDeptName());
        target.setEmpId(source.getEmpId());
        target.setEmpName(source.getEmpName());
        target.setEmpSal(source.getEmpSal());
        return target;
    }

    /**
     * Copies an {@link Employee} entity into a new {@link Emp} API model.
     * Returns {@code null} when the input is {@code null}.
     */
    @Override
    public Object mapEntityToPojo(Object entity) {
        if (entity == null) {
            return null;
        }
        Employee source = (Employee) entity;
        Emp target = new Emp();
        target.setDeptId(source.getDeptId());
        target.setDeptName(source.getDeptName());
        target.setEmpId(source.getEmpId());
        target.setEmpName(source.getEmpName());
        target.setEmpSal(source.getEmpSal());
        return target;
    }
}
package com.poc.kafka.mongo.mapper;
/**
 * Two-way mapper between API models (POJOs) and persistence entities.
 * Implementations cast to the concrete pair they handle (e.g. Emp/Employee)
 * and should return null for null input.
 */
public interface EntityMapper {
// Converts an API model into its persistence entity.
public Object mapPojoToEntity(Object pojo);
// Converts a persistence entity into its API model.
public Object mapEntityToPojo(Object entity);
}
package com.poc.kafka.mongo.model;
import javax.validation.constraints.NotNull;
/**
 * API/transport model for an employee; serialized to JSON (Gson) on the Kafka
 * topic and validated on the REST endpoints. All fields are required.
 */
public class Emp {

    @NotNull(message = "empID should not be null")
    private Long empId;

    @NotNull(message = "empName should not be null")
    private String empName;

    @NotNull(message = "empSal should not be null")
    private Double empSal;

    @NotNull(message = "deptId should not be null")
    private String deptId;

    @NotNull(message = "deptName should not be null")
    private String deptName;

    /** No-arg constructor required by Gson and Spring MVC binding. */
    public Emp() {
        super();
    }

    /** @return the empId */
    public Long getEmpId() {
        return empId;
    }

    /** @param empId the empId to set */
    public void setEmpId(Long empId) {
        this.empId = empId;
    }

    /** @return the empName */
    public String getEmpName() {
        return empName;
    }

    /** @param empName the empName to set */
    public void setEmpName(String empName) {
        this.empName = empName;
    }

    /** @return the empSal */
    public Double getEmpSal() {
        return empSal;
    }

    /** @param empSal the empSal to set */
    public void setEmpSal(Double empSal) {
        this.empSal = empSal;
    }

    /** @return the deptId */
    public String getDeptId() {
        return deptId;
    }

    /** @param deptId the deptId to set */
    public void setDeptId(String deptId) {
        this.deptId = deptId;
    }

    /** @return the deptName */
    public String getDeptName() {
        return deptName;
    }

    /** @param deptName the deptName to set */
    public void setDeptName(String deptName) {
        this.deptName = deptName;
    }

    /**
     * Fix: the consumer service logs {@code e.toString()} after deserializing a
     * record; without this override the log only showed {@code Emp@hashcode}.
     */
    @Override
    public String toString() {
        return "Emp [empId=" + empId + ", empName=" + empName + ", empSal=" + empSal
                + ", deptId=" + deptId + ", deptName=" + deptName + "]";
    }
}
package com.poc.kafka.mongo.repository;
import java.util.stream.Stream;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.stereotype.Repository;
import com.poc.kafka.mongo.entity.Employee;
/**
 * Spring Data Mongo repository for the "employee" collection, with JSON
 * queries keyed on the business fields emp_id and emp_sal.
 */
@Repository
public interface EmployeeMongoRepository extends MongoRepository<Employee, String> {
// Finds one employee by business id, ascending emp_id sort.
@Query(value="{emp_id : ?0}",sort = "{emp_id : 1}")
public Employee findByEmpId(Long empId);
/**
 * Streams employees whose salary is greater than or equal to the given
 * value, sorted by salary (sort direction: -1 descending, 1 ascending).
 * Callers must close the stream (e.g. try-with-resources).
 *
 * @param salary inclusive lower bound on emp_sal
 * @return lazily-populated stream of matching employees, highest salary first
 */
@Query(value="{emp_sal : {$gte: ?0}}",sort = "{emp_sal : -1}")
public Stream<Employee> findBySalary(Double salary);
// Deletes by business id and returns the number of documents removed.
@Query(value = "{emp_id : ?0}", delete = true)
public Long deleteByEmployeeId(Long empId);
}
package com.poc.kafka.mongo.repository;
import java.util.List;
import com.poc.kafka.mongo.entity.Employee;
/**
 * Hand-rolled repository abstraction over MongoTemplate, complementing the
 * derived-query EmployeeMongoRepository (notably for upsert support).
 */
public interface EmployeeRepository {
// Returns every employee document.
public List<Employee> findAll();
// Finds one employee by its Mongo document id.
public Employee findById(String id);
// Inserts or replaces the given entity and returns the persisted instance.
public Employee save(Employee e);
// Inserts or updates the entity keyed by its business emp_id.
public void saveOrUpdate(Employee e);
}
package com.poc.kafka.mongo.repository;
import java.util.List;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Repository;
import com.mongodb.client.result.UpdateResult;
import com.poc.kafka.mongo.entity.Employee;
/**
 * MongoTemplate-backed implementation of {@link EmployeeRepository}; used for
 * the upsert path that the derived-query repository cannot express.
 */
@Repository
public class EmployeeRepositoryImpl implements EmployeeRepository {

    private static final Logger logger = LoggerFactory.getLogger(EmployeeRepositoryImpl.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    /** Returns every document in the employee collection. */
    @Override
    public List<Employee> findAll() {
        return mongoTemplate.findAll(Employee.class);
    }

    /**
     * Finds one employee by its Mongo document id.
     * Fix: the previous version queried the Long field "emp_id" with a String
     * argument, which can never match; the String parameter is the document
     * {@code _id} (see {@code Employee._id}).
     */
    @Override
    public Employee findById(String id) {
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        return mongoTemplate.findOne(query, Employee.class);
    }

    /** Inserts or replaces the entity and returns the persisted instance. */
    @Override
    public Employee save(Employee e) {
        return mongoTemplate.save(e);
    }

    /**
     * Upserts the entity keyed by the business field emp_id: inserts when no
     * document matches, otherwise applies the converted document as an update.
     */
    @Override
    public void saveOrUpdate(Employee e) {
        Query query = new Query();
        query.addCriteria(Criteria.where("emp_id").is(e.getEmpId()));
        // Convert the entity through the mapping layer so @Field names apply.
        Document doc = new Document();
        mongoTemplate.getConverter().write(e, doc);
        Update update = Update.fromDocument(doc);
        UpdateResult result = mongoTemplate.upsert(query, update, Employee.class);
        // matchedCount == 0 means the upsert inserted a new document.
        logger.info("Matched Count:{},{}", result.getMatchedCount(), e.getEmpId());
    }
}
package com.poc.kafka.mongo.service;
import java.util.List;
import com.poc.kafka.mongo.model.Emp;
/**
 * Application service for the Kafka->Mongo employee flow: publishing to the
 * topic, consuming records, and CRUD-style lookups against Mongo.
 */
public interface EmployeeService {
// Serializes the employee to JSON and publishes it to the configured topic.
public void send(Emp e);
// Kafka listener callback: deserializes the JSON message and persists it.
public void receive(String message);
/**
 * Finds one employee by business id.
 *
 * @param empId business employee id (emp_id)
 * @return the matching employee, or null when none exists
 */
public Emp findByEmpId(Long empId);
/**
 * Finds one employee by Mongo document id.
 *
 * @param id Mongo document _id
 * @return the matching employee (implementations may return an empty Emp
 *         when nothing matches)
 */
public Emp findById(String id);
/**
 * @return all employees in the collection
 */
public List<Emp> findAll();
/**
 * Saves the employee.
 *
 * @param emp employee to persist
 * @return the persisted representation
 */
public Emp save(Emp emp);
/**
 * Inserts or updates the employee keyed by its business id.
 *
 * @param emp employee to upsert
 */
public void saveOrUpdate(Emp emp);
/**
 * Finds employees earning at least the given salary.
 *
 * @param sal inclusive lower bound on salary
 * @return matching employees, highest salary first
 */
public List<Emp> findBySalary(Double sal);
/**
 * Deletes by business id.
 *
 * @param empId business employee id (emp_id)
 * @return number of documents deleted
 */
public Long deleteByEmpId(Long empId);
}
package com.poc.kafka.mongo.service;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.gson.Gson;
import com.poc.kafka.mongo.entity.Employee;
import com.poc.kafka.mongo.mapper.EmployeeMapper;
import com.poc.kafka.mongo.model.Emp;
import com.poc.kafka.mongo.repository.EmployeeMongoRepository;
import com.poc.kafka.mongo.repository.EmployeeRepository;
@Service
public class EmployeeServiceImpl implements EmployeeService {
private static final Logger logger = LoggerFactory.getLogger(EmployeeServiceImpl.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private Gson gson;
@Value(value = "${spring.kafka.topics}")
private String topicName;
@Autowired
private EmployeeMongoRepository employeeMongoRepository;
@Autowired
private EmployeeMapper employeeMapper;
@Autowired
private EmployeeRepository employeeRepository;
@Override
public void send(Emp emp) {
String request = gson.toJson(emp);
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topicName, request);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
logger.info("Unable to send message=[ {} ] due to : {}", request, ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
logger.info("Sent message=[ {} ] with offset=[ {} ]", request, result.getRecordMetadata().offset());
}
});
}
@Override
@KafkaListener(topics = { "${spring.kafka.topics}" })
public void receive(String emp) {
logger.info("Kafka event consumed is:{} " , emp);
Emp e = gson.fromJson(emp, Emp.class);
logger.info("converted value:{} " , e.toString());
/**
* To send to mongo database after listening the record from kafka
*/
saveOrUpdate(e);
}
@Override
public Emp findByEmpId(Long empId) {
return (Emp)employeeMapper.mapEntityToPojo(employeeMongoRepository.findByEmpId(empId));
}
@Override
public Emp findById(String id) {
return (Emp)employeeMapper.mapEntityToPojo(employeeMongoRepository.findById(id).orElse(new Employee()));
}
@Override
public List<Emp> findAll() {
List<Employee> list= employeeMongoRepository.findAll();
List<Emp> empList= list.stream()
.map(e->employeeMapper.mapEntityToPojo(e))
.map(e->(Emp)e)
.collect(Collectors.toList());
return empList;
}
@Override
public Emp save(Emp emp) {
Employee employee = (Employee) employeeMapper.mapPojoToEntity(emp);
return (Emp)employeeMapper.mapEntityToPojo(employeeMongoRepository.save(employee));
}
@Override
public void saveOrUpdate(Emp emp) {
Employee employee = (Employee) employeeMapper.mapPojoToEntity(emp);
employeeRepository.saveOrUpdate(employee);
}
@Override
public List<Emp> findBySalary(Double sal) {
List<Emp> empList= null;
try (Stream<Employee> stream = employeeMongoRepository.findBySalary(sal)) {
List<Employee> list = stream.collect(Collectors.toList());
empList= list.stream()
.map(e->(Emp)employeeMapper.mapEntityToPojo(e))
.collect(Collectors.toList());
}
return empList;
}
@Override
public Long deleteByEmpId(Long empId) {
return employeeMongoRepository.deleteByEmployeeId(empId);
}
}
spring.kafka.bootstrap-servers=127.0.0.1:9092
##Producer Serialization:
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.topics=topic_emp
server.port=8083
#consumer properties
#spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=emp_consume_group
#spring.kafka.consumer.enable-auto-commit=false
#spring.kafka.listener.missing-topics-fatal=false
#Mongo DB properties
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=ownDatabase
server.servlet.context-path=/kafka-mongo-poc
logging.level.org.springframework.data.mongodb.core.MongoTemplate=DEBUG
logging.level.org.springframework=INFO
logging.level.root=INFO
\ No newline at end of file
package com.poc.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class KafkaPocApplicationTests {
// Smoke test: passes when the full application context starts.
// NOTE(review): this test's package (com.poc.kafka) is a parent of the
// application package (com.poc.kafka.mongo); @SpringBootTest searches upward
// for the @SpringBootApplication class, so it may not be found from here —
// confirm the context actually loads, or move the test to com.poc.kafka.mongo.
@Test
void contextLoads() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment