Refactor the code and package structure

parent 9c7af755
package com.egen.loyaltyeventsproducer.IService;
public interface IKafkaProducerService {
public void generateKafkaMessages();
}
package com.egen.loyaltyeventsproducer; package com.egen.loyaltyeventsproducer;
import com.egen.loyaltyeventsproducer.config.KafkaProducerConfig;
import com.egen.loyaltyeventsproducer.producer.LoyaltyRewardsProducer; import com.egen.loyaltyeventsproducer.producer.LoyaltyRewardsProducer;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.support.serializer.JsonSerializer;
...@@ -18,13 +21,29 @@ import java.util.TimerTask; ...@@ -18,13 +21,29 @@ import java.util.TimerTask;
@SpringBootApplication @SpringBootApplication
public class LoyaltyEventsProducerApplication { public class LoyaltyEventsProducerApplication {
private static final String TOPIC = "loyalty-rewards-info"; public static void main(String[] args) {
SpringApplication.run(LoyaltyEventsProducerApplication.class, args);
/*LoyaltyRewardsProducer producer = new LoyaltyRewardsProducer();
kafkaSender = getKafkaSender();
//System.out.println("Kafka Props :"+kafkaProducerConfig.getBootstrapper());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
producer.generateMessages(kafkaSender, TOPIC, 1);
}
}, 0, 4000);*/
}
/*private static final String TOPIC = "loyalty-rewards-info";
private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String CLIENT_ID_CONFIG = "rewards-generator"; private static final String CLIENT_ID_CONFIG = "rewards-generator";
private static KafkaSender<String, JsonNode> kafkaSender; private static KafkaSender<String, JsonNode> kafkaSender;
public static reactor.kafka.sender.KafkaSender<String, JsonNode> getKafkaSender(){ private static KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
public static reactor.kafka.sender.KafkaSender<String, JsonNode> getKafkaSender(){
System.out.println(kafkaProducerConfig.getBootstrapper());
Map<String, Object> props = new HashMap<>(); Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG); props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
...@@ -33,18 +52,7 @@ public class LoyaltyEventsProducerApplication { ...@@ -33,18 +52,7 @@ public class LoyaltyEventsProducerApplication {
SenderOptions<String, JsonNode> senderOptions = SenderOptions.create(props); SenderOptions<String, JsonNode> senderOptions = SenderOptions.create(props);
return reactor.kafka.sender.KafkaSender.create(senderOptions); return reactor.kafka.sender.KafkaSender.create(senderOptions);
} }*/
public static void main(String[] args) {
SpringApplication.run(LoyaltyEventsProducerApplication.class, args);
LoyaltyRewardsProducer producer = new LoyaltyRewardsProducer();
kafkaSender = getKafkaSender();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
producer.generateMessages(kafkaSender, TOPIC, 1);
}
}, 0, 4000);
}
} }
package com.egen.loyaltyeventsproducer.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducerConfig {
@Value("${bootstrap.server}")
private String bootstrapper;
@Value("${kafka.clientId}")
private String clientId;
public String getBootstrapper() {
return bootstrapper;
}
public void setBootstrapper(String bootstrapper) {
this.bootstrapper = bootstrapper;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
}
package com.egen.loyaltyeventsproducer.controller;
import com.egen.loyaltyeventsproducer.config.KafkaProducerConfig;
import com.egen.loyaltyeventsproducer.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/v1/loyalty/kafka")
public class KafkaProducerController {
@Autowired
private KafkaProducerService kafkaProducerService;
@GetMapping("/producer")
public void generateKafkaMessages()
{
kafkaProducerService.generateKafkaMessages();
}
}
package com.egen.loyaltyeventsproducer.service;
import com.egen.loyaltyeventsproducer.IService.IKafkaProducerService;
import com.egen.loyaltyeventsproducer.config.KafkaProducerConfig;
import com.egen.loyaltyeventsproducer.producer.LoyaltyRewardsProducer;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@Service
public class KafkaProducerService implements IKafkaProducerService {
private static final String TOPIC = "loyalty-rewards-info";
private static KafkaSender<String, JsonNode> kafkaSender;
@Autowired
private KafkaProducerConfig KafkaProducerConfig;
public reactor.kafka.sender.KafkaSender<String, JsonNode> getKafkaSender(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProducerConfig.getBootstrapper());
props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaProducerConfig.getClientId());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
SenderOptions<String, JsonNode> senderOptions = SenderOptions.create(props);
return reactor.kafka.sender.KafkaSender.create(senderOptions);
}
@Override
public void generateKafkaMessages() {
kafkaSender = getKafkaSender();
LoyaltyRewardsProducer producer = new LoyaltyRewardsProducer();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
producer.generateMessages(kafkaSender, TOPIC, 1);
}
}, 0, 4000);
}
}
bootstrap.server=localhost:9092
kafka.clientId=rewards-generator
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment