Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
O
order-management-backend
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Ascend
order-management-backend
Commits
d644e306
Commit
d644e306
authored
May 07, 2021
by
Vishal Vaddadhi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[AFP-100] Transition Kafka setup to reactive Kafka
parent
a275ea9d
Pipeline
#1673
failed with stage
in 38 seconds
Changes
11
Pipelines
1
Show whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
127 additions
and
225 deletions
+127
-225
pom.xml
backend/order-management/pom.xml
+5
-12
KafkaConfig.java
...main/java/com/afp/ordermanagement/config/KafkaConfig.java
+59
-23
ManagerMongoConfig.java
...va/com/afp/ordermanagement/config/ManagerMongoConfig.java
+0
-38
OrderMongoConfig.java
...java/com/afp/ordermanagement/config/OrderMongoConfig.java
+0
-39
KafkaController.java
...a/com/afp/ordermanagement/controller/KafkaController.java
+0
-26
ManagerController.java
...com/afp/ordermanagement/controller/ManagerController.java
+0
-1
OrderController.java
...a/com/afp/ordermanagement/controller/OrderController.java
+3
-4
Receiver.java
...om/afp/ordermanagement/reactivekafkaservice/Receiver.java
+24
-0
Sender.java
.../com/afp/ordermanagement/reactivekafkaservice/Sender.java
+36
-0
Consumer.java
...c/main/java/com/afp/ordermanagement/service/Consumer.java
+0
-28
Producer.java
...c/main/java/com/afp/ordermanagement/service/Producer.java
+0
-54
No files found.
backend/order-management/pom.xml
View file @
d644e306
...
@@ -26,19 +26,7 @@
...
@@ -26,19 +26,7 @@
<artifactId>
spring-boot-starter-webflux
</artifactId>
<artifactId>
spring-boot-starter-webflux
</artifactId>
</dependency>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>
org.springframework.kafka
</groupId>
<artifactId>
spring-kafka
</artifactId>
<version>
2.7.0
</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist -->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-dist -->
<dependency>
<groupId>
org.springframework.kafka
</groupId>
<artifactId>
spring-kafka-dist
</artifactId>
<version>
2.7.0
</version>
<type>
pom
</type>
</dependency>
<dependency>
<dependency>
<groupId>
org.projectlombok
</groupId>
<groupId>
org.projectlombok
</groupId>
<artifactId>
lombok
</artifactId>
<artifactId>
lombok
</artifactId>
...
@@ -97,6 +85,11 @@
...
@@ -97,6 +85,11 @@
<artifactId>
velocity
</artifactId>
<artifactId>
velocity
</artifactId>
<version>
1.5
</version>
<version>
1.5
</version>
</dependency>
</dependency>
<dependency>
<groupId>
io.projectreactor.kafka
</groupId>
<artifactId>
reactor-kafka
</artifactId>
</dependency>
</dependencies>
</dependencies>
<build>
<build>
<plugins>
<plugins>
...
...
backend/order-management/src/main/java/com/afp/ordermanagement/config/KafkaConfig.java
View file @
d644e306
package
com
.
afp
.
ordermanagement
.
config
;
package
com
.
afp
.
ordermanagement
.
config
;
import
com.afp.ordermanagement.reactivekafkaservice.Sender
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
;
/*
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import
org.springframework.kafka.core.DefaultKafkaProducerFactory
;
import org.springframework.kafka.core.KafkaTemplate;*/
import
org.springframework.kafka.core.KafkaTemplate
;
import
reactor.kafka.receiver.KafkaReceiver
;
import
org.springframework.kafka.core.ProducerFactory
;
import
reactor.kafka.receiver.ReceiverOptions
;
import
reactor.kafka.sender.KafkaSender
;
import
reactor.kafka.sender.SenderOptions
;
import
java.util.Collection
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Map
;
@Configuration
@Configuration
public
class
KafkaConfig
{
public
class
KafkaConfig
{
@Value
(
"${kafka.producer.bootstrap-servers}"
)
private
String
bootstrapServers
;
@Value
(
"${kafka.producer.acks}"
)
private
String
acks
;
@Value
(
"${kafka.consumer.group-id}"
)
private
String
groupId
;
@Bean
@Bean
public
ProducerFactory
<
String
,
String
>
producerFactoryString
()
{
public
Map
<
String
,
Object
>
producerFactoryString
()
{
Map
<
String
,
Object
>
configProps
=
new
HashMap
<>();
Map
<
String
,
Object
>
senderConfigProps
=
new
HashMap
<>();
configProps
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
senderConfigProps
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
configProps
.
put
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
senderConfigProps
.
put
(
ProducerConfig
.
ACKS_CONFIG
,
acks
);
configProps
.
put
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
senderConfigProps
.
put
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
return
new
DefaultKafkaProducerFactory
<>(
configProps
);
senderConfigProps
.
put
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
return
senderConfigProps
;
//return new DefaultKafkaProducerFactory<>(configProps);
}
}
@Bean
@Bean
public
KafkaTemplate
<
String
,
String
>
kafkaTemplateString
()
{
public
KafkaSender
<
String
,
String
>
kafkaEventProducer
()
{
return
new
KafkaTemplate
<>(
producerFactoryString
());
SenderOptions
<
String
,
String
>
senderOptions
=
SenderOptions
.
create
(
producerFactoryString
());
return
KafkaSender
.
create
(
senderOptions
);
}
}
// @Bean
// public KafkaTemplate<String, Object> kafkaTemplateString() {
// return new KafkaTemplate<>(producerFactoryString());
// }
@Bean
@Bean
public
ConsumerFactory
<
String
,
String
>
consumerFactory
()
{
public
Map
<
String
,
Object
>
consumerFactory
()
{
Map
<
String
,
Object
>
c
onfigProps
=
new
HashMap
<>();
Map
<
String
,
Object
>
receiverC
onfigProps
=
new
HashMap
<>();
configProps
.
put
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
receiverConfigProps
.
put
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
configProps
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
"group_id"
);
receiverConfigProps
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
groupId
);
c
onfigProps
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
receiverC
onfigProps
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
c
onfigProps
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
receiverC
onfigProps
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
return
new
DefaultKafkaConsumerFactory
<>(
configProps
)
;
return
receiverConfigProps
;
}
}
@Bean
@Bean
public
ConcurrentKafkaListenerContainerFactory
<
String
,
String
>
kafkaListenerContainerFactory
(
)
{
public
KafkaReceiver
<
String
,
String
>
kafkaEventReceiver
(
@Value
(
"${kafka.topic.input}"
)
String
posLogTopic
)
{
ConcurrentKafkaListenerContainerFactory
<
String
,
String
>
factory
=
new
ConcurrentKafkaListenerContainerFactory
<>(
);
ReceiverOptions
<
String
,
String
>
receiverOptions
=
ReceiverOptions
.
create
(
consumerFactory
()
);
factory
.
setConsumerFactory
(
consumerFactory
()
);
receiverOptions
.
maxCommitAttempts
(
3
);
return
factory
;
return
KafkaReceiver
.
create
(
receiverOptions
.
addAssignListener
(
Collection:
:
iterator
).
subscription
(
Collections
.
singleton
(
posLogTopic
)))
;
}
}
// @Bean
// public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(consumerFactory());
// return factory;
// }
}
}
backend/order-management/src/main/java/com/afp/ordermanagement/config/ManagerMongoConfig.java
deleted
100644 → 0
View file @
a275ea9d
//package com.afp.ordermanagement.config;
//
//import org.springframework.beans.factory.annotation.Qualifier;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.beans.factory.config.ConfigurableBeanFactory;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
//import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
//import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
//
//import com.mongodb.reactivestreams.client.MongoClient;
//import com.mongodb.reactivestreams.client.MongoClients;
//
//@Configuration
//@EnableReactiveMongoRepositories(basePackages = "com.afp.ordermanagement.repository")
//public class ManagerMongoConfig extends AbstractReactiveMongoConfiguration
//{
// @Value("${port}")
// private String port;
//
// @Value("${dbname}")
// private String dbName;
//
// @Override
// public MongoClient reactiveMongoClient() {
// return MongoClients.create();
// }
//
// @Override
// protected String getDatabaseName() {
// return dbName;
// }
//
// @Qualifier(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// public ReactiveMongoTemplate reactiveMongoTemplate() {
// return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
// }
//}
\ No newline at end of file
backend/order-management/src/main/java/com/afp/ordermanagement/config/OrderMongoConfig.java
deleted
100644 → 0
View file @
a275ea9d
//package com.afp.ordermanagement.config;
//
//import org.springframework.beans.factory.annotation.Qualifier;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.beans.factory.config.ConfigurableBeanFactory;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
//import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
//import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
//
//import com.mongodb.reactivestreams.client.MongoClient;
//import com.mongodb.reactivestreams.client.MongoClients;
//
//@Configuration
//@EnableReactiveMongoRepositories(basePackages = "com.afp.ordermanagement.repository")
//public class OrderMongoConfig extends AbstractReactiveMongoConfiguration
//{
// @Value("${port}")
// private String port;
//
// @Value("${dbname}")
// private String dbName;
//
// @Override
// public MongoClient reactiveMongoClient() {
// return MongoClients.create();
// }
//
// @Override
// protected String getDatabaseName() {
// return dbName;
// }
//
// @Qualifier(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// public ReactiveMongoTemplate reactiveMongoTemplate() {
// return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
// }
//}
\ No newline at end of file
backend/order-management/src/main/java/com/afp/ordermanagement/controller/KafkaController.java
deleted
100644 → 0
View file @
a275ea9d
package
com
.
afp
.
ordermanagement
.
controller
;
import
com.afp.ordermanagement.service.Producer
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
@RestController
@RequestMapping
(
value
=
"/kafka"
)
public
class
KafkaController
{
private
final
Producer
producer
;
public
KafkaController
(
Producer
producer
)
{
this
.
producer
=
producer
;
}
@PostMapping
(
value
=
"/publish/{id}"
)
public
void
sendMessageToKafkaTopic
(
@RequestParam
String
id
)
{
producer
.
sendOrderId
(
id
);
}
@PostMapping
(
value
=
"/inventory/{quantity}"
)
public
void
sendInventoryQuantity
(
@PathVariable
String
quantity
)
{
producer
.
sendInventoryQuantity
(
quantity
);
}
}
backend/order-management/src/main/java/com/afp/ordermanagement/controller/ManagerController.java
View file @
d644e306
...
@@ -2,7 +2,6 @@ package com.afp.ordermanagement.controller;
...
@@ -2,7 +2,6 @@ package com.afp.ordermanagement.controller;
import
com.afp.ordermanagement.model.Manager
;
import
com.afp.ordermanagement.model.Manager
;
import
com.afp.ordermanagement.model.Order
;
import
com.afp.ordermanagement.repository.ManagerRepository
;
import
com.afp.ordermanagement.repository.ManagerRepository
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.GetMapping
;
import
org.springframework.web.bind.annotation.GetMapping
;
...
...
backend/order-management/src/main/java/com/afp/ordermanagement/controller/OrderController.java
View file @
d644e306
package
com
.
afp
.
ordermanagement
.
controller
;
package
com
.
afp
.
ordermanagement
.
controller
;
import
com.afp.ordermanagement.model.Order
;
import
com.afp.ordermanagement.model.Order
;
import
com.afp.ordermanagement.reactivekafkaservice.Sender
;
import
com.afp.ordermanagement.service.OrderService
;
import
com.afp.ordermanagement.service.OrderService
;
import
com.afp.ordermanagement.service.Producer
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.MediaType
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.web.bind.annotation.*
;
import
org.springframework.web.bind.annotation.*
;
import
reactor.core.publisher.Flux
;
import
reactor.core.publisher.Flux
;
...
@@ -19,7 +18,7 @@ public class OrderController {
...
@@ -19,7 +18,7 @@ public class OrderController {
private
OrderService
orderService
;
private
OrderService
orderService
;
@Autowired
@Autowired
Producer
produc
er
;
Sender
send
er
;
/**
/**
...
@@ -28,7 +27,7 @@ public class OrderController {
...
@@ -28,7 +27,7 @@ public class OrderController {
*/
*/
@GetMapping
(
"/orderStatus/{orderId}"
)
@GetMapping
(
"/orderStatus/{orderId}"
)
public
void
getOrderStatusFromWarehouse
(
@PathVariable
String
orderId
)
{
public
void
getOrderStatusFromWarehouse
(
@PathVariable
String
orderId
)
{
producer
.
sendOrderId
(
orderId
);
sender
.
sendOrderIdToWarehouse
(
orderId
);
}
}
@GetMapping
(
"/orders"
)
@GetMapping
(
"/orders"
)
...
...
backend/order-management/src/main/java/com/afp/ordermanagement/reactivekafkaservice/Receiver.java
0 → 100644
View file @
d644e306
package
com
.
afp
.
ordermanagement
.
reactivekafkaservice
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.context.event.ApplicationStartedEvent
;
import
org.springframework.context.event.EventListener
;
import
org.springframework.stereotype.Service
;
import
reactor.kafka.receiver.KafkaReceiver
;
@Service
@Slf4j
public
class
Receiver
{
@Autowired
private
KafkaReceiver
<
String
,
String
>
kafkaReceiver
;
@EventListener
(
ApplicationStartedEvent
.
class
)
public
void
consumeOrderStatus
()
{
kafkaReceiver
.
receive
()
.
doOnNext
(
record
->
log
.
info
(
String
.
format
(
"##### -> Receiver receiving message: %s "
,
record
.
value
())))
.
doOnError
(
throwable
->
System
.
out
.
println
(
throwable
.
getMessage
()))
.
subscribe
();
}
}
backend/order-management/src/main/java/com/afp/ordermanagement/reactivekafkaservice/Sender.java
0 → 100644
View file @
d644e306
package
com
.
afp
.
ordermanagement
.
reactivekafkaservice
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
reactor.core.publisher.Flux
;
import
reactor.core.publisher.Mono
;
import
reactor.kafka.sender.KafkaSender
;
import
reactor.kafka.sender.SenderRecord
;
import
reactor.kafka.sender.SenderResult
;
@Service
@Slf4j
public
class
Sender
{
@Autowired
private
KafkaSender
<
String
,
String
>
kafkaEventProducer
;
private
static
final
String
ORDER_TOPIC
=
"orders"
;
public
void
sendOrderIdToWarehouse
(
String
id
)
{
log
.
info
(
String
.
format
(
"##### -> Sender sending message: %s "
,
id
));
ProducerRecord
<
String
,
String
>
record
=
new
ProducerRecord
<>(
ORDER_TOPIC
,
id
);
Flux
<
SenderResult
<
String
>>
working
=
kafkaEventProducer
.
send
(
Mono
.
just
(
SenderRecord
.
create
(
record
,
id
)))
.
doOnError
(
throwable
->
System
.
out
.
println
(
throwable
))
.
doOnNext
(
uuidSenderResult
->
{
if
(
null
!=
uuidSenderResult
.
exception
())
{
System
.
out
.
println
(
"working"
);
}
});
working
.
doOnError
(
throwable
->
log
.
error
(
"some error"
)).
subscribe
();
}
}
backend/order-management/src/main/java/com/afp/ordermanagement/service/Consumer.java
deleted
100644 → 0
View file @
a275ea9d
package
com
.
afp
.
ordermanagement
.
service
;
import
com.afp.ordermanagement.model.Order
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.stereotype.Service
;
@Service
public
class
Consumer
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
Consumer
.
class
);
@KafkaListener
(
topics
=
"managers"
,
groupId
=
"group_id"
)
public
void
consumerManager
(
String
message
){
logger
.
info
(
String
.
format
(
"#### -> Consumed message -> %s"
,
message
));
}
@KafkaListener
(
topics
=
"orders"
)
public
void
getOrderStatusFromWarehouse
(
String
status
)
{
logger
.
info
(
String
.
format
(
"#### -> Consumed order Status: %s"
,
status
));
}
@KafkaListener
(
topics
=
"inventory"
)
public
void
getInventoryQuantityUpdate
(
String
quantity
)
{
logger
.
info
(
String
.
format
(
"#### -> Consume inventory quantity update: %s"
,
quantity
));
}
}
backend/order-management/src/main/java/com/afp/ordermanagement/service/Producer.java
deleted
100644 → 0
View file @
a275ea9d
package
com
.
afp
.
ordermanagement
.
service
;
import
io.swagger.models.auth.In
;
import
org.apache.velocity.exception.ResourceNotFoundException
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.context.config.ConfigDataResourceNotFoundException
;
import
org.springframework.stereotype.Service
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
java.sql.SQLOutput
;
@Service
public
class
Producer
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
Producer
.
class
);
private
static
final
String
MANAGER_TOPIC
=
"managers"
;
private
static
final
String
ORDER_TOPIC
=
"orders"
;
private
static
final
String
INVENTORY_TOPIC
=
"inventory"
;
private
KafkaTemplate
<
String
,
String
>
kafkaTemplate
;
public
Producer
(
KafkaTemplate
<
String
,
String
>
kafkaTemplate
)
{
this
.
kafkaTemplate
=
kafkaTemplate
;
}
/**
* DESC - Sending orderId as a message to ORDER_TOPIC for warehouse to consume
* @param id
* @throws ResourceNotFoundException
* @throws IllegalArgumentException
*/
public
void
sendOrderId
(
String
id
)
{
try
{
logger
.
info
(
String
.
format
(
"#### -> Order id sent to warehouse: %s"
,
id
));
kafkaTemplate
.
send
(
ORDER_TOPIC
,
id
);
}
catch
(
ResourceNotFoundException
e
)
{
logger
.
error
(
"Order with that Id does not exist, exception caught: "
+
e
);
}
catch
(
IllegalArgumentException
e
)
{
logger
.
error
(
"Not a valid input, exception caught: "
+
e
);
}
}
public
void
sendInventoryQuantity
(
String
quantity
)
{
logger
.
info
(
String
.
format
(
"#### -> Sending inventory quantity update: %s"
,
quantity
));
kafkaTemplate
.
send
(
INVENTORY_TOPIC
,
quantity
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment