Commit 8233462e authored by Philippe Fonzin's avatar Philippe Fonzin

update order and send kafka message

parent 57efa5a0
package com.ascendfinalproject.warehouse.config; package com.ascendfinalproject.warehouse.config;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest; import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
...@@ -66,4 +67,10 @@ public class KafkaConfig { ...@@ -66,4 +67,10 @@ public class KafkaConfig {
SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory()); SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions); return KafkaSender.create(senderOptions);
} }
@Bean
public KafkaSender<String, WarehouseOrderResponse> kafkaUpdateEventProducer() {
SenderOptions<String, WarehouseOrderResponse> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions);
}
} }
...@@ -25,16 +25,6 @@ public class WarehouseController { ...@@ -25,16 +25,6 @@ public class WarehouseController {
@Autowired @Autowired
Sender sender; Sender sender;
// @CrossOrigin
// @GetMapping("/order/{id}")
// public void getOrderToPublish(@PathVariable String id) {
// Mono<WarehouseOrderResponse> record = orderService.getById(id);
// record.map(e -> {
// sender.sendOrder(e);
// return e;
// }).subscribe();
// }
@CrossOrigin @CrossOrigin
@GetMapping(value = "/orders") @GetMapping(value = "/orders")
...@@ -55,9 +45,7 @@ public class WarehouseController { ...@@ -55,9 +45,7 @@ public class WarehouseController {
@CrossOrigin @CrossOrigin
@PostMapping(value = "/kafkaOrders") @PostMapping(value = "/kafkaOrders")
public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) { public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) {
// STEP ONE
sender.sendOrder(order); sender.sendOrder(order);
// return orderService.createOrder(order);
} }
@CrossOrigin @CrossOrigin
...@@ -73,7 +61,7 @@ public class WarehouseController { ...@@ -73,7 +61,7 @@ public class WarehouseController {
} }
@CrossOrigin @CrossOrigin
@DeleteMapping(value = "/orders/{id}") @DeleteMapping(value = "/kafkaOrders/{id}")
public Mono<Void> deleteOrder(@PathVariable(value = "id") String id) { public Mono<Void> deleteOrder(@PathVariable(value = "id") String id) {
return orderService.deleteOrder(id); return orderService.deleteOrder(id);
} }
......
...@@ -19,11 +19,13 @@ public class Sender { ...@@ -19,11 +19,13 @@ public class Sender {
@Autowired @Autowired
private KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer; private KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer;
@Autowired
private KafkaSender<String, WarehouseOrderResponse> kafkaUpdateEventProducer;
private static final String TOPIC = "test_topic"; private static final String TOPIC = "test_topic";
public void sendOrder(WarehouseOrderRequest currentOrder) { public void sendOrder(WarehouseOrderRequest currentOrder) {
log.info(String.format("Sender message: %s ", currentOrder));
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder); ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder))) Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable)) .doOnError(throwable -> System.out.println(throwable))
...@@ -35,4 +37,15 @@ public class Sender { ...@@ -35,4 +37,15 @@ public class Sender {
sendToKafka.doOnError(throwable -> log.error("error")).subscribe(); sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
} }
public void sendUpdatedOrder(WarehouseOrderResponse currentOrder) {
ProducerRecord<String, WarehouseOrderResponse> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<WarehouseOrderResponse>> sendToKafka = kafkaUpdateEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> {
if (null != t.exception()) {
System.out.println("it works!");
}
});
sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
}
} }
package com.ascendfinalproject.warehouse.services; package com.ascendfinalproject.warehouse.services;
import com.ascendfinalproject.warehouse.exceptions.NotFoundException; import com.ascendfinalproject.warehouse.exceptions.NotFoundException;
import com.ascendfinalproject.warehouse.kafkaservice.Sender;
import com.ascendfinalproject.warehouse.models.Address; import com.ascendfinalproject.warehouse.models.Address;
import com.ascendfinalproject.warehouse.models.Item;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest; import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse; import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import com.ascendfinalproject.warehouse.repositories.WarehouseOrderRepository; import com.ascendfinalproject.warehouse.repositories.WarehouseOrderRepository;
...@@ -13,6 +15,7 @@ import reactor.core.publisher.Flux; ...@@ -13,6 +15,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.Date; import java.util.Date;
import java.util.List;
@Service @Service
public class WarehouseOrderService { public class WarehouseOrderService {
...@@ -24,6 +27,9 @@ public class WarehouseOrderService { ...@@ -24,6 +27,9 @@ public class WarehouseOrderService {
@Autowired @Autowired
WarehouseOrderRepository orderRepository; WarehouseOrderRepository orderRepository;
@Autowired
Sender sender;
public Mono<WarehouseOrderResponse> getById(String id) { public Mono<WarehouseOrderResponse> getById(String id) {
return orderRepository.findById(id); return orderRepository.findById(id);
} }
...@@ -58,12 +64,13 @@ public class WarehouseOrderService { ...@@ -58,12 +64,13 @@ public class WarehouseOrderService {
if (order.getStatus().equals(FULFILLED) || order.getStatus().equals(CANCELLED)) { if (order.getStatus().equals(FULFILLED) || order.getStatus().equals(CANCELLED)) {
existingOrder.setStatus(order.getStatus()); existingOrder.setStatus(order.getStatus());
existingOrder.setModifiedAt(new Date(System.currentTimeMillis())); existingOrder.setModifiedAt(new Date(System.currentTimeMillis()));
sender.sendUpdatedOrder(existingOrder);
} }
} }
return orderRepository.save(existingOrder); return orderRepository.save(existingOrder);
}); });
} }
public Mono<Void> deleteOrder(String id) { public Mono<Void> deleteOrder(String id) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment