first commit

parents
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>kafka-training</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nisum.kafka</groupId>
<artifactId>kafka-training</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-training</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.nisum.kafka.kafka_training;
/**
* Hello world!
*
*/
/**
 * Minimal sample application that prints a greeting to standard output.
 */
public class App {

    /**
     * Entry point: writes the greeting and returns.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        final String greeting = "Hello World!";
        System.out.println(greeting);
    }
}
package com.nisum.kafka.kafka_training;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumerDemo {

    /**
     * Entry point: creates a Kafka consumer subscribed to "second_topic" and
     * polls forever, printing the key/value and partition/offset of every
     * record received.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        // consumer configuration
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "My_first_application");
        prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources fixes the original's leak: the consumer was never
        // closed, so the broker had to wait for a session timeout to rebalance
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            // subscribe our consumer to the topic
            consumer.subscribe(Collections.singleton("second_topic"));
            //consumer.subscribe(Arrays.asList("first_topic","second_topic"));

            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // typed loop variable: the original used a raw ConsumerRecord
                for (ConsumerRecord<String, String> record : records) {
                    // separators added: the original printed "Key: <k>value<v>" run together
                    System.out.println("Key: " + record.key() + " value: " + record.value());
                    System.out.println("Partition: " + record.partition() + " offset: " + record.offset());
                }
            }
        }
    }
}
package com.nisum.kafka.kafka_training;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerDemoAssignSeek {

    /**
     * Demonstrates assign+seek: reads a fixed number of messages starting at a
     * specific partition/offset of "second_topic", then exits. Assign and seek
     * are mostly used to replay data or fetch a specific message; no group id
     * is configured because an assigned consumer does not join a group.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class.getName());

        // consumer configuration (deliberately no GROUP_ID_CONFIG, see above)
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources fixes the original's leak: the loop terminates
        // after five messages but the consumer was never closed
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            // assign the partition explicitly instead of subscribing
            TopicPartition partitionToReadFrom = new TopicPartition("second_topic", 0);
            consumer.assign(Arrays.asList(partitionToReadFrom));

            // seek to the offset we want to start replaying from
            long offsetToReadFrom = 15L; // upper-case L: lower-case 'l' reads like '1'
            consumer.seek(partitionToReadFrom, offsetToReadFrom);

            int numberOfMessagesToRead = 5;
            boolean keepOnReading = true;
            int numberOfMessagesReadSoFar = 0;

            // poll until the requested number of messages has been read
            while (keepOnReading) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // typed loop variable: the original used a raw ConsumerRecord
                for (ConsumerRecord<String, String> record : records) {
                    numberOfMessagesReadSoFar += 1;
                    // parameterized logging avoids building strings when the level is disabled
                    logger.info("Key: {} value: {}", record.key(), record.value());
                    logger.info("Partition: {} offset: {}", record.partition(), record.offset());
                    if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                        keepOnReading = false;
                        break;
                    }
                }
            }
        }
    }
}
package com.nisum.kafka.kafka_training;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
 * Stub for a consumer-groups demo: assembles the consumer configuration but
 * does not yet create a consumer.
 */
public class ConsumerDemoGroups {

    /**
     * Builds the consumer configuration properties and returns.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        final String deserializer = StringDeserializer.class.getName();

        Properties config = new Properties();
        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "My_first_application");
        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }
}
package com.nisum.kafka.kafka_training;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerDemoWithThread {

    private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());

    /**
     * Entry point: delegates to an instance so the latch and worker thread can
     * live as local state of {@link #run()}.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        new ConsumerDemoWithThread().run();
    }

    private ConsumerDemoWithThread() {
    }

    /**
     * Starts the consumer thread, installs a shutdown hook that wakes the
     * consumer out of poll(), and blocks until the consumer has fully closed.
     */
    private void run() {
        String bootstrapServers = "localhost:9092";
        String groupId = "my_third_application";
        String topic = "second_topic";

        // counted down by the consumer thread once its consumer is closed
        CountDownLatch latch = new CountDownLatch(1);

        logger.info("creating consumer");
        Runnable myConsumer = new ConsumerThread(latch, bootstrapServers, groupId, topic);
        Thread myThread = new Thread(myConsumer);
        myThread.start();

        // shutdown hook: wake the consumer, then wait for it to finish closing
        Runtime.getRuntime().addShutdownHook(new Thread(
                () -> {
                    logger.info("caught shutdown hook");
                    ((ConsumerThread) myConsumer).shutdown();
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        // re-interrupt instead of printStackTrace so the
                        // interruption is not silently swallowed
                        Thread.currentThread().interrupt();
                        logger.error("Interrupted while waiting for consumer shutdown", e);
                    }
                    logger.info("Application exited");
                }
        ));

        try {
            latch.await();
        } catch (InterruptedException e) {
            // restore the interrupt flag; logging already records the failure,
            // the original additionally printed the stack trace twice
            Thread.currentThread().interrupt();
            logger.error("Application got interrupted", e);
        } finally {
            logger.info("Application is closing ");
        }
    }

    /**
     * Runnable that owns a KafkaConsumer: subscribes to the given topic, polls
     * until woken via {@link #shutdown()}, then closes the consumer and
     * releases the latch.
     */
    public class ConsumerThread implements Runnable {

        private Logger logger = LoggerFactory.getLogger(ConsumerThread.class.getName());
        private CountDownLatch latch;
        private KafkaConsumer<String, String> consumer;

        public ConsumerThread(CountDownLatch latch, String bootstrapServers, String groupId, String topic) {
            this.latch = latch;
            Properties prop = new Properties();
            prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Collections.singleton(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    // typed loop variable: the original used a raw ConsumerRecord
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("Key: {} value: {}", record.key(), record.value());
                        logger.info("Partition: {} offset: {}", record.partition(), record.offset());
                    }
                }
            } catch (WakeupException we) {
                // expected: poll() throws this after shutdown() calls wakeup()
                logger.info("Received shutdown signal");
            } finally {
                consumer.close();
                latch.countDown(); // tell the main thread we are done with the consumer
            }
        }

        public void shutdown() {
            // wakeup() is the special method to interrupt consumer.poll();
            // it makes the blocked poll() throw a WakeupException
            consumer.wakeup();
        }
    }
}
package com.nisum.kafka.kafka_training;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class ProducerDemo {

    /**
     * Entry point: sends a single "Hello World" message to "first_topic"
     * and shuts the producer down cleanly.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        // producer configuration
        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees the producer is closed (and its
        // buffered records flushed) even if send() throws
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("first_topic", "Hello World");
            // send is asynchronous; flush() blocks until the record is acked
            producer.send(record);
            producer.flush();
        }
    }
}
package com.nisum.kafka.kafka_training;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class ProducerDemoWithCallback {
public static void main(String[] args) {
// TODO Auto-generated method stub
{
// final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
//System.out.println("Hello World");
//String bootStrapProp
Properties prop = new Properties();
prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
for(int i=0;i <10 ;i ++) {
String topic ="first_topic";
String value = "Hello World"+i;
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,value);
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("second_topic", "Hello World"+i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("Received meta data \n" + "Topic:" + recordMetadata.topic() +"\n"+
"partition:" + recordMetadata.partition() + "\n" +
"offsets" + recordMetadata.offset());
} else {
System.out.println("Error while producing" + e);
}
}
});
}
producer.flush();
producer.close();
}
}
}
package com.nisum.kafka.kafka_training;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemoWithKeys {
public static void main(String[] args) {
// final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
//System.out.println("Hello World");
//String bootStrapProp
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithKeys.class);
Properties prop = new Properties();
prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
for(int i=0;i <10 ;i ++) {
String topic ="second_topic";
String value = "Hello World"+i;
String key = "id_" + i;
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,value,key);//// The same key message will go to same partiotion always
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("second_topic", "Hello World"+i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
logger.info("Received meta data \n" + "Topic:" + recordMetadata.topic() +"\n"+
"partition:" + recordMetadata.partition() + "\n" +
"offsets" + recordMetadata.offset()+"\n"
);
} else {
System.out.println("Error while producing" + e);
logger.error("Error while producing" , e);
}
}
});
}
producer.flush();
producer.close();
}
}
package com.nisum.kafka.kafka_training;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
* Unit test for simple App.
*/
/**
 * JUnit 3 smoke-test suite for the sample {@code App} class.
 */
public class AppTest extends TestCase {

    /**
     * Builds a test case with the given name.
     *
     * @param testName name of the test case
     */
    public AppTest(String testName) {
        super(testName);
    }

    /**
     * @return the suite of tests being tested
     */
    public static Test suite() {
        return new TestSuite(AppTest.class);
    }

    /**
     * Trivial always-passing smoke test.
     */
    public void testApp() {
        assertTrue(true);
    }
}
Manifest-Version: 1.0
Built-By: srikkala
Build-Jdk: 1.8.0_251
Created-By: Maven Integration for Eclipse
#Generated by Maven Integration for Eclipse
#Thu Apr 08 11:18:15 IST 2021
version=0.0.1-SNAPSHOT
groupId=com.nisum.kafka
m2e.projectName=kafka-training
m2e.projectLocation=D\:\\Work_space\\kafka-training
artifactId=kafka-training
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nisum.kafka</groupId>
<artifactId>kafka-training</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-training</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment