Ascend / order-management-backend · Commits

Commit ddd01603 authored May 10, 2021 by Shanelle Valencia
[AFP-53] Add readme instructions for consumer and producer [@svalencia]

parent 60f67a68
Pipeline #1691 failed in 38 seconds
Showing 5 changed files with 131 additions and 0 deletions (+131 -0)

.DS_Store (+0 -0)
README.md (+37 -0)
src/main/java/com/afp/ordermanagement/model/Order.java (+5 -0)
src/main/java/com/afp/ordermanagement/reactivekafkaservice/Receiver.java (+66 -0)
src/main/java/com/afp/ordermanagement/reactivekafkaservice/Sender.java (+23 -0)
.DS_Store

No preview for this file type
README.md

@@ -58,3 +58,40 @@ Example Console Response:
2021-05-05 17:13:29.932  INFO 82715 --- [ntainer#0-0-C-1] c.afp.ordermanagement.service.Consumer   : #### -> Consumed message -> somethinghere
```

## Testing Kafka Producer and Consumer

Make sure the Kafka server and ZooKeeper are running.

To describe a Kafka topic (and confirm it exists):
```
kafka-topics.sh --describe --topic <insert-topic-here> --bootstrap-server localhost:9092
```
If the above command doesn't work, try:
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-topics --describe --topic orders --bootstrap-server localhost:9092
```

To start a Kafka console producer:
```
kafka-console-producer.sh --topic orders --bootstrap-server localhost:9092
```
-OR-
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-console-producer --topic orders --bootstrap-server localhost:9092
```

To start a Kafka console consumer:
```
kafka-console-consumer.sh --topic orders --from-beginning --bootstrap-server localhost:9092
```
-OR-
```
/usr/local/Cellar/kafka/2.8.0/bin/kafka-console-consumer --topic orders --from-beginning --bootstrap-server localhost:9092
```
(The --from-beginning flag shows all messages that have been received for this particular topic.)

If you still get errors locating the Kafka installation path, try running:
```
ls -alrth /usr/local/Cellar/kafka/
```
then navigate to the bin directory, where you'll find all the Kafka commands.
\ No newline at end of file
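
The console commands above assume a Homebrew Kafka 2.8.0 install. As a rough programmatic equivalent, the same topic check can be done from Java with the Kafka AdminClient; this is only a sketch under the assumption that `kafka-clients` is on the classpath and the broker runs at localhost:9092 (it is not part of this commit, and the class name is hypothetical):

```
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;

// Hypothetical helper, not part of this commit: checks that the "orders" topic exists,
// mirroring the kafka-topics --describe command used in the README.
public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singleton("orders"))
                    .values()
                    .get("orders")
                    .get(); // fails with UnknownTopicOrPartitionException if the topic is missing
            System.out.println(description);
        }
    }
}
```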
src/main/java/com/afp/ordermanagement/model/Order.java

@@ -32,4 +32,9 @@ public class Order {

    public Order() {
    }

    public Order(OrderStatus status) {
        this.orderStatus = status;
    }
}
src/main/java/com/afp/ordermanagement/reactivekafkaservice/Receiver.java

package com.afp.ordermanagement.reactivekafkaservice;

import com.afp.ordermanagement.model.Order;
import com.afp.ordermanagement.model.OrderStatus;
import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

@Service
@Slf4j
public class Receiver {

@@ -14,11 +26,65 @@ public class Receiver {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;

    @Autowired
    private OrderService orderService;

    @EventListener(ApplicationStartedEvent.class)
    public void consumeOrderStatus() {
        kafkaReceiver.receive()
                .doOnNext(record -> log.info(String.format("##### -> Receiver receiving message: %s ", record.value())))
                .doOnNext(record -> log.info("record.value(): {} ", record.value()))
                .doOnNext(record -> onOrderStatusReceived(record.value()))
                .doOnError(throwable -> System.out.println(throwable.getMessage()))
                .subscribe();
    }

    private void onOrderStatusReceived(String orderStatusStr) {
        try {
            // deserialize string into java object using ObjectMapper
            ObjectMapper objectMapper = new ObjectMapper();
            Map<String, String> orderStatus = objectMapper.readValue(orderStatusStr, Map.class);
            String id = orderStatus.get("id");
            String status = orderStatus.get("orderStatus").toUpperCase(Locale.ROOT);
            updateOrderStatus(id, status);
            log.info("orderStatus: {}", orderStatus);
        } catch (Exception e) {
            log.error("Caught error", e);
        }
    }

    private void updateOrderStatus(String orderId, String orderStatus) {
        if (checkExistingOrder(orderId)) {
            log.info("Updating {} with status {}", orderId, orderStatus);
            Order newOrder = new Order(OrderStatus.valueOf(orderStatus));
            Mono<Order> updateOrder = orderService.updateOrderByOrderId(orderId, newOrder);
            updateOrder.subscribe();
            // updateOrder.block(); //subscribe vs block?
        }
    }

    private boolean checkExistingOrder(String orderId) {
        Flux<Order> orders = orderService.getAllOrders();
        List<Order> orderList = orders.collectList().block();
        Order res = orderList.stream()
                .filter(order -> orderId.equals(order.getId()))
                .findAny()
                .orElse(null);
        if (res == null) {
            log.error("Order {} not found", orderId);
            return false;
        }
        log.info("Order exists on the database");
        return true;
    }
}
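
Receiver autowires a KafkaReceiver<String, String> bean that is defined elsewhere in the project, not in this commit. A minimal sketch of what such a configuration might look like with reactor-kafka, assuming a broker at localhost:9092, a hypothetical order-management-group consumer group, and the orders topic the Sender writes to:

```
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: class name, bean name, group id, and broker address are assumptions,
// not taken from this commit.
@Configuration
public class ReactiveKafkaReceiverConfig {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-management-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // subscribe to the same "orders" topic used by Sender
        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("orders"));
        return KafkaReceiver.create(options);
    }
}
```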
src/main/java/com/afp/ordermanagement/reactivekafkaservice/Sender.java

package com.afp.ordermanagement.reactivekafkaservice;

import com.afp.ordermanagement.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

@Service
@Slf4j
public class Sender {

    @Autowired
    private KafkaSender<String, String> kafkaEventProducer;

    private static final String ORDER_TOPIC = "orders";

    public void sendOrderIdToWarehouse(String id) {
        log.info(String.format("##### -> Sender sending message: %s ", id));

        // sendMessage(ORDER_TOPIC, id, id);
        ProducerRecord<String, String> record = new ProducerRecord<>(ORDER_TOPIC, id);

        Flux<SenderResult<String>> working = kafkaEventProducer.send(Mono.just(SenderRecord.create(record, id)))
                .doOnError(throwable -> System.out.println(throwable))
...
@@ -33,4 +47,13 @@ public class Sender {
        });

        working.doOnError(throwable -> log.error("some error")).subscribe();
    }

    public void sendUpdatedStatus(String id, String status) {
        log.info(String.format("Sender sending updated status for ordernumber: %s", id));

        ProducerRecord<String, String> stat = new ProducerRecord<>(ORDER_TOPIC, status);
    }
}
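
As with the Receiver, the autowired KafkaSender<String, String> bean is configured outside this commit. A minimal sketch of such a producer configuration with reactor-kafka, assuming the same localhost:9092 broker (class and bean names are hypothetical):

```
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import java.util.HashMap;
import java.util.Map;

// Sketch only: class name, bean name, and broker address are assumptions,
// not taken from this commit.
@Configuration
public class ReactiveKafkaSenderConfig {

    @Bean
    public KafkaSender<String, String> kafkaEventProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // KafkaSender wraps a reactive producer for the Sender service to use
        return KafkaSender.create(SenderOptions.create(props));
    }
}
```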