Commit 08f52363 authored by uday's avatar uday

added repry properties for producer

parent 89898e06
......@@ -23,4 +23,8 @@ public class KafkaProperties {
private String keyDeSerializer;
private String valueDeSerializer;
private String groupId;
private String requestTimeoutMs;
private String requestTimeOut;
private String deliveryTimeOut;
private String requestPerConnection;
}
......@@ -16,11 +16,15 @@ import java.util.Properties;
public interface PropertiesConfiguration {
public static <T> KafkaProducer<String, T> getProperties(KafkaProperties kafkaProperties) {
final Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getKeySerializer());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getValueSerializer());
properties.setProperty(ProducerConfig.ACKS_CONFIG, kafkaProperties.getAcks());
properties.setProperty(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getRetries());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getKeySerializer());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getValueSerializer());
properties.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getRetries());
properties.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getAcks());
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaProperties.getRequestPerConnection());
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1000);
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProperties.getDeliveryTimeOut());
return new KafkaProducer<String, T>(properties);
}
public static <T> KafkaConsumer<String, T> consumerProperties(KafkaProperties kafkaConsumerProperties) {
......
......@@ -4,6 +4,7 @@ import com.safeway.epe.avrokafkawithoutschemaregistry.configuration.KafkaPropert
import com.safeway.epe.avrokafkawithoutschemaregistry.promotion.GeneratePromotion;
import com.safeway.epe.model.Promotion;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -21,15 +22,23 @@ public class PromotionProducer {
private KafkaProperties kafkaProperties;
public Future<RecordMetadata> sendPromotionEvent(KafkaProducer<String, Promotion> producer) throws IOException, ExecutionException, InterruptedException {
Promotion promotion = GeneratePromotion.generatedPromotion();
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, Promotion>(kafkaProperties.getTopic(), promotion));
RecordMetadata metadata = future.get();
if (metadata.hasOffset() && metadata.hasTimestamp()) {
log.info("partition::" + metadata.partition() + "\t Offset::" + metadata.offset() + "\t timestamp::" + convertTimestampToDateTime(metadata.timestamp()));
} else {
throw new RuntimeException("Exception ocured");
int setCount =0 ;
Future<RecordMetadata> future = null;
while(true) {
Promotion promotion = GeneratePromotion.generatedPromotion();
try {
future = producer.send(new ProducerRecord<String, Promotion>(kafkaProperties.getTopic(), promotion));
RecordMetadata metadata = future.get();
if (metadata.hasOffset() && metadata.hasTimestamp()) {
log.info("partition::" + metadata.partition() + "\t Offset::" + metadata.offset() + "\t timestamp::" + convertTimestampToDateTime(metadata.timestamp()));
} else {
throw new RuntimeException("Exception ::");
}
//producer.close();
} catch (Exception e) {
log.error("Broker is down");
}
return future;
}
producer.close();
return future;
}
}
......@@ -9,8 +9,13 @@ kafka.producer.bootStrapServers: http://127.0.0.1:9092
kafka.producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
kafka.producer.valueSerializer: com.safeway.epe.avrokafkawithoutschemaregistry.serializer.KafkaAvroSerialize
kafka.producer.acks: all
kafka.producer.retries: 0
kafka.producer.retries: 12
kafka.producer.topic: promotion_spark1
kafka.producer.requestTimeoutMs:2000
kafka.producer.retriesValue:0x7fffffff
kafka.produer.requestTimeOut:2000
kafka.producer.deliveryTimeOut:10000
kafka.producer.requestPerConnection:1
#Kafka Consumer Properties
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment