Commit 2ece3e60 authored by Darrick Yong's avatar Darrick Yong

Merge branch 'kafka' into 'master'

Kafka

See merge request !18
parents c9a5f1df 36c04e79
package com.ascendfinalproject.warehouse.config;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
......@@ -62,8 +63,14 @@ public class KafkaConfig {
@Bean
public KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer() {
public KafkaSender<String, WarehouseOrderRequest> kafkaOMSProducer() {
SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions);
}
@Bean
public KafkaSender<String, WarehouseOrderResponse> kafkaUpdateEventProducer() {
SenderOptions<String, WarehouseOrderResponse> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions);
}
}
......@@ -25,16 +25,6 @@ public class WarehouseController {
@Autowired
Sender sender;
// @CrossOrigin
// @GetMapping("/order/{id}")
// public void getOrderToPublish(@PathVariable String id) {
// Mono<WarehouseOrderResponse> record = orderService.getById(id);
// record.map(e -> {
// sender.sendOrder(e);
// return e;
// }).subscribe();
// }
@CrossOrigin
@GetMapping(value = "/orders")
......@@ -55,9 +45,7 @@ public class WarehouseController {
@CrossOrigin
@PostMapping(value = "/kafkaOrders")
public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) {
// STEP ONE
sender.sendOrder(order);
// return orderService.createOrder(order);
}
@CrossOrigin
......@@ -73,7 +61,7 @@ public class WarehouseController {
}
@CrossOrigin
@DeleteMapping(value = "/orders/{id}")
@DeleteMapping(value = "/kafkaOrders/{id}")
public Mono<Void> deleteOrder(@PathVariable(value = "id") String id) {
return orderService.deleteOrder(id);
}
......
......@@ -17,15 +17,19 @@ import reactor.kafka.sender.SenderResult;
public class Sender {
@Autowired
private KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer;
private KafkaSender<String, WarehouseOrderRequest> kafkaOMSProducer;
@Autowired
private KafkaSender<String, WarehouseOrderResponse> kafkaUpdateEventProducer;
private static final String TOPIC = "warehouse_management";
private static final String TOPIC = "test_topic";
private static final String OMS = "order_management";
public void sendOrder(WarehouseOrderRequest currentOrder) {
log.info(String.format("Sender message: %s ", currentOrder));
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(OMS, currentOrder);
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaOMSProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> {
if (null != t.exception()) {
......@@ -35,4 +39,15 @@ public class Sender {
sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
}
public void sendUpdatedOrder(WarehouseOrderResponse currentOrder) {
ProducerRecord<String, WarehouseOrderResponse> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<WarehouseOrderResponse>> sendToKafka = kafkaUpdateEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> {
if (null != t.exception()) {
System.out.println("it works!");
}
});
sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
}
}
package com.ascendfinalproject.warehouse.services;
import com.ascendfinalproject.warehouse.exceptions.NotFoundException;
import com.ascendfinalproject.warehouse.kafkaservice.Sender;
import com.ascendfinalproject.warehouse.models.Address;
import com.ascendfinalproject.warehouse.models.Item;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import com.ascendfinalproject.warehouse.repositories.WarehouseOrderRepository;
......@@ -13,6 +15,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Date;
import java.util.List;
@Service
public class WarehouseOrderService {
......@@ -24,6 +27,9 @@ public class WarehouseOrderService {
@Autowired
WarehouseOrderRepository orderRepository;
@Autowired
Sender sender;
public Mono<WarehouseOrderResponse> getById(String id) {
return orderRepository.findById(id);
}
......@@ -58,12 +64,13 @@ public class WarehouseOrderService {
if (order.getStatus().equals(FULFILLED) || order.getStatus().equals(CANCELLED)) {
existingOrder.setStatus(order.getStatus());
existingOrder.setModifiedAt(new Date(System.currentTimeMillis()));
sender.sendUpdatedOrder(existingOrder);
}
}
return orderRepository.save(existingOrder);
});
}
public Mono<Void> deleteOrder(String id) {
......
......@@ -5,4 +5,4 @@ spring.data.mongodb.database=test
kafka.producer.bootstrap-servers: localhost:9092
kafka.producer.acks: all
kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
kafka.topic.input: test_topic
\ No newline at end of file
kafka.topic.input: order_management
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment