Commit 3075cea2 authored by Alex Segers's avatar Alex Segers

[AFP-91] 🔀 Resolve merge conflict in 'pom.xml'

parents 68dc4600 c4da26ab
Pipeline #1705 failed with stage
in 38 seconds
File added
FROM maven:3.6.0-jdk-11-slim AS build
COPY src /home/app/src
COPY pom.xml /home/app
RUN mvn -f /home/app/pom.xml clean package -DskipTests
FROM openjdk:11-jre-slim FROM openjdk:11-jre-slim
COPY --from=build /home/app/target/order-management-0.0.1-SNAPSHOT.jar /usr/local/lib/order-management.jar COPY target/order-management-0.0.1-SNAPSHOT.jar /usr/local/lib/order-management.jar
EXPOSE 8080 EXPOSE 8080
ENTRYPOINT ["java","-jar","/usr/local/lib/order-management.jar"] ENTRYPOINT ["java","-jar","/usr/local/lib/order-management.jar"]
#### BELOW IS FOR LOCAL TESTING ONLY, DO NOT COMMENT/UNCOMMENT!
#FROM maven:3.6.0-jdk-11-slim AS build
#COPY src /home/app/src
#COPY pom.xml /home/app
#RUN mvn -f /home/app/pom.xml clean package -DskipTests
#FROM openjdk:11-jre-slim
#COPY --from=build /home/app/target/order-management-0.0.1-SNAPSHOT.jar /usr/local/lib/order-management.jar
#EXPOSE 8080
#ENTRYPOINT ["java","-jar","/usr/local/lib/order-management.jar"]
\ No newline at end of file
...@@ -58,3 +58,40 @@ Example Console Response: ...@@ -58,3 +58,40 @@ Example Console Response:
2021-05-05 17:13:29.932 INFO 82715 --- [ntainer#0-0-C-1] c.afp.ordermanagement.service.Consumer : #### -> Consumed message -> somethinghere 2021-05-05 17:13:29.932 INFO 82715 --- [ntainer#0-0-C-1] c.afp.ordermanagement.service.Consumer : #### -> Consumed message -> somethinghere
``` ```
## Testing Kafka Producer and Consumer
Make sure kafka server and zookeeper are running
To create a kafka topic:
```
kafka-topics.sh --describe --topic <insert-topic-here> --bootstrap-server localhost:9092
```
if the above command doesn't work, try:
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-topics --describe --topic orders --bootstrap-server localhost:9092
```
To start kafka console for producer:
```
kafka-console-producer.sh --topic orders --bootstrap-server localhost:9092
```
-OR-
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-console-producer --topic orders --bootstrap-server localhost:9092
```
To start kafka console for consumer:
```
kafka-console-consumer.sh --topic orders --from-beginning --bootstrap-server localhost:9092
```
-OR-
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-console-consumer --topic orders --from-beginning --bootstrap-server localhost:9092
```
(--from-beginning flag will show all messages that were received for this particular topic)
if you're still getting errors finding the pathway for kafka, try running:
```
ls -alrth /usr/local/Cellar/kafka/
```
then find your way to the bin directory where you'll find all the kafka commands
\ No newline at end of file
...@@ -25,6 +25,21 @@ ...@@ -25,6 +25,21 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId> <artifactId>spring-boot-starter-webflux</artifactId>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-json-serializer -->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
...@@ -35,13 +50,6 @@ ...@@ -35,13 +50,6 @@
<artifactId>reactor-test</artifactId> <artifactId>reactor-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.javafaker/javafaker --> <!-- https://mvnrepository.com/artifact/com.github.javafaker/javafaker -->
<dependency> <dependency>
<groupId>com.github.javafaker</groupId> <groupId>com.github.javafaker</groupId>
...@@ -87,6 +95,11 @@ ...@@ -87,6 +95,11 @@
<artifactId>google-api-client</artifactId> <artifactId>google-api-client</artifactId>
<version>1.31.1</version> <version>1.31.1</version>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
package com.afp.ordermanagement.config; package com.afp.ordermanagement.config;
import com.afp.ordermanagement.reactivekafkaservice.Sender; import com.afp.ordermanagement.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
...@@ -8,16 +8,14 @@ import org.apache.kafka.common.serialization.StringSerializer; ...@@ -8,16 +8,14 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
/*import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;*/
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderOptions;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
...@@ -42,14 +40,14 @@ public class KafkaConfig { ...@@ -42,14 +40,14 @@ public class KafkaConfig {
senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks); senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks);
senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return senderConfigProps; return senderConfigProps;
//return new DefaultKafkaProducerFactory<>(configProps); //return new DefaultKafkaProducerFactory<>(senderConfigProps);
} }
@Bean @Bean
public KafkaSender<String, String> kafkaEventProducer() { public KafkaSender<String, Order> kafkaEventProducer() {
SenderOptions<String, String> senderOptions = SenderOptions.create(producerFactoryString()); SenderOptions<String, Order> senderOptions = SenderOptions.create(producerFactoryString());
return KafkaSender.create(senderOptions); return KafkaSender.create(senderOptions);
} }
......
...@@ -12,6 +12,7 @@ import reactor.core.publisher.Mono; ...@@ -12,6 +12,7 @@ import reactor.core.publisher.Mono;
@RestController @RestController
@RequestMapping("/api") @RequestMapping("/api")
@CrossOrigin
public class OrderController { public class OrderController {
@Autowired @Autowired
...@@ -22,12 +23,17 @@ public class OrderController { ...@@ -22,12 +23,17 @@ public class OrderController {
/** /**
* DESC - This route will let order manager get order status from warehouse * DESC - Persisting order information in the database and sending the
* @param orderId * order object via orders kafka topic so that warehouse can
* handle that
* @param orderObject
* @return
*/ */
@GetMapping("/orderStatus/{orderId}") @PostMapping("/ordersFromEcom")
public void getOrderStatusFromWarehouse(@PathVariable String orderId) { @ResponseStatus(HttpStatus.CREATED)
sender.sendOrderIdToWarehouse(orderId); public Mono<Order> getOrderFromEcom(@RequestBody Order orderObject) {
sender.sendOrderToWarehouse(orderObject);
return orderService.createOrder(orderObject);
} }
@GetMapping("/orders") @GetMapping("/orders")
...@@ -47,13 +53,6 @@ public class OrderController { ...@@ -47,13 +53,6 @@ public class OrderController {
return orderService.getAllOrdersByCustomerId(customerId); return orderService.getAllOrdersByCustomerId(customerId);
} }
@PostMapping("/orders")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Order> saveOrder(@RequestBody Order order){
return orderService.createOrder(order);
}
@PutMapping("/order/{orderId}") @PutMapping("/order/{orderId}")
public Mono<ResponseEntity<Order>> updateOrder(@PathVariable(value = "orderId") String orderId, @RequestBody Order order){ public Mono<ResponseEntity<Order>> updateOrder(@PathVariable(value = "orderId") String orderId, @RequestBody Order order){
return orderService.updateOrderByOrderId(orderId, order) return orderService.updateOrderByOrderId(orderId, order)
......
...@@ -32,4 +32,9 @@ public class Order { ...@@ -32,4 +32,9 @@ public class Order {
public Order(){ public Order(){
} }
public Order(OrderStatus status) {
this.orderStatus = status;
}
} }
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.KafkaReceiver;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Service @Service
@Slf4j @Slf4j
public class Receiver { public class Receiver {
...@@ -14,11 +26,65 @@ public class Receiver { ...@@ -14,11 +26,65 @@ public class Receiver {
@Autowired @Autowired
private KafkaReceiver<String, String> kafkaReceiver; private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private OrderService orderService;
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() { public void consumeOrderStatus() {
kafkaReceiver.receive() kafkaReceiver.receive()
.doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value()))) .doOnNext(record -> System.out.println(record))
.doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage())) .doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe(); .subscribe();
} }
private void onOrderStatusReceived(String orderStatusStr) {
try {
//deserialize string into java object using ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
String id = orderStatus.get("id");
String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT);
updateOrderStatus(id, status);
log.info("orderStatus: {}", orderStatus);
} catch (Exception e) {
log.error("Caught error", e);
}
}
private void updateOrderStatus(String orderId, String orderStatus) {
if (checkExistingOrder(orderId)) {
log.info("Updating {} with status {}", orderId, orderStatus);
Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
updateOrder.subscribe();
// updateOrder.block(); //subscribe vs block?
}
}
private boolean checkExistingOrder(String orderId) {
Flux<Order> orders = orderService.getAllOrders();
List<Order> orderList = orders.collectList().block();
Order res = orderList.stream()
.filter(order -> orderId.equals(order.getId()))
.findAny()
.orElse(null);
if (res == null) {
log.error("Order {} not found", orderId);
return false;
}
log.info("Order exists on the database");
return true;
}
} }
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.model.Order;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -16,15 +17,18 @@ import reactor.kafka.sender.SenderResult; ...@@ -16,15 +17,18 @@ import reactor.kafka.sender.SenderResult;
public class Sender { public class Sender {
@Autowired @Autowired
private KafkaSender<String, String> kafkaEventProducer; private KafkaSender<String, Order> kafkaEventProducer;
private static final String ORDER_TOPIC = "orders"; private static final String ORDER_TOPIC = "orders";
public void sendOrderIdToWarehouse(String id) {
log.info(String.format("##### -> Sender sending message: %s ", id)); public void sendOrderToWarehouse(Order orderObject) {
ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id); log.info(String.format("##### -> Sender sending message: %s ", orderObject));
Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id))) ProducerRecord<String, Order> record = new ProducerRecord<>(ORDER_TOPIC, orderObject);
Flux<SenderResult<Order>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, orderObject)))
.doOnError(throwable -> System.out.println(throwable)) .doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> { .doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) { if (null != uuidSenderResult.exception()) {
...@@ -33,4 +37,13 @@ public class Sender { ...@@ -33,4 +37,13 @@ public class Sender {
}); });
working.doOnError(throwable -> log.error("some error")).subscribe(); working.doOnError(throwable -> log.error("some error")).subscribe();
} }
// public void sendUpdatedStatus(String id, String status) {
// log.info(String.format("Sender sending updated status for ordernumber: %s", id));
// ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
//
// }
} }
...@@ -15,6 +15,7 @@ public class OrderService { ...@@ -15,6 +15,7 @@ public class OrderService {
OrderRepository orderRepository; OrderRepository orderRepository;
public Mono<Order> createOrder(Order newOrder){ public Mono<Order> createOrder(Order newOrder){
System.out.println("here");
String defaultOrderTrackingCode = "N/A"; String defaultOrderTrackingCode = "N/A";
OrderStatus defaultOrderStatus = OrderStatus.RECEIVED; OrderStatus defaultOrderStatus = OrderStatus.RECEIVED;
long serviceSystemTime = System.currentTimeMillis(); long serviceSystemTime = System.currentTimeMillis();
...@@ -22,6 +23,7 @@ public class OrderService { ...@@ -22,6 +23,7 @@ public class OrderService {
newOrder.setOrderTrackingCode(defaultOrderTrackingCode); newOrder.setOrderTrackingCode(defaultOrderTrackingCode);
newOrder.setOrderCreatedAt(serviceSystemTime); newOrder.setOrderCreatedAt(serviceSystemTime);
newOrder.setOrderUpdatedAt(serviceSystemTime); newOrder.setOrderUpdatedAt(serviceSystemTime);
//System.out.println(newOrder);
return orderRepository.save(newOrder); return orderRepository.save(newOrder);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment