Commit f0f49896 authored by Philippe Fonzin

can get record from mongo

parent 0fe0a12f
@@ -92,6 +92,13 @@
 <artifactId>google-api-client</artifactId>
 <version>1.31.2</version>
 </dependency>
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>connect-api</artifactId>
+    <version>2.7.0</version>
+</dependency>
 </dependencies>
...
 package com.ascendfinalproject.warehouse.config;
+import com.ascendfinalproject.warehouse.models.WarehouseOrder;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -7,6 +8,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import reactor.core.publisher.Mono;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 import reactor.kafka.sender.KafkaSender;
@@ -60,9 +62,9 @@ public class KafkaConfig {
 }
 @Bean
-public KafkaSender<String, String> kafkaEventProducer() {
+public KafkaSender<String, Mono<WarehouseOrder>> kafkaEventProducer() {
     // creates specified config options for kafkaSender using producerFactory
-    SenderOptions<String, String> senderOptions = SenderOptions.create(producerFactory());
+    SenderOptions<String, Mono<WarehouseOrder>> senderOptions = SenderOptions.create(producerFactory());
     return KafkaSender.create(senderOptions);
 }
...
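One thing this hunk leaves open: producerFactory() is still built around StringSerializer (see the imports above), and a String serializer fails with a ClassCastException when handed anything but a String, so a KafkaSender<String, Mono<WarehouseOrder>> cannot actually serialize its payload as typed here. A minimal sketch of an alternative, assuming spring-kafka's JsonSerializer is on the classpath; the bootstrap address is illustrative:

@Bean
public KafkaSender<String, WarehouseOrder> kafkaEventProducer() {
    // build producer properties directly instead of reusing producerFactory()
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // serialize the domain object itself, not a Mono wrapping it
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            org.springframework.kafka.support.serializer.JsonSerializer.class);
    SenderOptions<String, WarehouseOrder> senderOptions = SenderOptions.create(props);
    return KafkaSender.create(senderOptions);
}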
@@ -23,9 +23,15 @@ public class WarehouseController {
 @Autowired
 Sender sender;
-@PostMapping("/fulfilled")
-public void getOrderStatusFromWarehouse() {
-    sender.sendOrderStatus("fulfilled");
+// @PostMapping("/fulfilled")
+// public void getOrderStatus() {
+//     sender.sendOrderStatus("fulfilled");
+// }
+@CrossOrigin
+@GetMapping("/order/{id}")
+public void getOrderToPublish(@PathVariable String id) {
+    sender.sendOrder(orderService.findOrderById(id));
 }
...
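The new getOrderToPublish returns void, so the HTTP caller gets no body back and no signal that the lookup succeeded before the publish. A sketch of a variant that returns the order it publishes, assuming (as the call above implies) that orderService.findOrderById(String) returns a Mono<WarehouseOrder>:

@CrossOrigin
@GetMapping("/order/{id}")
public Mono<WarehouseOrder> getOrderToPublish(@PathVariable String id) {
    return orderService.findOrderById(id)
            // publish as a side effect once the order is found, then return it to the caller
            .doOnNext(found -> sender.sendOrder(Mono.just(found)));
}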
@@ -14,6 +14,7 @@ public class Receiver {
 @Autowired
 private KafkaReceiver<String, String> kafkaReceiver;
 @EventListener(ApplicationStartedEvent.class)
 public void consumeNewOrder() {
     kafkaReceiver.receive()
...
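The receive() chain is collapsed in the hunk above. A sketch of how such a reactor-kafka pipeline is typically completed, assuming the class carries Lombok's @Slf4j like Sender below:

kafkaReceiver.receive()
        .doOnNext(record -> {
            log.info("Received: {}", record.value());
            record.receiverOffset().acknowledge(); // mark this offset as processed
        })
        .doOnError(e -> log.error("consumer error", e))
        .subscribe();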
 package com.ascendfinalproject.warehouse.kafkaservice;
+import com.ascendfinalproject.warehouse.models.WarehouseOrder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -15,21 +16,33 @@ import reactor.kafka.sender.SenderResult;
 public class Sender {
 @Autowired
-private KafkaSender<String, String> kafkaEventProducer;
+private KafkaSender<String, Mono<WarehouseOrder>> kafkaEventProducer;
 private static final String TOPIC = "order";
-public void sendOrderStatus(String status) {
-    log.info(String.format("Sender message: %s ", status));
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, status);
-    Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, status)))
-        .doOnError(throwable -> System.out.println(throwable))
-        .doOnNext(uuidSenderResult -> {
-            if (null != uuidSenderResult.exception()) {
-                System.out.println("it works!");
-            }
-        });
-    working.doOnError(throwable -> log.error("error")).subscribe();
-}
+public void sendOrder(Mono<WarehouseOrder> currentOrder) {
+    log.info(String.format("Sender message: %s ", currentOrder));
+    ProducerRecord<String, Mono<WarehouseOrder>> record = new ProducerRecord<>(TOPIC, currentOrder);
+    Flux<SenderResult<Mono<WarehouseOrder>>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
+        .doOnError(throwable -> System.out.println(throwable))
+        .doOnNext(t -> {
+            if (null != t.exception()) {
+                System.out.println("it works!");
+            }
+        });
+    working.doOnError(throwable -> log.error("error")).subscribe();
+}
+// public void sendOrderStatus(String status) {
+//     log.info(String.format("Sender message: %s ", status));
+//     ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, status);
+//     Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, status)))
+//         .doOnError(throwable -> System.out.println(throwable))
+//         .doOnNext(uuidSenderResult -> {
+//             if (null != uuidSenderResult.exception()) {
+//                 System.out.println("it works!");
+//             }
+//         });
+//     working.doOnError(throwable -> log.error("error")).subscribe();
+// }
 }
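Two details in the new sendOrder are worth flagging: the record value is the Mono itself rather than the resolved WarehouseOrder, which the String serializer pair configured in KafkaConfig cannot handle, and the doOnNext branch prints "it works!" precisely when exception() is non-null. A sketch of the same method flattened to a plain String payload, keeping the producer typed as KafkaSender<String, String>; Jackson's ObjectMapper is assumed available, as it is in a default Spring Boot app:

public void sendOrder(Mono<WarehouseOrder> currentOrder) {
    ObjectMapper mapper = new ObjectMapper(); // com.fasterxml.jackson.databind
    currentOrder
        .map(order -> {
            try {
                return mapper.writeValueAsString(order); // WarehouseOrder -> JSON text
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        })
        .flatMapMany(json -> kafkaEventProducer.send(
                Mono.just(SenderRecord.create(new ProducerRecord<>(TOPIC, json), json))))
        .doOnNext(result -> log.info("sent: {}", result.correlationMetadata()))
        .doOnError(e -> log.error("send failed", e))
        .subscribe();
}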
@@ -20,5 +20,6 @@ public class WarehouseOrder {
 private String address;
 public WarehouseOrder() {
 }
 }
@@ -5,4 +5,79 @@ spring.data.mongodb.database=test
 kafka.producer.bootstrap-servers: localhost:9092
 kafka.producer.acks: all
 kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
-kafka.topic.input: test_topic
\ No newline at end of file
+kafka.topic.input: test_topic
+######### Mongo Sink Connector Properties ###########
+name=mongo-sink
+topics=sourceA,sourceB
+connector.class=com.mongodb.kafka.connect.MongoSinkConnector
+tasks.max=1
+# Message types
+key.converter=io.confluent.connect.avro.AvroConverter
+key.converter.schema.registry.url=http://localhost:8081
+value.converter=io.confluent.connect.avro.AvroConverter
+value.converter.schema.registry.url=http://localhost:8081
+# Specific global MongoDB Sink Connector configuration
+connection.uri=mongodb+srv://warehouse1:ascendWarehouseProject@warehouse-cluster.xopll.mongodb.net/myFirstDatabase?retryWrites=true&w=majority
+database=warehouse-cluster
+collection=warehouseOrder
+max.num.retries=1
+retries.defer.timeout=5000
+## Document manipulation settings
+key.projection.type=none
+key.projection.list=
+value.projection.type=none
+value.projection.list=
+field.renamer.mapping=[]
+field.renamer.regex=[]
+document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
+post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
+# Write configuration
+delete.on.null.values=false
+writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
+max.batch.size=0
+rate.limiting.timeout=0
+rate.limiting.every.n=0
+# Change Data Capture handling
+change.data.capture.handler=
+# Topic override examples for the sourceB topic
+topic.override.sourceB.collection=sourceB
+topic.override.sourceB.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
+########## Mongo Source Connector #########
+name=warehouseOrder
+connector.class=com.mongodb.kafka.connect.MongoSourceConnector
+tasks.max=1
+# Connection and source configuration
+connection.uri=mongodb+srv://warehouse1:ascendWarehouseProject@warehouse-cluster.xopll.mongodb.net/myFirstDatabase?retryWrites=true&w=majority
+database=warehouse-cluster
+collection=warehouseOrder
+topic.prefix=
+topic.suffix=
+poll.max.batch.size=1000
+poll.await.time.ms=5000
+# Change stream options
+pipeline=[]
+batch.size=0
+change.stream.full.document=updateLookup
+collation=
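Both blocks above are Kafka Connect worker configs, which normally live in their own .properties files handed to a Connect worker rather than inside application.properties. The source connector's empty pipeline= accepts a MongoDB aggregation pipeline as JSON; for example, to emit only insert events from the change stream (illustrative value, not part of this commit):

pipeline=[{"$match": {"operationType": "insert"}}]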