Commit b7954c7f authored by Shanelle Valencia's avatar Shanelle Valencia

Merge branch 'new-fetch-status-warehouse' into 'dev'

New fetch status warehouse

See merge request !19
parents 9cfefa7f ddd01603
Pipeline #1692 failed with stage
in 39 seconds
No preview for this file type
......@@ -58,3 +58,40 @@ Example Console Response:
2021-05-05 17:13:29.932 INFO 82715 --- [ntainer#0-0-C-1] c.afp.ordermanagement.service.Consumer : #### -> Consumed message -> somethinghere
```
## Testing Kafka Producer and Consumer
Make sure kafka server and zookeeper are running
To create a kafka topic:
```
kafka-topics.sh --describe --topic <insert-topic-here> --bootstrap-server localhost:9092
```
if the above command doesn't work, try:
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-topics --describe --topic orders --bootstrap-server localhost:9092
```
To start kafka console for producer:
```
kafka-console-producer.sh --topic orders --bootstrap-server localhost:9092
```
-OR-
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-console-producer --topic orders --bootstrap-server localhost:9092
```
To start kafka console for consumer:
```
kafka-console-consumer.sh --topic orders --from-beginning --bootstrap-server localhost:9092
```
-OR-
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-console-consumer --topic orders --from-beginning --bootstrap-server localhost:9092
```
(--from-beginning flag will show all messages that were received for this particular topic)
if you're still getting errors finding the pathway for kafka, try running:
```
ls -alrth /usr/local/Cellar/kafka/
```
then find your way to the bin directory where you'll find all the kafka commands
\ No newline at end of file
......@@ -85,7 +85,6 @@
<artifactId>velocity</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
......
......@@ -32,4 +32,9 @@ public class Order {
public Order(){
}
public Order(OrderStatus status) {
this.orderStatus = status;
}
}
package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Service
@Slf4j
public class Receiver {
......@@ -14,11 +26,65 @@ public class Receiver {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private OrderService orderService;
@EventListener(ApplicationStartedEvent.class)
public void consumeOrderStatus() {
kafkaReceiver.receive()
.doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value())))
.doOnNext(record -> log.info("record.value(): {} ", record.value()))
.doOnNext(record -> onOrderStatusReceived(record.value()))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
private void onOrderStatusReceived(String orderStatusStr) {
try {
//deserialize string into java object using ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
String id = orderStatus.get("id");
String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT);
updateOrderStatus(id, status);
log.info("orderStatus: {}", orderStatus);
} catch (Exception e) {
log.error("Caught error", e);
}
}
private void updateOrderStatus(String orderId, String orderStatus) {
if (checkExistingOrder(orderId)) {
log.info("Updating {} with status {}", orderId, orderStatus);
Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
updateOrder.subscribe();
// updateOrder.block(); //subscribe vs block?
}
}
private boolean checkExistingOrder(String orderId) {
Flux<Order> orders = orderService.getAllOrders();
List<Order> orderList = orders.collectList().block();
Order res = orderList.stream()
.filter(order -> orderId.equals(order.getId()))
.findAny()
.orElse(null);
if (res == null) {
log.error("Order {} not found", orderId);
return false;
}
log.info("Order exists on the database");
return true;
}
}
package com.afp.ordermanagement.reactivekafkaservice;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Service
@Slf4j
public class Sender {
@Autowired
private KafkaSender<String, String> kafkaEventProducer;
private static final String ORDER_TOPIC = "orders";
public void sendOrderIdToWarehouse(String id) {
log.info(String.format("##### -> Sender sending message: %s ", id));
// sendMessage(ORDER_TOPIC, id, id);
ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id);
Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id)))
.doOnError(throwable -> System.out.println(throwable))
......@@ -33,4 +47,13 @@ public class Sender {
});
working.doOnError(throwable -> log.error("some error")).subscribe();
}
public void sendUpdatedStatus(String id, String status) {
log.info(String.format("Sender sending updated status for ordernumber: %s", id));
ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment