Commit 63ccba08 authored by Alex Pinto's avatar Alex Pinto

updated kafka connection

parent 3c2152e4
......@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: afp-warehouse-container
image: nexus.mynisum.com/afp-warehouse-backend:10
image: nexus.mynisum.com/afp-warehouse-backend:11
imagePullPolicy: Always
ports:
- containerPort: 8088
......
......@@ -42,11 +42,11 @@ public class WarehouseController {
.defaultIfEmpty(ResponseEntity.status(HttpStatus.NOT_FOUND).body(null));
}
// @CrossOrigin
// @PostMapping(value = "/kafkaOrders")
// public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) {
// sender.sendOrder(order);
// }
@CrossOrigin
@PostMapping(value = "/kafkaOrders")
public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) {
sender.sendOrder(order);
}
@CrossOrigin
@PostMapping(value = "/orders")
......
......@@ -27,17 +27,17 @@ public class Sender {
private static final String OMS = "OMS_ORDER_UPDATE";
// public void sendOrder(WarehouseOrderRequest currentOrder) {
// ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder);
// Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaOMSProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
// .doOnError(throwable -> System.out.println(throwable))
// .doOnNext(t -> {
// if (null != t.exception()) {
// System.out.println("it works!");
// }
// });
// sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
// }
public void sendOrder(WarehouseOrderRequest currentOrder) {
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaOMSProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> {
if (null != t.exception()) {
System.out.println("it works!");
}
});
sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
}
public void sendUpdatedOrder(WarehouseOrderResponse currentOrder) {
ProducerRecord<String, WarehouseOrderResponse> record = new ProducerRecord<>(TOPIC, currentOrder);
......
spring.data.mongodb.uri=${MONGOCONNECTION}
spring.data.mongodb.uri=mongodb://afp-final-project-database:ZAHb4S4arnUGiyyFeOALt49qjyDuEGuh7U2j7R2Av4asZJ9tc7mAl89YxO0vsZHQiC6GQ3iGUt1Tl1NLH2j42Q==@afp-final-project-database.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@afp-final-project-database@
spring.data.mongodb.database=afp-warehouse-db
kafka.producer.bootstrap-servers: localhost:9092
kafka.producer.bootstrap-servers: 10.0.14.207:9092
kafka.producer.acks: all
kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
kafka.topic.input: OMS_ORDER_UPDATE
#kafka.topic.input: OMS_ORDER_UPDATE
advertised.listeners=PLAINTEXT://10.0.14.207:9092
listeners=PLAINTEXT://0.0.0.0:9092
server.port=8080
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment