Commit 02039659 authored by Philippe Fonzin's avatar Philippe Fonzin

consume and produce Kafka messages

parent 18d4cafb
......@@ -29,10 +29,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
......@@ -93,15 +89,16 @@
<version>1.31.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
......
package com.ascendfinalproject.warehouse.config;
import com.ascendfinalproject.warehouse.models.WarehouseOrder;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
......@@ -8,11 +9,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Collection;
import java.util.Collections;
......@@ -39,6 +42,7 @@ public class KafkaConfig {
receiverConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
receiverConfigProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
receiverConfigProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// receiverConfigProps.put(JsonDeserializer.TRUSTED_PACKAGES, WarehouseOrderRequest.class);
return receiverConfigProps;
}
......@@ -57,14 +61,15 @@ public class KafkaConfig {
senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks);
senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return senderConfigProps;
}
@Bean
public KafkaSender<String, Mono<WarehouseOrder>> kafkaEventProducer() {
public KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer() {
// creates specified config options for kafkaSender using producerFactory
SenderOptions<String, Mono<WarehouseOrder>> senderOptions = SenderOptions.create(producerFactory());
SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions);
}
......
package com.ascendfinalproject.warehouse.controllers;
<<<<<<< HEAD
import com.ascendfinalproject.warehouse.kafkaservice.Sender;
import com.ascendfinalproject.warehouse.models.OrderResponse;
import com.ascendfinalproject.warehouse.models.Session;
import com.ascendfinalproject.warehouse.models.WarehouseOrder;
import com.ascendfinalproject.warehouse.services.SessionService;
=======
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
>>>>>>> master
import com.ascendfinalproject.warehouse.services.WarehouseOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
......@@ -30,17 +25,16 @@ public class WarehouseController {
@Autowired
Sender sender;
// @PostMapping("/fulfilled")
// public void getOrderStatus() {
// sender.sendOrderStatus("fulfilled");
// @CrossOrigin
// @GetMapping("/order/{id}")
// public void getOrderToPublish(@PathVariable String id) {
// Mono<WarehouseOrderResponse> record = orderService.getById(id);
// record.map(e -> {
// sender.sendOrder(e);
// return e;
// }).subscribe();
// }
@CrossOrigin
@GetMapping("/order/{id}")
public void getOrderToPublish(@PathVariable String id) {
sender.sendOrder(orderService.findOrderById(id));
}
@CrossOrigin
@GetMapping(value = "/orders")
......@@ -48,6 +42,7 @@ public class WarehouseController {
return orderService.getOrders();
}
@CrossOrigin
@GetMapping("/orders/{id}")
public Mono<ResponseEntity> getById(@PathVariable String id) {
......@@ -57,6 +52,14 @@ public class WarehouseController {
.defaultIfEmpty(ResponseEntity.status(HttpStatus.NOT_FOUND).body(null));
}
@CrossOrigin
@PostMapping(value = "/kafkaOrders")
public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) {
// STEP ONE
sender.sendOrder(order);
// return orderService.createOrder(order);
}
@CrossOrigin
@PostMapping(value = "/orders")
public Mono<WarehouseOrderResponse> createOrder(@Valid @RequestBody WarehouseOrderRequest order) {
......
package com.ascendfinalproject.warehouse.kafkaservice;
import com.ascendfinalproject.warehouse.models.Address;
import com.ascendfinalproject.warehouse.models.Item;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import com.ascendfinalproject.warehouse.services.WarehouseOrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import java.util.Arrays;
import java.util.List;
@Service
@Slf4j
public class Receiver {
......@@ -14,11 +28,27 @@ public class Receiver {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private WarehouseOrderService orderService;
@EventListener(ApplicationStartedEvent.class)
public void consumeNewOrder() {
kafkaReceiver.receive()
.doOnNext(record -> log.info(String.format("Receive message: %s ", record.value())))
.doOnNext(record -> {
try {
System.out.println(record.value());
ObjectMapper objectMapper = new ObjectMapper();
WarehouseOrderRequest order = objectMapper.readValue(record.value(), WarehouseOrderRequest.class);
log.info("ORDER objectMapper {}", order);
Mono<WarehouseOrderResponse> mono = orderService.createOrder(order);
mono.subscribe();
// mono.block();
} catch (Exception e) {
log.error("Caught error", e);
}
})
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
......
package com.ascendfinalproject.warehouse.kafkaservice;
import com.ascendfinalproject.warehouse.models.WarehouseOrder;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -16,33 +17,25 @@ import reactor.kafka.sender.SenderResult;
public class Sender {
@Autowired
private KafkaSender<String, Mono<WarehouseOrder>> kafkaEventProducer;
private KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer;
private static final String TOPIC = "order";
public void sendOrder(Mono<WarehouseOrder> currentOrder) {
private static final String TOPIC = "test_topic";
//STEP TWO
public void sendOrder(WarehouseOrderRequest currentOrder) {
log.info(String.format("Sender message: %s ", currentOrder));
ProducerRecord<String, Mono<WarehouseOrder>> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<Mono<WarehouseOrder>>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
// CREATE RECORD
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder);
//SEND RECORD
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> {
if (null != t.exception()) {
System.out.println("it works!");
}
});
working.doOnError(throwable -> log.error("error")).subscribe();
sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
}
// public void sendOrderStatus(String status) {
// log.info(String.format("Sender message: %s ", status));
// ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, status);
// Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, status)))
// .doOnError(throwable -> System.out.println(throwable))
// .doOnNext(uuidSenderResult -> {
// if (null != uuidSenderResult.exception()) {
// System.out.println("it works!");
// }
// });
// working.doOnError(throwable -> log.error("error")).subscribe();
// }
}
......@@ -2,11 +2,17 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Getter
@Setter
@ToString
public class Address {
public Address() {
}
private String street;
private String city;
private String state;
......
......@@ -2,11 +2,13 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
@Getter
@Setter
@ToString
public class Item {
private String itemId;
......@@ -22,4 +24,8 @@ public class Item {
this.itemPrice = itemPrice;
this.itemSku = itemSku;
}
public Item() {
}
}
......@@ -2,12 +2,14 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import java.util.List;
@Getter
@Setter
@ToString
public class WarehouseOrderRequest {
@Id
......
......@@ -2,6 +2,7 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import java.util.Date;
......@@ -9,6 +10,7 @@ import java.util.List;
@Getter
@Setter
@ToString
public class WarehouseOrderResponse {
@Id
private String id;
......@@ -19,11 +21,10 @@ public class WarehouseOrderResponse {
private List<Item> orderItems;
private String address;
<<<<<<< HEAD:src/main/java/com/ascendfinalproject/warehouse/models/WarehouseOrder.java
public WarehouseOrder() {
=======
public WarehouseOrderResponse() {
>>>>>>> master:src/main/java/com/ascendfinalproject/warehouse/models/WarehouseOrderResponse.java
}
}
......@@ -28,25 +28,48 @@ public class WarehouseOrderService {
}
public Mono<WarehouseOrderResponse> createOrder(WarehouseOrderRequest order) {
System.out.println("coming in from receiver " + order);
WarehouseOrderResponse response = new WarehouseOrderResponse();
response.setOrderId(order.getId());
response.setStatus("RECEIVED");
// response.setCreatedAt(order.getOrderCreatedAt());
// response.setModifiedAt(System.currentTimeMillis());
response.setCreatedAt(new Date(order.getOrderCreatedAt()));
response.setModifiedAt(new Date(order.getOrderUpdatedAt()));
response.setModifiedAt(new Date(System.currentTimeMillis()));
response.setOrderItems(order.getOrderItems());
Address address = order.getCustomerAddress();
response.setAddress(address.getStreet() + ", " +
address.getCity() + ", " +
address.getState() + ", " +
response.setAddress(address.getStreet() + " " +
address.getCity() + " " +
address.getState() + " " +
address.getZip()
);
System.out.println("built response " + response);
System.out.println("-----------");
return orderRepository.save(response);
}
// WarehouseOrderResponse{id='null', orderId='6085cb1e7681124ea05d2cab', status='RECEIVED', createdAt=Fri May 07 16:23:18 PDT 2021,
// modifiedAt=Fri May 07 16:21:39 PDT 2021, orderItems=[com.ascendfinalproject.warehouse.models.Item@2bfa9ace,
// com.ascendfinalproject.warehouse.models.Item@4a236e68, com.ascendfinalproject.warehouse.models.Item@8c53158,
// com.ascendfinalproject.warehouse.models.Item@b50f9d1], address='390 15th st., New York, NY, 10010'}
//
//
// ++++++++WarehouseOrderResponse(id=null, orderId=6085cb1e7681124ea05d2cab, status=RECEIVED, createdAt=Fri May 07 19:23:18 EDT 2021,
// modifiedAt=Wed Dec 31 19:00:00 EST 1969, orderItems=[Item(itemId=1, itemName=item name, itemQuantity=5,
// itemPrice=5.99, itemSku=1234)], address=390 15th st., New York, NY, 10010)
public Mono<WarehouseOrderResponse> updateOrder(WarehouseOrderResponse order, String id) {
return orderRepository.findById(id)
.flatMap(existingOrder -> {
existingOrder.setStatus(order.getStatus());
// existingOrder.setModifiedAt(System.currentTimeMillis());
existingOrder.setModifiedAt(new Date(System.currentTimeMillis()));
return orderRepository.save(existingOrder);
});
......@@ -60,5 +83,4 @@ public class WarehouseOrderService {
return orderRepository.deleteAll();
}
}
......@@ -6,78 +6,3 @@ kafka.producer.bootstrap-servers: localhost:9092
kafka.producer.acks: all
kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
kafka.topic.input: test_topic
\ No newline at end of file
######### Mongo Sink Connector Properties ###########
name=mongo-sink
topics=sourceA,sourceB
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
# Message types
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb+srv://warehouse1:ascendWarehouseProject@warehouse-cluster.xopll.mongodb.net/myFirstDatabase?retryWrites=true&w=majority
database=warehouse-cluster
collection=warehouseOrder
max.num.retries=1
retries.defer.timeout=5000
## Document manipulation settings
key.projection.type=none
key.projection.list=
value.projection.type=none
value.projection.list=
field.renamer.mapping=[]
field.renamer.regex=[]
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
# Write configuration
delete.on.null.values=false
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
max.batch.size = 0
rate.limiting.timeout=0
rate.limiting.every.n=0
# Change Data Capture handling
change.data.capture.handler=
# Topic override examples for the sourceB topic
topic.override.sourceB.collection=sourceB
topic.override.sourceB.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
########## Mongo Source Connector #########
name=warehouseOrder
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb+srv://warehouse1:ascendWarehouseProject@warehouse-cluster.xopll.mongodb.net/myFirstDatabase?retryWrites=true&w=majority
database=warehouse-cluster
collection=warehouseOrder
topic.prefix=
topic.suffix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment