Commit 613bd984 authored by Alex Segers's avatar Alex Segers

Merge branch 'dev' into 'AFP-134'

# Conflicts:
#   src/main/java/com/afp/ordermanagement/controller/OrderController.java
parents 5ee47bf5 ab7415fe
Pipeline #1751 failed with stage
in 38 seconds
No preview for this file type
...@@ -3,6 +3,7 @@ package com.afp.ordermanagement.controller; ...@@ -3,6 +3,7 @@ package com.afp.ordermanagement.controller;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.reactivekafkaservice.Receiver; import com.afp.ordermanagement.reactivekafkaservice.Receiver;
import com.afp.ordermanagement.reactivekafkaservice.Sender; import com.afp.ordermanagement.reactivekafkaservice.Sender;
import com.afp.ordermanagement.service.EmailService;
import com.afp.ordermanagement.service.OrderService; import com.afp.ordermanagement.service.OrderService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
...@@ -41,8 +42,13 @@ public class OrderController { ...@@ -41,8 +42,13 @@ public class OrderController {
@PostMapping("/orders") @PostMapping("/orders")
@ResponseStatus(HttpStatus.CREATED) @ResponseStatus(HttpStatus.CREATED)
public Mono<Order> getOrderFromEcom(@RequestBody Order orderObject) { public Mono<Order> getOrderFromEcom(@RequestBody Order orderObject) {
sender.sendOrderToWarehouse(orderObject); // Mono<Order> orderCreated = orderService.createOrder(orderObject);
return orderService.createOrder(orderObject); return orderService.createOrder(orderObject)
.doOnSuccess(order -> {
orderObject.setId(order.getId());
System.out.println("sending order*******************" + orderObject.getId());
sender.sendOrderToTopic(orderObject);
});
} }
@GetMapping("/orders") @GetMapping("/orders")
...@@ -79,4 +85,12 @@ public class OrderController { ...@@ -79,4 +85,12 @@ public class OrderController {
public void deleteOrderbyId(@PathVariable(value = "orderId") String orderId) { public void deleteOrderbyId(@PathVariable(value = "orderId") String orderId) {
orderService.deleteOrderById(orderId); orderService.deleteOrderById(orderId);
} }
@PostMapping("/orderStatus")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Order> getUpdatedOrderFromWarehouseTopic(@RequestBody Order order) {
sender.sendUpdatedOrderToWarehouseTopic(order);
return orderService.updateOrderByOrderId(order.getId(), order);
}
} }
package com.afp.ordermanagement.model;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import java.util.Date;
import java.util.List;
@Getter
@Setter
@ToString
public class WarehouseOrder {
@Id
private String id;
private String orderId;
private String status;
private Date createdAt;
private Date modifiedAt;
private List<Item> orderItems;
private String address;
public WarehouseOrder() {
}
}
...@@ -4,7 +4,9 @@ import com.afp.ordermanagement.controller.OrderController; ...@@ -4,7 +4,9 @@ import com.afp.ordermanagement.controller.OrderController;
import com.afp.ordermanagement.model.Item; import com.afp.ordermanagement.model.Item;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus; import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.model.WarehouseOrder;
import com.afp.ordermanagement.repository.OrderRepository; import com.afp.ordermanagement.repository.OrderRepository;
import com.afp.ordermanagement.service.EmailService;
import com.afp.ordermanagement.service.OrderService; import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
...@@ -34,6 +36,8 @@ public class Receiver { ...@@ -34,6 +36,8 @@ public class Receiver {
@Autowired @Autowired
private OrderRepository orderRepository; private OrderRepository orderRepository;
@Autowired
private EmailService emailService;
@EventListener(ApplicationStartedEvent.class) @EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() { public void consumeOrderStatus() {
...@@ -51,12 +55,19 @@ public class Receiver { ...@@ -51,12 +55,19 @@ public class Receiver {
private void updateOrderStatus(String orderStr) { private void updateOrderStatus(String orderStr) {
try { try {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
Order order = objectMapper.readValue(orderStr, Order.class); WarehouseOrder order = objectMapper.readValue(orderStr, WarehouseOrder.class);
log.info("ORDER objectMapper {}", order); log.info("order received from warehouse {}", order);
String orderId = order.getId(); String orderId = order.getOrderId();
Mono<Order> updated = orderService.updateOrderByOrderId(orderId, order); System.out.println("About to try sending an email.");
updated.block(); Mono<Order> convertedOrder = orderService.getOrderById(order.getOrderId());
convertedOrder.doOnSuccess(o -> {
log.info("Existing order in the db: {}", o);
o.setOrderStatus(OrderStatus.valueOf(order.getStatus()));
emailService.emailCreator(o);
orderService.updateOrderByOrderId(orderId, o).subscribe();
System.out.println("ayayayayayayayaya receiver" + o);
}).subscribe();
} catch (Exception e) { } catch (Exception e) {
log.error("Caught error on UpdateOrderStatus method", e); log.error("Caught error on UpdateOrderStatus method", e);
......
package com.afp.ordermanagement.reactivekafkaservice; package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.model.Order; import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.service.EmailService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -11,24 +12,29 @@ import reactor.kafka.sender.KafkaSender; ...@@ -11,24 +12,29 @@ import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord; import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult; import reactor.kafka.sender.SenderResult;
import java.util.UUID;
@Service @Service
@Slf4j @Slf4j
public class Sender { public class Sender {
@Autowired
private EmailService emailService;
@Autowired @Autowired
private KafkaSender<String, Order> kafkaEventProducer; private KafkaSender<String, Order> kafkaEventProducer;
private static final String ORDER_TOPIC = "orders"; private static final String ORDER_TOPIC = "OMS_ORDER_UPDATE";
private static final String WAREHOUSE_TOPIC = "WMOS_ORDER_UPDATE";
public void sendOrderToWarehouse(Order orderObject) { public void sendOrderToTopic(Order orderObject) {
log.info(String.format("##### -> Sender sending message: %s ", orderObject)); log.info(String.format("##### -> Sender sending message: %s ", orderObject));
ProducerRecord<String, Order> record = new ProducerRecord<>(ORDER_TOPIC, orderObject); ProducerRecord<String, Order> record = new ProducerRecord<>(ORDER_TOPIC, orderObject);
Flux<SenderResult<Order>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, orderObject))) System.out.println("In sendOrderToWarehouse");
emailService.emailCreator(orderObject);
Flux<SenderResult<UUID>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, UUID.randomUUID())))
.doOnError(throwable -> System.out.println(throwable)) .doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> { .doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) { if (null != uuidSenderResult.exception()) {
...@@ -38,5 +44,18 @@ public class Sender { ...@@ -38,5 +44,18 @@ public class Sender {
working.doOnError(throwable -> log.error("some error")).subscribe(); working.doOnError(throwable -> log.error("some error")).subscribe();
} }
public void sendUpdatedOrderToWarehouseTopic(Order order) {
ProducerRecord<String, Order> record = new ProducerRecord<>(WAREHOUSE_TOPIC, order);
System.out.println("In sendUpdatedOrderToWarehouseTopic");
Flux<SenderResult<Order>> updated = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, order)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(uuidSenderResult -> {
if (null != uuidSenderResult.exception()) {
System.out.println("send order update");
}
});
updated.doOnError(throwable -> log.error("error on sendUpdatedOrderToWarehouse method")).subscribe();
}
} }
...@@ -22,7 +22,7 @@ public class EmailService { ...@@ -22,7 +22,7 @@ public class EmailService {
mailMessage.setSubject(subject); mailMessage.setSubject(subject);
mailMessage.setText(message); mailMessage.setText(message);
mailMessage.setFrom("NOREPLY@nisum.com"); mailMessage.setFrom("kaminskikeving@gmail.com");
javaMailSender.send(mailMessage); javaMailSender.send(mailMessage);
} }
...@@ -32,16 +32,16 @@ public class EmailService { ...@@ -32,16 +32,16 @@ public class EmailService {
String message1 = "message ", message2 = "build failed."; String message1 = "message ", message2 = "build failed.";
switch (status) { switch (status) {
case RECEIVED: case RECEIVED:
message1 = "Hello, you order #" + order.getId() + " has been received!"; message1 = "Hello, your order #" + order.getId() + " has been received!";
message2 = "We hope to have your order fulfilled soon."; message2 = " We hope to have your order fulfilled soon.\n\n" + order.getOrderItems();
break; break;
case CANCELLED: case CANCELLED:
message1 = "I'm sorry, your order #" + order.getId() + " has been canceled."; message1 = "I'm sorry, your order #" + order.getId() + " has been canceled.";
message2 = "For more information, contact {NULL}."; message2 = " For more information, contact {NULL}.";
break; break;
case FULFILLED: case FULFILLED:
message1 = "Good news everyone! Your order #" + order.getId() + " has been fulfilled."; message1 = "Good news everyone! Your order #" + order.getId() + " has been fulfilled.";
message2 = "Your tracking number is: " + order.getOrderTrackingCode(); message2 = " Your tracking number is: " + order.getOrderTrackingCode();
break; break;
} }
return message1 + message2; return message1 + message2;
...@@ -60,7 +60,7 @@ public class EmailService { ...@@ -60,7 +60,7 @@ public class EmailService {
status = "FULFILLED"; status = "FULFILLED";
break; break;
} }
return "Your order #" + order.getId() + " has been" + status; return "Your order #" + order.getId() + " has been " + status;
} }
public String toCreator(Order order) { public String toCreator(Order order) {
...@@ -73,6 +73,7 @@ public class EmailService { ...@@ -73,6 +73,7 @@ public class EmailService {
String to = toCreator(order); String to = toCreator(order);
sendMail(to, subject, message); sendMail(to, subject, message);
return "Email sent to customer!"; return "Email sent to customer!";
} }
} }
...@@ -38,6 +38,7 @@ public class OrderService { ...@@ -38,6 +38,7 @@ public class OrderService {
newOrder.setOrderTrackingCode(defaultOrderTrackingCode); newOrder.setOrderTrackingCode(defaultOrderTrackingCode);
newOrder.setOrderCreatedAt(serviceSystemTime); newOrder.setOrderCreatedAt(serviceSystemTime);
newOrder.setOrderUpdatedAt(serviceSystemTime); newOrder.setOrderUpdatedAt(serviceSystemTime);
//newOrder.setId("abc123");
//System.out.println(newOrder); //System.out.println(newOrder);
return orderRepository.save(newOrder); return orderRepository.save(newOrder);
} }
...@@ -95,12 +96,13 @@ public class OrderService { ...@@ -95,12 +96,13 @@ public class OrderService {
return orderRepository.findById(orderId) return orderRepository.findById(orderId)
.flatMap(existingOrder -> { .flatMap(existingOrder -> {
existingOrder.setCustomerAddress(newOrder.getCustomerAddress()); //existingOrder.setCustomerAddress(newOrder.getCustomerAddress());
existingOrder.setCustomerEmailAddress(newOrder.getCustomerEmailAddress()); //existingOrder.setCustomerEmailAddress(newOrder.getCustomerEmailAddress());
existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode()); //existingOrder.setOrderTrackingCode(newOrder.getOrderTrackingCode());
existingOrder.setOrderItems(newOrder.getOrderItems()); //existingOrder.setOrderItems(newOrder.getOrderItems());
existingOrder.setOrderStatus(newOrder.getOrderStatus()); existingOrder.setOrderStatus(newOrder.getOrderStatus());
existingOrder.setOrderItems(newOrder.getOrderItems()); //existingOrder.setOrderItems(newOrder.getOrderItems());
System.out.println("serviceeee" + existingOrder);
return orderRepository.save(existingOrder); return orderRepository.save(existingOrder);
}); });
} }
......
spring.data.mongodb.uri=mongodb+srv://user:password2021@cluster0.g23rm.mongodb.net/myFirstDatabase?
kafka.producer.bootstrap-servers=localhost:9092 kafka.producer.bootstrap-servers=localhost:9092
kafka.producer.acks=all kafka.producer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092 kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=group_id kafka.consumer.group-id=group_id
kafka.topic.input=orders kafka.topic.input=WMOS_ORDER_UPDATE
server.port=8084
# Config for MailTrap SMTP Mail testing service # Config for MailTrap SMTP Mail testing service
spring.mail.protocol=smtp spring.mail.protocol=smtp
spring.mail.host=smtp.mailtrap.io spring.mail.host=smtp.mailtrap.io
...@@ -10,4 +14,13 @@ spring.mail.port=2525 ...@@ -10,4 +14,13 @@ spring.mail.port=2525
spring.mail.username=945a9d376253be spring.mail.username=945a9d376253be
spring.mail.password=96d65f623868cc spring.mail.password=96d65f623868cc
spring.mail.properties.mail.smtp.auth = true spring.mail.properties.mail.smtp.auth = true
spring.mail.properties.mail.smtp.starttls.enable = true spring.mail.properties.mail.smtp.starttls.enable = false
\ No newline at end of file
## Config for GMail SMTP mail sending
#spring.mail.host=smtp.gmail.com
#spring.mail.port=587
#spring.mail.username=gmail email address
#spring.mail.password=gmail email password
#spring.mail.properties.mail.smtp.auth=true
#spring.mail.properties.mail.smtp.starttls.enable=true
#mail.smtp.debug=true
\ No newline at end of file
kafka.producer.bootstrap-servers=localhost:9092
kafka.producer.acks=all
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.group-id=group_id
kafka.topic.input=orders
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment