Commit ecf69caa authored by Vishal Vaddadhi's avatar Vishal Vaddadhi

kafka setup, readme added

parent 8284503d
Pipeline #1650 failed with stage
in 40 seconds
......@@ -14,6 +14,7 @@ target/
.sts4-cache
*.properties
*.yml
### IntelliJ IDEA ###
.idea
......
# Foobar
This is the backend for the order-management project.
## Installation
Clone this project and CD into the directory and open it up with your favorite editor or IDE of choice.
You are also going to want to install Apache Kafka and get the server running on your local machine before running the backend application.
To install the Apache Kafka , go to this [link](https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz) and download the tar.
You then want to unzip the tar file and put it in whatever directory is easiest for you to access.
## Usage
You want to now open up two separate terminals for running the zookeeper and the kafka server. Once you have two terminals opened up CD into the apache kafka directory in both terminals and run these commands in order:
```
bin/zookeeper-server-start.sh config/zookeeper.properties
```
Now in the other terminal you want to run this command
```
bin/kafka-server-start.sh config/server.properties
```
Once you have both of these commands running you can now go into your IDE of choice or editor, and run the backend java application. Once the application is running you want to send either a curl request with a query param of "message=somethinghere" or use a service like postman and have a query params in there. The route you are going to want to hit is http://localhost:8080/kafka/publish.
Once you hit the route mentioned above with the query param, go back to your IDE and you should see producer sending a message and consumer consuming that message.
......@@ -25,6 +25,21 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-dist</artifactId>
<version>2.7.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......
package com.afp.ordermanagement.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactoryString() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
return new KafkaTemplate<>(producerFactoryString());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
package com.afp.ordermanagement.controller;
import com.afp.ordermanagement.service.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
public KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam String message) {
producer.sendMessage(message);
}
}
package com.afp.ordermanagement.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(topics = "managers", groupId = "group_id")
public void consumerManager(String message){
logger.info(String.format("#### -> Consumed message -> %s", message));
}
}
package com.afp.ordermanagement.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.kafka.core.KafkaTemplate;
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String MANAGER_TOPIC = "managers";
private static final String ORDER_TOPIC = "orders";
private KafkaTemplate<String, String> kafkaTemplate;
public Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
logger.info(String.format("#### -> Producing message: %s", message));
kafkaTemplate.send(MANAGER_TOPIC, message);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment