Commit 7a44b79c authored by Darrick Yong's avatar Darrick Yong

Merge branch 'kafka' into 'master'

Kafka

See merge request !16
parents d25573e1 0b432aa1
{
"name": "warehouse",
"version": "0.1.0",
"lockfileVersion": 1,
"lockfileVersion": 2,
"requires": true,
<<<<<<< HEAD
=======
"packages": {
"": {
"name": "warehouse",
......@@ -19870,7 +19868,6 @@
}
}
},
>>>>>>> master
"dependencies": {
"@babel/code-frame": {
"version": "7.12.13",
......@@ -34038,6 +34035,21 @@
"resolved": "https://registry.npmjs.org/strict-uri-encode/-/strict-uri-encode-1.1.0.tgz",
"integrity": "sha1-J5siXfHVgrH1TmWt3UNS4Y+qBxM="
},
"string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",
"integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==",
"requires": {
"safe-buffer": "~5.2.0"
},
"dependencies": {
"safe-buffer": {
"version": "5.2.1",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
"integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ=="
}
}
},
"string-length": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/string-length/-/string-length-4.0.2.tgz",
......@@ -34094,21 +34106,6 @@
"define-properties": "^1.1.3"
}
},
"string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",
"integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==",
"requires": {
"safe-buffer": "~5.2.0"
},
"dependencies": {
"safe-buffer": {
"version": "5.2.1",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
"integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ=="
}
}
},
"stringify-object": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/stringify-object/-/stringify-object-3.3.0.tgz",
......@@ -35951,6 +35948,14 @@
"resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz",
"integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw=="
},
"string_decoder": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz",
"integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==",
"requires": {
"safe-buffer": "~5.1.0"
}
},
"string-width": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/string-width/-/string-width-3.1.0.tgz",
......@@ -35976,14 +35981,6 @@
}
}
},
"string_decoder": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz",
"integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==",
"requires": {
"safe-buffer": "~5.1.0"
}
},
"strip-ansi": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz",
......@@ -29,10 +29,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
......@@ -75,6 +71,18 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
......@@ -98,6 +106,7 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
......
package com.ascendfinalproject.warehouse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
// this is placeholder
@KafkaListener(topics = "fulfilled", groupId = "WAREHOUSE_MANAGEMENT")
public void consume(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
}
package com.ascendfinalproject.warehouse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String FULFILLED = "fulfilled";
private static final String CANCELLED = "cancelled";
@Autowired
// publish messages to the topic
private KafkaTemplate<String, String> kafkaTemplate;
public void orderFulfilled(String message) {
logger.info(String.format("#### -> this order is fulfilled -> %s", message));
this.kafkaTemplate.send(FULFILLED, message);
}
public void orderCancelled(String message) {
logger.info(String.format("#### -> this order is cancelled -> %s", message));
this.kafkaTemplate.send(CANCELLED, message);
}
}
package com.ascendfinalproject.warehouse.config;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.producer.acks}")
private String acks;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Bean
public Map<String, Object> consumerFactory() {
Map<String, Object> receiverConfigProps = new HashMap<>();
receiverConfigProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
receiverConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
receiverConfigProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
receiverConfigProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return receiverConfigProps;
}
@Bean
public KafkaReceiver<String, String> kafkaEventReceiver(@Value("${kafka.topic.input}") String topic) {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerFactory());
receiverOptions.maxCommitAttempts(5);
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator).subscription(Collections.singleton(topic)));
}
@Bean
public Map<String, Object> producerFactory() {
Map<String, Object> senderConfigProps = new HashMap<>();
senderConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
senderConfigProps.put(ProducerConfig.ACKS_CONFIG, acks);
senderConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return senderConfigProps;
}
@Bean
public KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer() {
SenderOptions<String, WarehouseOrderRequest> senderOptions = SenderOptions.create(producerFactory());
return KafkaSender.create(senderOptions);
}
}
package com.ascendfinalproject.warehouse.controllers;
import com.ascendfinalproject.warehouse.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/fulfilled")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.orderFulfilled(message);
}
}
package com.ascendfinalproject.warehouse.controllers;
import com.ascendfinalproject.warehouse.kafkaservice.Sender;
import com.ascendfinalproject.warehouse.models.Session;
import com.ascendfinalproject.warehouse.services.SessionService;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import com.ascendfinalproject.warehouse.services.WarehouseOrderService;
......@@ -19,12 +22,27 @@ public class WarehouseController {
@Autowired
WarehouseOrderService orderService;
@Autowired
Sender sender;
// @CrossOrigin
// @GetMapping("/order/{id}")
// public void getOrderToPublish(@PathVariable String id) {
// Mono<WarehouseOrderResponse> record = orderService.getById(id);
// record.map(e -> {
// sender.sendOrder(e);
// return e;
// }).subscribe();
// }
@CrossOrigin
@GetMapping(value = "/orders")
public Flux<WarehouseOrderResponse> getOrders() {
return orderService.getOrders();
}
@CrossOrigin
@GetMapping("/orders/{id}")
public Mono<ResponseEntity> getById(@PathVariable String id) {
......@@ -34,6 +52,14 @@ public class WarehouseController {
.defaultIfEmpty(ResponseEntity.status(HttpStatus.NOT_FOUND).body(null));
}
@CrossOrigin
@PostMapping(value = "/kafkaOrders")
public void createOrderKafka(@Valid @RequestBody WarehouseOrderRequest order) {
// STEP ONE
sender.sendOrder(order);
// return orderService.createOrder(order);
}
@CrossOrigin
@PostMapping(value = "/orders")
public Mono<WarehouseOrderResponse> createOrder(@Valid @RequestBody WarehouseOrderRequest order) {
......
package com.ascendfinalproject.warehouse.kafkaservice;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import com.ascendfinalproject.warehouse.services.WarehouseOrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
@Service
@Slf4j
public class Receiver {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Autowired
private WarehouseOrderService orderService;
@EventListener(ApplicationStartedEvent.class)
public void consumeNewOrder() {
kafkaReceiver.receive()
.doOnNext(record -> {
try {
System.out.println(record.value());
ObjectMapper objectMapper = new ObjectMapper();
WarehouseOrderRequest order = objectMapper.readValue(record.value(), WarehouseOrderRequest.class);
log.info("ORDER objectMapper {}", order);
Mono<WarehouseOrderResponse> mono = orderService.createOrder(order);
mono.subscribe();
} catch (Exception e) {
log.error("Caught error", e);
}
})
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.subscribe();
}
}
package com.ascendfinalproject.warehouse.kafkaservice;
import com.ascendfinalproject.warehouse.models.WarehouseOrderRequest;
import com.ascendfinalproject.warehouse.models.WarehouseOrderResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
@Service
@Slf4j
public class Sender {
@Autowired
private KafkaSender<String, WarehouseOrderRequest> kafkaEventProducer;
private static final String TOPIC = "test_topic";
public void sendOrder(WarehouseOrderRequest currentOrder) {
log.info(String.format("Sender message: %s ", currentOrder));
ProducerRecord<String, WarehouseOrderRequest> record = new ProducerRecord<>(TOPIC, currentOrder);
Flux<SenderResult<WarehouseOrderRequest>> sendToKafka = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, currentOrder)))
.doOnError(throwable -> System.out.println(throwable))
.doOnNext(t -> {
if (null != t.exception()) {
System.out.println("it works!");
}
});
sendToKafka.doOnError(throwable -> log.error("error")).subscribe();
}
}
......@@ -2,11 +2,17 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Getter
@Setter
@ToString
public class Address {
public Address() {
}
private String street;
private String city;
private String state;
......
......@@ -2,11 +2,13 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
@Getter
@Setter
@ToString
public class Item {
private String itemId;
......@@ -22,4 +24,8 @@ public class Item {
this.itemPrice = itemPrice;
this.itemSku = itemSku;
}
public Item() {
}
}
......@@ -2,12 +2,14 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import java.util.List;
@Getter
@Setter
@ToString
public class WarehouseOrderRequest {
@Id
......
......@@ -2,6 +2,7 @@ package com.ascendfinalproject.warehouse.models;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import java.util.Date;
......@@ -9,6 +10,7 @@ import java.util.List;
@Getter
@Setter
@ToString
public class WarehouseOrderResponse {
@Id
private String id;
......@@ -19,6 +21,10 @@ public class WarehouseOrderResponse {
private List<Item> orderItems;
private String address;
public WarehouseOrderResponse() {
}
}
......@@ -33,7 +33,10 @@ public class WarehouseOrderService {
}
public Mono<WarehouseOrderResponse> createOrder(WarehouseOrderRequest order) {
System.out.println("coming in from receiver " + order);
WarehouseOrderResponse response = new WarehouseOrderResponse();
response.setOrderId(order.getId());
response.setStatus(RECEIVED);
response.setCreatedAt(new Date(order.getOrderCreatedAt()));
......@@ -47,6 +50,7 @@ public class WarehouseOrderService {
return orderRepository.save(response);
}
public Mono<WarehouseOrderResponse> updateOrder(WarehouseOrderResponse order, String id) {
return orderRepository.findById(id)
.flatMap(existingOrder -> {
......@@ -70,5 +74,4 @@ public class WarehouseOrderService {
return orderRepository.deleteAll();
}
}
spring.data.mongodb.uri=mongodb+srv://warehouse1:ascendWarehouseProject@warehouse-cluster.xopll.mongodb.net/myFirstDatabase?retryWrites=true&w=majority
spring.data.mongodb.database=test
server:
port: 9000
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: WAREHOUSE_MANAGEMENT
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
kafka.producer.bootstrap-servers: localhost:9092
kafka.producer.acks: all
kafka.consumer.group-id: WAREHOUSE_MANAGEMENT
kafka.topic.input: test_topic
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment