Commit 7bd58b51 authored by uday's avatar uday

used the library for Apache kafka and change the code accordinglly

parent fde0dbec
......@@ -4,32 +4,24 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Configuration
@Component
public class ProducerConfiguration {
public static final Logger logger = LoggerFactory.getLogger(ProducerConfiguration.class);
@Bean
public ProducerFactory<String, String> producerFactory() {
logger.info("Setting producer configuration ");
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.BOOTSTRAP_SERVER);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
@Qualifier("kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate()
{
return new KafkaTemplate<>(producerFactory());
public Properties configuration(){
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , AppConfigs.BOOTSTRAP_SERVER);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
return properties;
}
}
}
......@@ -3,27 +3,33 @@ package com.safeway.epe.offersdataproducer.producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.safeway.epe.offersdataproducer.appconfiguration.AppConfigs;
import com.safeway.epe.offersdataproducer.appconfiguration.ProducerConfiguration;
import com.safeway.epe.offersdataproducer.dtomodel.OffersDataDTO;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Future;
@Component
public class OffersDataProducer {
private static final Logger logger = LoggerFactory.getLogger(OffersDataProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private ProducerConfiguration producerConfiguration;
@Autowired
private ObjectMapper objectMapper;
public ListenableFuture<SendResult<String, String>> sendMessage(OffersDataDTO offersDataDTO) throws JsonProcessingException {
logger.info("Start :: OffersDataProducer.sendMessage");
public Future sendEvent(OffersDataDTO offersDataDTO) throws JsonProcessingException {
logger.info("Start :: OffersDataProducer.sendEvent");
String offersDataDTOResp = objectMapper.writeValueAsString(offersDataDTO);
ListenableFuture<SendResult<String, String>> message= kafkaTemplate.send(AppConfigs.TOPIC_NAME, offersDataDTOResp);
logger.info("End :: OffersDataProducer.sendMessage");
return message;
KafkaProducer producer = new KafkaProducer(producerConfiguration.configuration());
ProducerRecord record = new ProducerRecord(AppConfigs.TOPIC_NAME , offersDataDTOResp);
Future<RecordMetadata> future = producer.send(record);
logger.info("End :: OffersDataProducer.sendEvent");
return future;
}
}
......@@ -4,13 +4,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.safeway.epe.offersdataproducer.appconfiguration.AppConfigs;
import com.safeway.epe.offersdataproducer.domodel.OffersDataDO;
import com.safeway.epe.offersdataproducer.service.OffersDataProducerService;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.Future;
@RestController
@RequestMapping(AppConfigs.CLASS_LEVEL_PRODUCER_URL)
public class OffersDataProducerResource {
......@@ -18,8 +19,8 @@ public class OffersDataProducerResource {
private OffersDataProducerService offersDataProducerService;
@PostMapping(AppConfigs.CLASS_LEVEL_PRODUCER_URL)
@ResponseStatus(HttpStatus.CREATED)
public ResponseEntity<ListenableFuture<SendResult<String, String>>> publishOfferEvent(@RequestBody OffersDataDO offersDataDO) throws JsonProcessingException {
ListenableFuture<SendResult<String, String>> offerEventStatus = offersDataProducerService.publishOfferEvent(offersDataDO);
public ResponseEntity<Future<RecordMetadata>> publishOfferEvent(@RequestBody OffersDataDO offersDataDO) throws JsonProcessingException {
Future<RecordMetadata> offerEventStatus = offersDataProducerService.publishOfferEvent(offersDataDO);
return ResponseEntity.status(HttpStatus.CREATED).body(offerEventStatus);
}
}
......@@ -2,9 +2,9 @@ package com.safeway.epe.offersdataproducer.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.safeway.epe.offersdataproducer.domodel.OffersDataDO;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Future;
public interface OffersDataProducerService {
ListenableFuture<SendResult<String, String>> publishOfferEvent(OffersDataDO offersDataDO) throws JsonProcessingException;
public Future publishOfferEvent(OffersDataDO offersDataDO) throws JsonProcessingException;
}
......@@ -8,13 +8,17 @@ import com.safeway.epe.offersdataproducer.producer.OffersDataProducer;
import com.safeway.epe.offersdataproducer.service.OffersDataProducerService;
import com.safeway.epe.offersdataproducer.transformer.OffersDataBoToDTO;
import com.safeway.epe.offersdataproducer.transformer.OffersDataDoToBo;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Future;
@Service
public class OffersDataProducerServiceImpl implements OffersDataProducerService {
public static final Logger logger = LoggerFactory.getLogger(OffersDataProducerServiceImpl.class);
@Autowired
private OffersDataBoToDTO offersDataBoToDTO;
@Autowired
......@@ -22,10 +26,12 @@ public class OffersDataProducerServiceImpl implements OffersDataProducerService
@Autowired
private OffersDataProducer offersDataProducer;
@Override
public ListenableFuture<SendResult<String, String>> publishOfferEvent(OffersDataDO offersDataDO) throws JsonProcessingException {
public Future publishOfferEvent(OffersDataDO offersDataDO) throws JsonProcessingException {
logger.info("Start :: OffersDataProducerServiceImpl.publishOfferEvent");
OffersDataBO offersDataBO = offersDataDoToBo.transerToBo(offersDataDO);
OffersDataDTO offersDataDTO = offersDataBoToDTO.transferToDTO(offersDataBO);
ListenableFuture<SendResult<String, String>> eventStatus = offersDataProducer.sendMessage(offersDataDTO);
return eventStatus;
Future<RecordMetadata> future = offersDataProducer.sendEvent(offersDataDTO);
logger.info("End :: OffersDataProducerServiceImpl.publishOfferEvent");
return future;
}
}
{
"offerId": 135918555,
"offersDTO": [
{
"infoDTO":
{
"offerId": 135918555,
"idDTO":
{
"offerId": 135918555,
"manufacturerId": "MMM"
},
"offerProgramCode": "SC",
"terminalsDTO":
[
"Groceryworks (Safeway.com)",
"Smart Cart (QVS Virtual Terminals)",
"Bakery"
]
},
"rulesDTO":
{
"offerId": 135918555,
"startDate": "2019-07-27T07:00:00.000+00:00",
"endDate": "2021-02-27T06:59:58.000+00:00",
"benefitDTO":
{
"benefitValueType": "A",
"discountDTO":
[
{
"allowNegative": false,
"flexNegative": false,
"discountTierDTO":
[
{
"amount": 6.99,
"upTo": "3",
"itemLimit": 5,
"weightLimit": 2
}
]
}
],
"pointsDTO":
[
{
"generalPoints": 5,
"loyaltyPoints": 250
}
]
}
}
}
]
}
\ No newline at end of file
{ "offerId": 135918555, "offersDTO": [ { "infoDTO": { "offerId": 135918555, "idDTO": { "offerId": 135918555, "manufacturerId": "MMM" }, "offerProgramCode": "SC", "terminalsDTO": [ "Groceryworks (Safeway.com)", "Smart Cart (QVS Virtual Terminals)", "Bakery" ] }, "rulesDTO": { "offerId": 135918555, "startDate": "2019-07-27T07:00:00.000+00:00", "endDate": "2021-02-27T06:59:58.000+00:00", "benefitDTO": { "benefitValueType": "A", "discountDTO": [ { "allowNegative": false, "flexNegative": false, "discountTierDTO": [ { "amount": 6.99, "upTo": "3", "itemLimit": 5, "weightLimit": 2 } ] } ], "pointsDTO": [ { "generalPoints": 5, "loyaltyPoints": 250 } ] } } } ] }
\ No newline at end of file
package com.safeway.epe.offersdataproducer.resource;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.safeway.epe.offersdataproducer.appconfiguration.AppConfigs;
import com.safeway.epe.offersdataproducer.domodel.*;
import com.safeway.epe.offersdataproducer.service.OffersDataProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.skyscreamer.jsonassert.JSONAssert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import java.util.concurrent.ExecutionException;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) // for restTemplate
@ActiveProfiles("test")
public class OffersDataProducerResourceTest {
@InjectMocks
private OffersDataProducerResource offersDataProducerResource;
@Mock
private OffersDataProducerService offersDataProducerService;
@Autowired
private TestRestTemplate testRestTemplate;
@Test
public void publishOfferEventTest() throws JsonProcessingException, ExecutionException, InterruptedException {
OffersDataDO offersDataDO = buildOffersDataDO();
ObjectMapper mapper = new ObjectMapper();
String excpected = mapper.writeValueAsString(offersDataDO);
SettableListenableFuture future = new SettableListenableFuture();
ProducerRecord<Integer, String> producerRecord = new ProducerRecord(AppConfigs.TOPIC_NAME, null, offersDataDO);
RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition(AppConfigs.TOPIC_NAME, 0),
8, 1, 342, System.currentTimeMillis(), -1, 581);
SendResult<Integer, String> sendResult = new SendResult<Integer, String>(producerRecord, recordMetadata);
future.set(sendResult);
Mockito.when(offersDataProducerService.publishOfferEvent(Mockito.any(OffersDataDO.class)))
.thenReturn(future);
ResponseEntity<ListenableFuture<SendResult<String, String>>> result = offersDataProducerResource.publishOfferEvent(offersDataDO);
SendResult<String, String> sendResult1 = result.getBody().get();
assert sendResult1.getRecordMetadata().partition() == 0;
String actual = mapper.writeValueAsString(result.getBody().get().getProducerRecord().value());
JSONAssert.assertEquals(excpected ,actual ,false);
}
private OffersDataDO buildOffersDataDO() {
DiscountTierDO discountTierDO = DiscountTierDO
.builder().amount(6.99f).upTo("3")
.itemLimit(5).weightLimit(2).build();
PointsDO pointsDO = PointsDO.builder().generalPoints(5).loyaltyPoints(250).build();
DiscountDO discountDO = DiscountDO
.builder().discountTierDO(Lists.newArrayList(discountTierDO))
.allowNegative(false).flexNegative(false).build();
BenefitDO benefitDO = BenefitDO
.builder().discountDO(Lists.newArrayList(discountDO))
.pointsDO(Lists.newArrayList(pointsDO)).benefitValueType("A")
.build();
IdDO idDO = IdDO.builder().manufacturerId("MMM").offerId(135918444).build();
InfoDO infoDO = InfoDO
.builder().offerId(135918444).idDO(idDO).offerProgramCode("SC")
.terminalsDO(Lists.newArrayList("Groceryworks (Safeway.com)",
"Smart Cart (QVS Virtual Terminals)",
"Bakery")).build();
RulesDO rulesDO = RulesDO.builder().offerId(135918444).benefitDO(benefitDO).endDate("2021-02-27T06:59:58.000+00:00")
.startDate("2019-07-27T07:00:00.000+00:00").build();
OffersDO offersDO = OffersDO.builder().infoDO(infoDO).rulesDO(rulesDO).build();
OffersDataDO offersDataDO = OffersDataDO.builder().offerId(135918444).offersDO(Lists.newArrayList(offersDO)).build();
return offersDataDO;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment