Producer with Callback and Keys

parent 62242883
...@@ -24,7 +24,7 @@ public class ProducerDemo { ...@@ -24,7 +24,7 @@ public class ProducerDemo {
// Create a Producer Record // Create a Producer Record
ProducerRecord<String, String> record = ProducerRecord<String, String> record =
new ProducerRecord<String, String>("first_topic", "Hi, Welcome to Producer"); new ProducerRecord<String, String>("my_topic1", "Hi, Welcome to Producer");
// Send Data - Asyncronous // Send Data - Asyncronous
producer.send(record); producer.send(record);
......
package com.github.nagarjun.kafka.demo1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerDemoKeys {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Logger logger = LoggerFactory.getLogger(ProducerDemoKeys.class);
//Create Producer Properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create Producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
String topic = "my_topic1";
String value = "Welcome to Producer. Message : " + Integer.toString(i);
String key = "ID_"+Integer.toString(i);
logger.info("Key :"+key);
// Create a Producer Record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(topic,key,value);
// Send Data - Asyncronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//executes every time a record sent successfully or an exception is thrown
if (e == null) {
logger.info("Recieved new Metadata . \n" +
"Topic :" + recordMetadata.topic() + "\n" +
"Partition :" + recordMetadata.partition() + "\n" +
"Offset :" + recordMetadata.offset() + "\n" +
"Timestamp :" + recordMetadata.timestamp());
} else {
logger.error("Error while sending the data ", e);
}
}
}).get(); // Synchronous
}
//Flush Data and Close Producer
producer.flush();
producer.close();
}
}
package com.github.nagarjun.kafka.demo1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerDemoWithCallback {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
//Create Producer Properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create Producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
// Create a Producer Record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("my_topic1", "Hi, Welcome to Producer. Message : " + Integer.toString(i));
// Send Data - Asyncronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//executes every time a record sent successfully or an exception is thrown
if (e == null) {
logger.info("Recieved new Metadata . \n" +
"Topic :" + recordMetadata.topic() + "\n" +
"Partition :" + recordMetadata.partition() + "\n" +
"Offset :" + recordMetadata.offset() + "\n" +
"Timestamp :" + recordMetadata.timestamp());
} else {
logger.error("Error while sending the data ", e);
}
}
});
}
//Flush Data and Close Producer
producer.flush();
producer.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment