chore: initial commit.

parents
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
.mvn/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
# Order Processing API
1. order-processing-api will consume order information from kafka topic using reactive kafka connector and process the received record
2. This API using the conversion of Dto to Collection and vice-versa using @mapStruct API dependency
3. This API using reactiveOperations framework instead of repository enablers for each mongo db collection.
4. This API is demonstrating CRUD operations of Order information.
Technologies Used :
1. Spring boot
2. Java8
3. Reactive Kafka
4. Reactive Mongo
5. Spring Webflux
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.nisum</groupId>
<artifactId>order-processing-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>order-processing-api</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor.kafka/reactor-kafka -->
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>1.5.3.Final</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<annotationProcessorPaths>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>1.5.3.Final</version>
</path>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</path>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-mapstruct-binding</artifactId>
<version>0.2.0</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.nisum.orderprocessingapi;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;
@SpringBootApplication
public class OrderProcessingApplication {
private static final String TOPIC = "test-order";
private static final String BOOTSTRAP_SERVERS = "HYD-LAP-00484.corp.nisum.com:9092";
private static final String CLIENT_ID_CONFIG = "trans-string-consumer-test-order";
public static void main(String[] args) {
SpringApplication.run(OrderProcessingApplication.class, args);
}
@Bean
public KafkaSender<String, String> kafkaSender()
{
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
return KafkaSender.create(senderOptions);
}
@Bean
public Gson gson ()
{
return new Gson();
}
}
package com.nisum.orderprocessingapi.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguaration {
private static final String TOPIC = "test-order";
private static final String BOOTSTRAP_SERVERS = "HYD-LAP-00484.corp.nisum.com:9092";
private static final String CLIENT_ID_CONFIG = "trans-string-consumer-test-order";
private static final String GROUP_ID_CONFIG = "trans-string-cg-test-order";
@Bean
public KafkaReceiver kafkaReceiver(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaReceiver(ConsumerFactory.INSTANCE, ReceiverOptions.create(props).subscription(Collections.singleton(TOPIC)));
}
}
package com.nisum.orderprocessingapi.config;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
@Configuration
@EnableReactiveMongoRepositories(basePackages = "com.nisum.orderprocessingapi")
public class ReactiveMongoConfig extends AbstractReactiveMongoConfiguration{
@Override
public MongoClient reactiveMongoClient() {
return MongoClients.create();
}
@Override
protected String getDatabaseName() {
return "orderdb";
}
}
\ No newline at end of file
package com.nisum.orderprocessingapi.controller;
import com.google.gson.Gson;
import com.nisum.orderprocessingapi.dto.IdAndOrderIdMapper;
import com.nisum.orderprocessingapi.dto.OrderCollectionDto;
import com.nisum.orderprocessingapi.dto.OrderDto;
import com.nisum.orderprocessingapi.dto.OrderResponseDto;
import com.nisum.orderprocessingapi.producer.OrderEventGenerator;
import com.nisum.orderprocessingapi.service.OrderProcessingService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
@Slf4j
@RestController
@RequestMapping(path = "/order")
public class OrderController {
@Autowired
private KafkaReceiver<String,String> kafkaReceiver;
@Autowired
private OrderEventGenerator orderEventGenerator;
@Autowired
private OrderProcessingService orderProcessingService;
@Autowired
private Gson gson;
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux getAllOrderEvents(){
Flux<ReceiverRecord<String,String>> kafkaFlux = kafkaReceiver.receive();
return kafkaFlux.checkpoint("Messages are started being consumed").log()
.doOnNext(order->
{
String key = order.key();
OrderDto orderDto = gson.fromJson(order.value(), OrderDto.class);
orderProcessingService.save(orderDto,key).onErrorResume(o->{
log.info("Unable to save document in mongo DB {} ",o);
order.receiverOffset().acknowledge();
return Mono.empty();
}).subscribe(orderStatus -> {
log.info("Mongo DB : Order Key : {} saved status {}", key,orderStatus);
order.receiverOffset().acknowledge();
});
}).map(ReceiverRecord::value)
.checkpoint("Messages are done consumed");
}
@PostMapping(produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<OrderResponseDto> createOrderEventInKafka(@RequestBody OrderCollectionDto orderCollectionDto) throws InterruptedException {
log.info("orderCollection Received : {}",orderCollectionDto);
return orderEventGenerator.publishOrderEvent("test-order",orderCollectionDto);
}
@PostMapping(path = "/create",produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<IdAndOrderIdMapper> createOrderEvent(@RequestBody OrderCollectionDto orderCollectionDto) throws InterruptedException {
log.info("orderCollection Received : {}",orderCollectionDto);
return orderProcessingService.saveAll(orderCollectionDto);
}
@PutMapping(path = "/update/{id}",produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<OrderDto> updateOrderEvent(@PathVariable String id, @RequestBody OrderDto orderDto) throws InterruptedException {
log.info("orderCollection Received for update : {}",orderDto);
return orderProcessingService.update(orderDto);
}
@DeleteMapping("/delete/{id}")
public Mono<Boolean> deleteOrderEvent(@PathVariable String id) throws InterruptedException {
log.info("Order id Received for deletion : {}",id);
return orderProcessingService.delete(id);
}
@GetMapping("/get/{id}")
public Mono<OrderDto> readOrderEvent(@PathVariable String id) throws InterruptedException {
log.info("Order id Received for retrieval : {}",id);
return orderProcessingService.read(id);
}
}
package com.nisum.orderprocessingapi.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BuyerInfoDto {
private String id;
private String buyerEmail;
private String buyerName;
private BuyerTaxInfoDto buyerTaxInfo;
private String purchaseOrderNumber;
}
package com.nisum.orderprocessingapi.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BuyerTaxInfoDto {
private String id;
private String companyLegalName;
}
package com.nisum.orderprocessingapi.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class IdAndOrderIdMapper {
private String id;
private String orderId;
}
package com.nisum.orderprocessingapi.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderCollectionDto{
private List<OrderDto> ordersDto;
}
package com.nisum.orderprocessingapi.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class OrderDto {
private String id;
private String orderId;
private String purchaseDate;
private String lastUpdateDate;
private String orderStatus;
private String fulfillmentChannel;
private Integer numberOfItemsShipped;
private Integer numberOfItemsUnshipped;
private String paymentMethod;
private List<String> paymentMethodDetails;
private String marketplaceId;
private String shipmentServiceLevelCategory;
private String orderType;
private String earliestShipDate;
private String latestShipDate;
private ShippingAddressDto shippingAddress;
private BuyerInfoDto buyerInfo;
}
\ No newline at end of file
package com.nisum.orderprocessingapi.dto;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderResponseDto {
private String topicName;
@JsonIgnore
private List<IdAndOrderIdMapper> orderKey;
}
package com.nisum.orderprocessingapi.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ShippingAddressDto {
private String id;
private String name;
private String addressLine1;
private String city;
private String stateOrRegion;
private String postalCode;
private String countryCode;
}
package com.nisum.orderprocessingapi.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document("buyer_info")
public class BuyerInfo {
@Id
private String id;
private String buyerEmail;
private String buyerName;
private String purchaseOrderNumber;
}
package com.nisum.orderprocessingapi.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document("buyer_tax_info")
public class BuyerTaxInfo {
@Id
private String id;
private String companyLegalName;
}
package com.nisum.orderprocessingapi.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.List;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document("order")
public class Order {
@Id
private String id;
private String orderId;
private String purchaseDate;
private String lastUpdateDate;
private String orderStatus;
private String fulfillmentChannel;
private Integer numberOfItemsShipped;
private Integer numberOfItemsUnshipped;
private String paymentMethod;
private List<String> paymentMethodDetails;
private String marketplaceId;
private String shipmentServiceLevelCategory;
private String orderType;
private String earliestShipDate;
private String latestShipDate;
}
\ No newline at end of file
package com.nisum.orderprocessingapi.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document("shipping_address")
public class ShippingAddress {
@Id
private String id;
private String name;
private String addressLine1;
private String city;
private String stateOrRegion;
private String postalCode;
private String countryCode;
}
package com.nisum.orderprocessingapi.producer;
import com.google.gson.Gson;
import com.nisum.orderprocessingapi.dto.IdAndOrderIdMapper;
import com.nisum.orderprocessingapi.dto.OrderCollectionDto;
import com.nisum.orderprocessingapi.dto.OrderResponseDto;
import com.nisum.orderprocessingapi.service.OrderProcessingService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@Component
@Slf4j
public class OrderEventGenerator {
@Autowired
private KafkaSender<String, String> kafkaSender;
@Autowired
private OrderProcessingService orderProcessingService;
@Autowired
private Gson gson;
public Flux<OrderResponseDto> publishOrderEvent(String topic, OrderCollectionDto orderCollectionDto) throws InterruptedException {
SimpleDateFormat dateFormat = new SimpleDateFormat("HHmmssSSSzddMMyyyy");
OrderResponseDto response= new OrderResponseDto();
response.setTopicName(topic);
List<IdAndOrderIdMapper> idAndOrderIdMappers=orderCollectionDto.getOrdersDto().stream().map(orderDto -> {
String uuid= UUID.randomUUID().toString();
String orderId= orderDto.getOrderId();
String orderString=gson.toJson(orderDto);
log.info("Order Information: {} ",orderString);
kafkaSender.send(Flux.just(
SenderRecord.create(new ProducerRecord<>(topic, uuid, orderString),uuid)))
.doOnError(e -> log.error("Send failed", e))
.subscribe(r -> {
RecordMetadata metadata = r.recordMetadata();
System.out.printf("Message %s sent successfully, topic-partition=%s-%d offset=%d timestamp=%s%n",
r.correlationMetadata(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
dateFormat.format(new Date(metadata.timestamp())));
});
return IdAndOrderIdMapper.builder().orderId(orderId).id(uuid).build();
}).collect(Collectors.toList());
response.setOrderKey(idAndOrderIdMappers);
return Flux.just(response);
}
}
package com.nisum.orderprocessingapi.repository;
import com.nisum.orderprocessingapi.dto.BuyerInfoDto;
import com.nisum.orderprocessingapi.dto.BuyerTaxInfoDto;
import com.nisum.orderprocessingapi.dto.OrderDto;
import com.nisum.orderprocessingapi.dto.ShippingAddressDto;
import com.nisum.orderprocessingapi.entity.BuyerInfo;
import com.nisum.orderprocessingapi.entity.BuyerTaxInfo;
import com.nisum.orderprocessingapi.entity.Order;
import com.nisum.orderprocessingapi.entity.ShippingAddress;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;
@Mapper
public interface OrderMapper {
OrderMapper INSTANCE = Mappers.getMapper(OrderMapper.class);
OrderDto orderEntityToDto(Order order);
@Mapping(target = "id", source = "orderKey")
Order orderDtoToEntity(OrderDto orderDto,String orderKey);
ShippingAddressDto shippingAddressEntityToDTO(ShippingAddress shippingAddress);
@Mapping(target = "id", source = "orderKey")
ShippingAddress shippingAddressDtoToEntity(ShippingAddressDto shippingAddressDto,String orderKey);
BuyerInfoDto buyerInfoEntityToDto(BuyerInfo buyerInfo);
@Mapping(target = "id", source = "orderKey")
BuyerInfo buyerInfoDtoToEntity(BuyerInfoDto buyerInfo,String orderKey);
@Mapping(target = "id", source = "orderKey")
BuyerTaxInfo buyerTaxInfoDtoToEntity(BuyerTaxInfoDto buyerTaxInfoDto,String orderKey);
BuyerTaxInfoDto buyerTaxInfoEntityToDto(BuyerTaxInfo buyerTaxInfo);
}
spring.data.mongodb.uri=mongodb://localhost:27017/orderdb
spring.data.mongodb.database=orderdb
package com.egen.serversentevents;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ServerSentEventsApplicationTests {
@Test
void contextLoads() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment