Commit 6ddb2d93 authored by Mahesh Rohra's avatar Mahesh Rohra

Store data to Kafka topic

parent 04ee40c4
......@@ -35,6 +35,7 @@ dependencies {
compile (group: 'io.confluent', name: 'kafka-schema-registry', version: '5.4.0'){
exclude group:'org.slf4j', module: 'slf4j-log4j12'
}
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.30'
compile "org.apache.avro:avro:1.9.1"
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '5.4.1-ccs'
compileOnly 'org.projectlombok:lombok'
......
package com.safeway.epe.Config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.boot.web.client.RootUriTemplateHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriTemplateHandler;
@Data
@Configuration
......@@ -10,4 +17,22 @@ import org.springframework.context.annotation.Configuration;
public class ConfigProps {
private String offerTransactionRecordTopic;
@Value("${schema.api.baseurl}")
private String schemaBaseUrl;
private static final String USER_AGENT = "Producer";
private static final String APPLICATION_KAFKA_JSON_VALUE = "application/vnd.kafka.json.v2+json";
private static final String ACCEPT_APPLICATION_KAFKA_JSON_VALUE = "application/vnd.kafka.v2+json";
@Bean
RestTemplate restTemplate(RestTemplateBuilder builder) {
UriTemplateHandler uriTemplateHandler = new RootUriTemplateHandler(schemaBaseUrl);
return builder
.uriTemplateHandler(uriTemplateHandler)
.defaultHeader(HttpHeaders.USER_AGENT, USER_AGENT)
.defaultHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_KAFKA_JSON_VALUE)
.defaultHeader(HttpHeaders.ACCEPT, ACCEPT_APPLICATION_KAFKA_JSON_VALUE)
.build();
}
}
package com.safeway.epe.controller;
import com.safeway.epe.domain.Offsets;
import com.safeway.epe.domain.TransactionRecorder;
import com.safeway.epe.service.TransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
......
package com.safeway.epe.repository;
import com.safeway.epe.domain.TransactionRecorder;
import com.safeway.epe.service.TransactionService;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.UUID;
@Repository
......
package com.safeway.epe.service;
import com.safeway.epe.domain.ConsumerPayload;
import com.safeway.epe.domain.Offsets;
import com.safeway.epe.domain.TransactionPayload;
import com.safeway.epe.domain.TransactionRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import org.springframework.web.client.RestTemplate;
import java.util.List;
@Component
public class ProducerService {
public class ProducerService
{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private TransactionService transactionService;
@Autowired
private RestTemplate restTemplate;
public List<Offsets> produce(TransactionPayload request, String topic, TransactionRecorder transactionRecorder, boolean isReprocesed){
public List<Offsets> produce(TransactionPayload request, String topic, TransactionRecorder transactionRecorder, boolean isReprocesed)
{
String topicUri = "/topics/".concat(topic);
ResponseEntity<ConsumerPayload> response = restTemplate.postForEntity(topicUri, request, ConsumerPayload.class);
logger.info("response ::response ::" + response);
return new ArrayList<>();
if(response != null && (response.getStatusCodeValue() == 200 || response.getStatusCode().is2xxSuccessful()))
{
logger.info("Data is inserted to topic");
return response.getBody().getOffsetsList();
}
else
{
transactionService.errorResponse(transactionRecorder, "failed to insert data to topic", false);
logger.info("failed to insert data to topic");
return response.getBody().getOffsetsList();
}
}
}
package com.safeway.epe.service;
import com.safeway.epe.controller.TransactionController;
import com.safeway.epe.domain.Offsets;
import com.safeway.epe.domain.TransactionRecorder;
import org.springframework.http.ResponseEntity;
......@@ -9,6 +8,6 @@ import java.util.List;
public interface TransactionService
{
ResponseEntity<List<Offsets>> produceMessage(String uuid);
ResponseEntity<String> errorResponse(TransactionRecorder transactionRecorder, java.lang.String exceptionMessage, boolean isReprocesed);
}
package com.safeway.epe.service;
import com.safeway.epe.Config.ConfigProps;
import com.safeway.epe.controller.TransactionController;
import com.safeway.epe.domain.Offsets;
import com.safeway.epe.domain.Record;
import com.safeway.epe.domain.TransactionPayload;
......@@ -9,6 +8,8 @@ import com.safeway.epe.domain.TransactionRecorder;
import com.safeway.epe.mapper.TransactionPayloadMapper;
import com.safeway.epe.mapper.TransactionRecordMapper;
import com.safeway.epe.repository.TransactionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
......@@ -19,6 +20,8 @@ import java.util.*;
@Service
public class TransactionServiceImpl implements TransactionService
{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
TransactionRepository repository;
......@@ -40,9 +43,15 @@ public class TransactionServiceImpl implements TransactionService
private ResponseEntity<List<Offsets>> sendData(TransactionRecorder transactionRecorder, boolean isReprocesed) {
Record record = TransactionRecordMapper.INSTANCE.recordConvertion(transactionRecorder.getUuid().toString(), transactionRecorder);
TransactionPayload transactionPayload = TransactionPayloadMapper.INSTANCE.transactionPayloadMapper(record);
System.out.println(configProps.getOfferTransactionRecordTopic());
// return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Arrays.asList(new Offsets()));
return ResponseEntity.ok(producerService.produce(transactionPayload,configProps.getOfferTransactionRecordTopic(),transactionRecorder,isReprocesed));
}
public ResponseEntity<String> errorResponse(TransactionRecorder transactionRecorder, String exceptionMessage, boolean isReprocesed) {
if(!isReprocesed){
logger.error("Error response : "+ exceptionMessage);
}
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error");
}
}
{
"name": "TransactionRecorder1",
"name": "TransactionRecorder",
"type": "record",
"namespace": "com.safeway.epe.domain",
"fields": [
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment