Commit 3947ebb4 authored by Lokesh Singh's avatar Lokesh Singh Committed by Lokesh Singh

kafka getting started

parent e27e8326
# Default ignored files
/shelf/
/workspace.xml
kafka-getting-started
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="kafka-getting-started" />
<module name="KafkaGettingStarted" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JpaBuddyIdeaProjectConfig">
<option name="renamerInitialized" value="true" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_18" default="true" project-jdk-name="openjdk-18" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
<component name="ProjectType">
<option name="id" value="jpab" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learning.kafka</groupId>
<artifactId>kafka-getting-started</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
## Apache Kafka
Setting up kafka in windows...
First, go to the directory where kafka-single-node.yml exists.
execute this command from current directory...
👉`docker-compose -f kafka-single-node.yml up -d`
This command will pull the Zookeeper and Kafka images and start the Kafka and Zookeeper containers.
To check whether the containers are running, execute this command:
👉`docker ps`
![MicrosoftTeams-image (4).png](..%2F..%2F..%2FMicrosoftTeams-image%20%284%29.png)
To shutdown and remove the setup, execute this command in the same directory...
👉`docker-compose -f kafka-single-node.yml down`
Logging into kafka container...
👉`docker exec -it kafka-broker /bin/bash`
![MicrosoftTeams-image (5).png](..%2F..%2F..%2FMicrosoftTeams-image%20%285%29.png)
Now navigate to kafka scripts directory...
👉`cd /opt/bitnami/kafka/bin`
package com.learning.kafkagettingstarted.chapter5;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaSimpleConsumer {

    /**
     * Entry point: connects to a local Kafka broker, subscribes to the
     * kafka.learning.orders topic and prints every record received.
     * Polls forever; the loop only exits via an exception (e.g. WakeupException)
     * or process termination.
     */
    public static void main(String[] args) {
        //Setup Properties for consumer
        Properties kafkaProps = new Properties();

        //List of Kafka brokers to connect to
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");

        //Deserializer class to convert Keys from Byte Array to String
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //Deserializer class to convert Messages from Byte Array to String
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //Consumer Group ID for this consumer
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
                "kafka-java-consumer");

        //Set to consume from the earliest message, on start when no offset is
        //available in Kafka
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                "earliest");

        //Create a Consumer (diamond operator infers the type parameters)
        KafkaConsumer<String, String> simpleConsumer =
                new KafkaConsumer<>(kafkaProps);

        try {
            //Subscribe to the kafka.learning.orders topic
            simpleConsumer.subscribe(Arrays.asList("kafka.learning.orders"));

            //Continuously poll for new messages
            while (true) {
                //Poll with timeout of 100 milli seconds
                ConsumerRecords<String, String> messages =
                        simpleConsumer.poll(Duration.ofMillis(100));

                //Print batch of records consumed
                for (ConsumerRecord<String, String> message : messages) {
                    System.out.println("Message fetched : " + message);
                }
            }
        } finally {
            // Original never closed the consumer. Closing releases sockets and
            // triggers a clean group rebalance if the poll loop exits abnormally.
            simpleConsumer.close();
        }
    }
}
package com.learning.kafkagettingstarted.chapter5;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaSimpleProducer {

    /**
     * Entry point: publishes 10 messages to the kafka.learning.orders topic,
     * one every 2 seconds, using sequential keys starting at a random offset.
     */
    public static void main(String[] args) {
        //Setup Properties for Kafka Producer
        Properties kafkaProps = new Properties();

        //List of brokers to connect to
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");

        //Serializer class used to convert Keys to Byte Arrays
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Serializer class used to convert Messages to Byte Arrays
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Create a Kafka producer from configuration.
        //Fixed the raw KafkaProducer type, and try-with-resources replaces the
        //manual finally { close() } from the original.
        try (KafkaProducer<String, String> simpleProducer =
                     new KafkaProducer<>(kafkaProps)) {

            //Publish 10 messages at 2 second intervals, with a random key
            int startKey = new Random().nextInt(1000);
            for (int i = startKey; i < startKey + 10; i++) {
                //Create a producer Record
                ProducerRecord<String, String> kafkaRecord =
                        new ProducerRecord<>(
                                "kafka.learning.orders", //Topic name
                                String.valueOf(i),       //Key for the message
                                "This is order" + i      //Message Content
                        );

                System.out.println("Sending Message : " + kafkaRecord.toString());

                //Publish to Kafka
                simpleProducer.send(kafkaRecord);

                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            //Original swallowed this silently; restore the interrupt flag so
            //callers/JVM shutdown logic can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            //Original had an empty catch block; at minimum surface the failure.
            e.printStackTrace();
        }
    }
}
package com.learning.kafkagettingstarted.chapter6;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaUseCaseConsumer {

    /**
     * Entry point: connects to a local Kafka broker, subscribes to the
     * kafka.usecase.students topic and prints every record received.
     * Polls forever; the loop only exits via an exception (e.g. WakeupException)
     * or process termination.
     */
    public static void main(String[] args) {
        //Setup Properties for consumer
        Properties kafkaProps = new Properties();

        //List of Kafka brokers to connect to
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");

        //Deserializer class to convert Keys from Byte Array to String
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //Deserializer class to convert Messages from Byte Array to String
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //Consumer Group ID for this consumer
        //NOTE(review): same group id as KafkaSimpleConsumer; harmless here
        //because the topics differ, but worth confirming this is intentional.
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
                "kafka-java-consumer");

        //Set to consume from the earliest message, on start when no offset is
        //available in Kafka
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                "earliest");

        //Create a Consumer (diamond operator infers the type parameters)
        KafkaConsumer<String, String> simpleConsumer =
                new KafkaConsumer<>(kafkaProps);

        try {
            //Subscribe to the kafka.usecase.students topic
            //(the original comment incorrectly said kafka.learning.orders)
            simpleConsumer.subscribe(Arrays.asList("kafka.usecase.students"));

            //Continuously poll for new messages
            while (true) {
                //Poll with timeout of 100 milli seconds
                ConsumerRecords<String, String> messages =
                        simpleConsumer.poll(Duration.ofMillis(100));

                //Print batch of records consumed
                for (ConsumerRecord<String, String> message : messages) {
                    System.out.println("Message fetched : " + message);
                }
            }
        } finally {
            // Original never closed the consumer. Closing releases sockets and
            // triggers a clean group rebalance if the poll loop exits abnormally.
            simpleConsumer.close();
        }
    }
}
package com.learning.kafkagettingstarted.chapter6;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaUseCaseProducer {

    /**
     * Entry point: publishes 10 messages to the kafka.usecase.students topic,
     * one every 2 seconds, using sequential keys starting at a random offset.
     */
    public static void main(String[] args) {
        //Setup Properties for Kafka Producer
        Properties kafkaProps = new Properties();

        //List of brokers to connect to
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");

        //Serializer class used to convert Keys to Byte Arrays
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Serializer class used to convert Messages to Byte Arrays
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Create a Kafka producer from configuration.
        //Fixed the raw KafkaProducer type, and try-with-resources replaces the
        //manual finally { close() } from the original.
        try (KafkaProducer<String, String> simpleProducer =
                     new KafkaProducer<>(kafkaProps)) {

            //Publish 10 messages at 2 second intervals, with a random key
            int startKey = new Random().nextInt(1000);
            for (int i = startKey; i < startKey + 10; i++) {
                //Create a producer Record
                ProducerRecord<String, String> kafkaRecord =
                        new ProducerRecord<>(
                                "kafka.usecase.students", //Topic name
                                String.valueOf(i),        //Key for the message
                                "This is student " + i    //Message Content
                        );

                System.out.println("Sending Message : " + kafkaRecord.toString());

                //Publish to Kafka
                simpleProducer.send(kafkaRecord);

                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            //Original swallowed this silently; restore the interrupt flag so
            //callers/JVM shutdown logic can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            //Original had an empty catch block; at minimum surface the failure.
            e.printStackTrace();
        }
    }
}
1. Please make sure that Docker is already installed in the system,
If not, install from https://www.docker.com/products/docker-desktop
2. If on Windows O/S, open a Powershell window.
If on Mac OS or Linux, open a Terminal window.
3. Navigate to the directory where the exercise files are downloaded.
This directory would contain the kafka-single-node.yml file
4. Execute the following command from this directory
docker-compose -f kafka-single-node.yml up -d
5. Check if the containers are up and running
docker ps
6. To shutdown and remove the setup, execute this command in the same directory
docker-compose -f kafka-single-node.yml down
#
# This file contains the shell commands for the Kafka Essentials : Getting Started Course
#
## Logging into the Kafka Container
docker exec -it kafka-broker /bin/bash
## Navigate to the Kafka Scripts directory
cd /opt/bitnami/kafka/bin
## Creating new Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.tweets \
--partitions 1 \
--replication-factor 1
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.alerts \
--partitions 1 \
--replication-factor 1
kafka v2.2+ use this..
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1
## Listing Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--list
kafka v2.2+ use this..
kafka-topics.sh --bootstrap-server localhost:9092 --list
## Getting details about a Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--describe
kafka v2.2+
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first_topic
## Publishing Messages to Topics
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.tweets
## Consuming Messages from Topics
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.tweets \
--from-beginning
## Deleting Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--delete \
--topic kafka.learning.alerts
kafka v2.2+
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --delete
#Create a Topic with multiple partitions
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.orders \
--partitions 3 \
--replication-factor 1
#Check topic partitioning
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--topic kafka.learning.orders \
--describe
## Publishing Messages to Topics with keys
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=:" \
--topic kafka.learning.orders
## Consume messages using a consumer group
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.orders \
--group test-consumer-group \
--property print.key=true \
--property key.separator=" = " \
--from-beginning
## Check current status of offsets
./kafka-consumer-groups.sh \
--bootstrap-server localhost:29092 \
--describe \
--all-groups
## Creating the Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.usecase.students \
--partitions 2 \
--replication-factor 1
## Describe the Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--topic kafka.usecase.students \
--describe
## Publish to the Topic
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=:" \
--topic kafka.usecase.students
## Consume Message from the Topic
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.usecase.students \
--group usecase-consumer-group \
--property print.key=true \
--property key.separator=" = " \
--from-beginning
\ No newline at end of file
1. Please make sure that Docker is already installed in the system,
If not, install from https://www.docker.com/products/docker-desktop
2. If on Windows O/S, open a Powershell window.
If on Mac OS or Linux, open a Terminal window.
3. Navigate to the directory where the exercise files are downloaded.
This directory would contain the kafka-single-node.yml file
4. Execute the following command from this directory
docker-compose -f kafka-single-node.yml up -d
5. Check if the containers are up and running
docker ps
6. To shutdown and remove the setup, execute this command in the same directory
docker-compose -f kafka-single-node.yml down
#
# This file contains the shell commands for the Kafka Essentials : Getting Started Course
#
## Logging into the Kafka Container
docker exec -it kafka-broker /bin/bash
## Navigate to the Kafka Scripts directory
cd /opt/bitnami/kafka/bin
## Creating new Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.tweets \
--partitions 1 \
--replication-factor 1
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.alerts \
--partitions 1 \
--replication-factor 1
kafka v2.2+ use this..
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1
## Listing Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--list
kafka v2.2+ use this..
kafka-topics.sh --bootstrap-server localhost:9092 --list
## Getting details about a Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--describe
kafka v2.2+
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first_topic
## Publishing Messages to Topics
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.tweets
## Consuming Messages from Topics
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.tweets \
--from-beginning
## Deleting Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--delete \
--topic kafka.learning.alerts
kafka v2.2+
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --delete
#Create a Topic with multiple partitions
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.orders \
--partitions 3 \
--replication-factor 1
#Check topic partitioning
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--topic kafka.learning.orders \
--describe
## Publishing Messages to Topics with keys
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=:" \
--topic kafka.learning.orders
## Consume messages using a consumer group
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.orders \
--group test-consumer-group \
--property print.key=true \
--property key.separator=" = " \
--from-beginning
## Check current status of offsets
./kafka-consumer-groups.sh \
--bootstrap-server localhost:29092 \
--describe \
--all-groups
## Creating the Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.usecase.students \
--partitions 2 \
--replication-factor 1
## Describe the Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--topic kafka.usecase.students \
--describe
## Publish to the Topic
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=:" \
--topic kafka.usecase.students
## Consume Message from the Topic
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.usecase.students \
--group usecase-consumer-group \
--property print.key=true \
--property key.separator=" = " \
--from-beginning
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment