Integrated kafka susbscriber changes

parent 34afbb79
......@@ -26,6 +26,17 @@
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
......
package com.nisum.abs.loyaltyService.controller;
import com.fasterxml.jackson.databind.JsonNode;
import com.nisum.abs.loyaltyService.dto.LoyaltyDto;
import com.nisum.abs.loyaltyService.service.LoyaltyService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("loyalty")
public class LoyaltyController {
private static final String LOYALTY_REWARDS_TOPIC = "loyalty-rewards-info";
@Autowired
private LoyaltyService loyaltyService;
......@@ -46,4 +62,37 @@ public class LoyaltyController {
}
@GetMapping(value = "/loyaltyRewards", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<JsonNode> getOrdersEventsFlux(@RequestParam(name = "loyaltyId") String loyaltyId){
Map<String, Object> propsMaps = this.kafkaReceiverConfigurations(loyaltyId);
DefaultKafkaReceiver<String, JsonNode> kafkaReceiver =
new DefaultKafkaReceiver(ConsumerFactory.INSTANCE, ReceiverOptions.create(propsMaps).subscription(Collections.singleton(LOYALTY_REWARDS_TOPIC)));
Flux<ReceiverRecord<String, JsonNode>> kafkaFlux = kafkaReceiver.receive();
return kafkaFlux
.filter(receivedRecord -> {
receivedRecord.receiverOffset().acknowledge();
return receivedRecord.value().get("loyaltyId").asText().equals(loyaltyId);
})
.map(ReceiverRecord::value).log();
}
private Map<String, Object> kafkaReceiverConfigurations(String id){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "loyalty-consumer-"+id+"-"+ UUID.randomUUID());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, id);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return props;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment