Commit 79a4f57c authored by Mahesh Rohra's avatar Mahesh Rohra

Handled edge case if division kafka is down.

parent 2cf7eeec
...@@ -3,8 +3,10 @@ package com.safeway.epe; ...@@ -3,8 +3,10 @@ package com.safeway.epe;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication @SpringBootApplication
@EnableScheduling
@EnableFeignClients("com.safeway.epe.*") @EnableFeignClients("com.safeway.epe.*")
public class StoreProducerApplication { public class StoreProducerApplication {
......
package com.safeway.epe.repository; package com.safeway.epe.repository;
import com.safeway.epe.domain.TransactionRecorder; import com.safeway.epe.domain.TransactionRecorder;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.UUID; import java.util.UUID;
@Repository @Repository
public interface TransactionRepository extends CrudRepository<TransactionRecorder, UUID> public interface TransactionRepository extends CrudRepository<TransactionRecorder, UUID>
{ {
@Modifying
@Transactional
@Query("update TransactionRecorder TR set TR.isProcessed=:processedFlag WHERE TR.uuid=:transactionId")
int updateProcessedFlagByTransactionId(@Param("processedFlag") boolean processedFlag, @Param("transactionId") UUID transactionId);
List<TransactionRecorder> findByIsProcessed(boolean processedFlag);
} }
...@@ -5,11 +5,13 @@ import com.safeway.epe.domain.ConsumerPayload; ...@@ -5,11 +5,13 @@ import com.safeway.epe.domain.ConsumerPayload;
import com.safeway.epe.domain.Offsets; import com.safeway.epe.domain.Offsets;
import com.safeway.epe.domain.TransactionPayload; import com.safeway.epe.domain.TransactionPayload;
import com.safeway.epe.domain.TransactionRecorder; import com.safeway.epe.domain.TransactionRecorder;
import com.safeway.epe.repository.TransactionRepository;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List; import java.util.List;
@Component @Component
...@@ -23,9 +25,21 @@ public class ProducerService ...@@ -23,9 +25,21 @@ public class ProducerService
@Autowired @Autowired
private ProducerServiceProxy producerServiceProxy; private ProducerServiceProxy producerServiceProxy;
@Autowired
private TransactionRepository transactionRepository;
public List<Offsets> produce(TransactionPayload request, String topic, TransactionRecorder transactionRecorder, boolean isReprocesed) public List<Offsets> produce(TransactionPayload request, String topic, TransactionRecorder transactionRecorder, boolean isReprocesed)
{ {
ResponseEntity<ConsumerPayload> response = producerServiceProxy.produceTransactionService(topic,request); logger.info("TransactionId :" + transactionRecorder.getUuid());
ResponseEntity<ConsumerPayload> response = null;
try
{
response = producerServiceProxy.produceTransactionService(topic, request);
}
catch(Exception e)
{
logger.error("Exception " + e.getMessage());
}
logger.info("response ::" + response); logger.info("response ::" + response);
if(response != null && (response.getStatusCodeValue() == 200 || response.getStatusCode().is2xxSuccessful())) if(response != null && (response.getStatusCodeValue() == 200 || response.getStatusCode().is2xxSuccessful()))
...@@ -35,9 +49,10 @@ public class ProducerService ...@@ -35,9 +49,10 @@ public class ProducerService
} }
else else
{ {
transactionRepository.updateProcessedFlagByTransactionId(false,transactionRecorder.getUuid());
transactionService.errorResponse(transactionRecorder, "failed to insert data to topic", false); transactionService.errorResponse(transactionRecorder, "failed to insert data to topic", false);
logger.info("failed to insert data to topic"); logger.info("failed to insert data to topic");
return response.getBody().getOffsetsList(); return new ArrayList<>();
} }
} }
......
...@@ -3,11 +3,11 @@ package com.safeway.epe.service; ...@@ -3,11 +3,11 @@ package com.safeway.epe.service;
import com.safeway.epe.domain.Offsets; import com.safeway.epe.domain.Offsets;
import com.safeway.epe.domain.TransactionRecorder; import com.safeway.epe.domain.TransactionRecorder;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import java.util.List; import java.util.List;
public interface TransactionService public interface TransactionService
{ {
ResponseEntity<List<Offsets>> produceMessage(String uuid); ResponseEntity<List<Offsets>> produceMessage(String uuid);
ResponseEntity<List<Offsets>> sendData(TransactionRecorder transactionRecorder, boolean isReprocesed);
ResponseEntity<String> errorResponse(TransactionRecorder transactionRecorder, String exceptionMessage, boolean isReprocesed); ResponseEntity<String> errorResponse(TransactionRecorder transactionRecorder, String exceptionMessage, boolean isReprocesed);
} }
...@@ -35,16 +35,22 @@ public class TransactionServiceImpl implements TransactionService ...@@ -35,16 +35,22 @@ public class TransactionServiceImpl implements TransactionService
public ResponseEntity<List<Offsets>> produceMessage(String uuid) { public ResponseEntity<List<Offsets>> produceMessage(String uuid) {
Optional<TransactionRecorder> optionalTransaction = repository.findById(UUID.fromString(uuid)); Optional<TransactionRecorder> optionalTransaction = repository.findById(UUID.fromString(uuid));
if (optionalTransaction.isPresent()) { if (optionalTransaction.isPresent()) {
return sendData(optionalTransaction.get(),false); return sendData(optionalTransaction.get(),true);
} }
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Arrays.asList(new Offsets())); return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Arrays.asList(new Offsets()));
} }
private ResponseEntity<List<Offsets>> sendData(TransactionRecorder transactionRecorder, boolean isReprocesed) { public ResponseEntity<List<Offsets>> sendData(TransactionRecorder transactionRecorder, boolean isReprocesed) {
Record record = TransactionRecordMapper.INSTANCE.recordConvertion(transactionRecorder.getUuid().toString(), transactionRecorder); Record record = TransactionRecordMapper.INSTANCE.recordConvertion(transactionRecorder.getUuid().toString(), transactionRecorder);
logger.info("flag value :: " +transactionRecorder.isProcessed());
if(!transactionRecorder.isProcessed())
{
logger.info("updating database flag only if flag is false");
repository.updateProcessedFlagByTransactionId(Boolean.TRUE, transactionRecorder.getUuid());
}
TransactionPayload transactionPayload = TransactionPayloadMapper.INSTANCE.transactionPayloadMapper(record); TransactionPayload transactionPayload = TransactionPayloadMapper.INSTANCE.transactionPayloadMapper(record);
List<Offsets> offsetsList = producerService.produce(transactionPayload,configProps.getOfferTransactionRecordTopic(),transactionRecorder,isReprocesed); List<Offsets> offsetsList = producerService.produce(transactionPayload,configProps.getOfferTransactionRecordTopic(),transactionRecorder,isReprocesed);
return ResponseEntity.ok(offsetsList); return ResponseEntity.ok(offsetsList);
} }
public ResponseEntity<String> errorResponse(TransactionRecorder transactionRecorder, String exceptionMessage, boolean isReprocesed) { public ResponseEntity<String> errorResponse(TransactionRecorder transactionRecorder, String exceptionMessage, boolean isReprocesed) {
...@@ -53,6 +59,4 @@ public class TransactionServiceImpl implements TransactionService ...@@ -53,6 +59,4 @@ public class TransactionServiceImpl implements TransactionService
} }
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error"); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error");
} }
} }
package com.safeway.epe.util;
import com.safeway.epe.domain.TransactionRecorder;
import com.safeway.epe.repository.TransactionRepository;
import com.safeway.epe.service.TransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Configuration
public class StoreScheduler
{
@Autowired
TransactionRepository transactionRepository;
@Autowired
private TransactionService transactionService;
@Scheduled(fixedRateString = "${offertransaction.reprocess.timedelay}",initialDelay = 77777)
public void processTransactionData(){
List<TransactionRecorder> transactionRecords = transactionRepository.findByIsProcessed(Boolean.FALSE);
if(!CollectionUtils.isEmpty(transactionRecords)){
transactionRecords.forEach(transactionRecord -> {
transactionService.sendData(transactionRecord,true);
});
}
}
}
...@@ -21,3 +21,7 @@ kafka: ...@@ -21,3 +21,7 @@ kafka:
schema: schema:
api: api:
baseurl: http://localhost:8082 baseurl: http://localhost:8082
offertransaction:
reprocess:
timedelay : 10000
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment