Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
S
Spring webflux kafka
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kenil Mavani
Spring webflux kafka
Commits
db8e0602
Commit
db8e0602
authored
Feb 10, 2023
by
Kenil Mavani
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
updating the project with comments
parent
502d14a4
Changes
14
Hide whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
160 additions
and
47 deletions
+160
-47
MessageConsumer.java
...main/java/com/example/kafka/consumer/MessageConsumer.java
+9
-4
OrderController.java
...in/java/com/example/kafka/controller/OrderController.java
+8
-0
UserController.java
...ain/java/com/example/kafka/controller/UserController.java
+25
-0
Order.java
src/main/java/com/example/kafka/entity/Order.java
+17
-0
User.java
src/main/java/com/example/kafka/entity/User.java
+18
-0
OrderStatus.java
src/main/java/com/example/kafka/enumerator/OrderStatus.java
+3
-0
MessageProducer.java
...main/java/com/example/kafka/producer/MessageProducer.java
+7
-1
OrderRepository.java
...in/java/com/example/kafka/repository/OrderRepository.java
+3
-2
UserRepository.java
...ain/java/com/example/kafka/repository/UserRepository.java
+3
-0
OrderService.java
src/main/java/com/example/kafka/service/OrderService.java
+5
-3
OrderServiceImpl.java
...main/java/com/example/kafka/service/OrderServiceImpl.java
+24
-33
UserService.java
src/main/java/com/example/kafka/service/UserService.java
+5
-1
UserServiceImpl.java
src/main/java/com/example/kafka/service/UserServiceImpl.java
+33
-3
UserController.class
...classes/com/example/kafka/controller/UserController.class
+0
-0
No files found.
src/main/java/com/example/kafka/consumer/MessageConsumer.java
View file @
db8e0602
package
com
.
example
.
kafka
.
consumer
;
import
com.example.kafka.service.OrderServiceImpl
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.stereotype.Component
;
/**
* Message consumer class :: to consume messages from kafka topics
*/
@Component
public
class
MessageConsumer
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
MessageConsumer
.
class
.
getName
());
@Autowired
private
OrderServiceImpl
orderService
;
/**
* read message method define to consume messages from kafka
* @param order the message
*/
@KafkaListener
(
topics
=
"${app.topic.name}"
,
groupId
=
"group1"
)
public
void
readMessage
(
String
order
){
orderService
.
validateOrder
(
order
)
.
doOnNext
(
log
.
info
(
"order has been validated"
))
;
orderService
.
validateOrder
(
order
);
}
}
src/main/java/com/example/kafka/controller/OrderController.java
View file @
db8e0602
...
...
@@ -7,6 +7,9 @@ import org.springframework.http.HttpStatus;
import
org.springframework.web.bind.annotation.*
;
import
reactor.core.publisher.Mono
;
/**
* Order controller for order related apis
*/
@RestController
@RequestMapping
(
"/v1/order"
)
public
class
OrderController
{
...
...
@@ -14,6 +17,11 @@ public class OrderController {
@Autowired
private
OrderServiceImpl
orderService
;
/**
* Save order api for creating new order
* @param order The order
* @return The mono object of Order
*/
@PostMapping
(
"/save"
)
@ResponseStatus
(
HttpStatus
.
ACCEPTED
)
public
Mono
<
Order
>
saveOrderDetails
(
@RequestBody
Order
order
){
...
...
src/main/java/com/example/kafka/controller/UserController.java
View file @
db8e0602
...
...
@@ -9,6 +9,10 @@ import org.springframework.web.bind.annotation.*;
import
reactor.core.publisher.Flux
;
import
reactor.core.publisher.Mono
;
/**
* User controller for CRUD operation of user activities
*/
@RestController
@RequestMapping
(
"/v1/user"
)
public
class
UserController
{
...
...
@@ -16,16 +20,31 @@ public class UserController {
@Autowired
private
UserServiceImpl
userService
;
/**
* Create user
* @param user The user
* @return The mono of User
*/
@PostMapping
(
"/create"
)
@ResponseStatus
(
HttpStatus
.
ACCEPTED
)
public
Mono
<
User
>
createUser
(
@RequestBody
User
user
){
return
userService
.
createUser
(
user
);
}
/**
* Display all users data
* @return The Flux of user
*/
@GetMapping
(
"/all"
)
public
Flux
<
User
>
getAllUsers
(){
return
userService
.
getAllUsers
();
}
/**
* Fetch particular user by user id
* @param userId The userId
* @return ResponseEntity of user
*/
@GetMapping
(
"/{userId}"
)
public
Mono
<
ResponseEntity
<
User
>>
getUserById
(
@PathVariable
String
userId
){
Mono
<
User
>
user
=
userService
.
findById
(
userId
);
...
...
@@ -33,6 +52,12 @@ public class UserController {
.
defaultIfEmpty
(
ResponseEntity
.
notFound
().
build
());
}
/**
* Update the user data
* @param userId The userId
* @param user The user data
* @return ResponseEntity of user
*/
@PutMapping
(
"/{userId}"
)
public
Mono
<
ResponseEntity
<
User
>>
updateUserById
(
@PathVariable
String
userId
,
@RequestBody
User
user
){
return
userService
.
updateUser
(
userId
,
user
)
...
...
src/main/java/com/example/kafka/entity/Order.java
View file @
db8e0602
...
...
@@ -7,6 +7,10 @@ import lombok.*;
import
org.springframework.data.annotation.Id
;
import
org.springframework.data.mongodb.core.mapping.Document
;
/**
* Order entity class which has define all properties for Order class
*/
@Data
@ToString
@EqualsAndHashCode
(
of
={
"id"
,
"name"
})
...
...
@@ -15,16 +19,29 @@ import org.springframework.data.mongodb.core.mapping.Document;
@JsonInclude
(
JsonInclude
.
Include
.
NON_NULL
)
@Document
(
value
=
"orders"
)
public
class
Order
{
/**
* Order id
*/
@Id
@JsonProperty
(
value
=
"id"
)
private
String
id
;
/**
* Order's name
*/
@JsonProperty
(
value
=
"name"
)
private
String
name
;
/**
* Order amount
*/
@JsonProperty
(
value
=
"amount"
)
private
double
amount
;
/**
* Order status
*/
@JsonProperty
(
value
=
"orderStatus"
)
private
OrderStatus
orderStatus
;
}
src/main/java/com/example/kafka/entity/User.java
View file @
db8e0602
...
...
@@ -4,6 +4,9 @@ import lombok.*;
import
org.springframework.data.annotation.Id
;
import
org.springframework.data.mongodb.core.mapping.Document
;
/**
* User entity class model for user details
*/
@ToString
@EqualsAndHashCode
(
of
=
{
"id"
,
"name"
})
@AllArgsConstructor
...
...
@@ -12,9 +15,24 @@ import org.springframework.data.mongodb.core.mapping.Document;
@Document
(
value
=
"users"
)
public
class
User
{
/**
* User id
*/
@Id
private
String
id
;
/**
* User name
*/
private
String
name
;
/**
* User's age
*/
private
int
age
;
/**
* User's balance
*/
private
double
balance
;
}
src/main/java/com/example/kafka/enumerator/OrderStatus.java
View file @
db8e0602
package
com
.
example
.
kafka
.
enumerator
;
/**
* Enumerator for Order Status
*/
public
enum
OrderStatus
{
ACCEPTED
,
REJECTED
;
}
src/main/java/com/example/kafka/producer/MessageProducer.java
View file @
db8e0602
package
com
.
example
.
kafka
.
producer
;
import
com.example.kafka.entity.Order
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -8,6 +7,9 @@ import org.springframework.beans.factory.annotation.Value;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.stereotype.Component
;
/**
* Message producer class to produce messages into kafka topic
*/
@Component
(
"/msgProducer"
)
public
class
MessageProducer
{
...
...
@@ -19,6 +21,10 @@ public class MessageProducer {
@Value
(
"${app.topic.name}"
)
private
String
topicName
;
/**
* Publish the order to kafka topic
* @param order The order
*/
public
void
publishOrder
(
String
order
){
template
.
send
(
topicName
,
order
);
log
.
info
(
"published order {} ::"
,
order
);
...
...
src/main/java/com/example/kafka/repository/OrderRepository.java
View file @
db8e0602
...
...
@@ -2,8 +2,9 @@ package com.example.kafka.repository;
import
com.example.kafka.entity.Order
;
import
org.springframework.data.mongodb.repository.ReactiveMongoRepository
;
import
org.springframework.stereotype.Repository
;
@Repository
/**
* Order repository to perform DB actions on order
*/
public
interface
OrderRepository
extends
ReactiveMongoRepository
<
Order
,
String
>
{
}
src/main/java/com/example/kafka/repository/UserRepository.java
View file @
db8e0602
...
...
@@ -3,5 +3,8 @@ package com.example.kafka.repository;
import
com.example.kafka.entity.User
;
import
org.springframework.data.mongodb.repository.ReactiveMongoRepository
;
/**
* User repository to perform DB actions on User
*/
public
interface
UserRepository
extends
ReactiveMongoRepository
<
User
,
String
>
{
}
src/main/java/com/example/kafka/service/OrderService.java
View file @
db8e0602
...
...
@@ -4,10 +4,12 @@ import com.example.kafka.entity.Order;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
reactor.core.publisher.Mono
;
/**
* Order service
*/
public
interface
OrderService
{
Mono
<
Order
>
saveOrderIntoDB
(
Order
order
)
throws
JsonProcessingException
;
Mono
<
Order
>
findById
(
String
userId
)
;
Mono
<
Order
>
saveOrderIntoDB
(
Order
order
)
throws
JsonProcessingException
;
Mono
<
Order
>
validateOrder
(
String
order
);
void
validateOrder
(
String
order
);
}
src/main/java/com/example/kafka/service/OrderServiceImpl.java
View file @
db8e0602
...
...
@@ -10,11 +10,11 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.util.ObjectUtils
;
import
reactor.core.publisher.Mono
;
import
java.util.Map
;
/**
* OrderServiceImpl is implementation of OrderService class
*/
@Service
public
class
OrderServiceImpl
implements
OrderService
{
...
...
@@ -28,6 +28,12 @@ public class OrderServiceImpl implements OrderService {
@Autowired
private
MessageProducer
messageProducer
;
/**
* Save order data into DB
* @param order The order
* @return The mono of order
*/
@Override
public
Mono
<
Order
>
saveOrderIntoDB
(
Order
order
){
try
{
...
...
@@ -39,37 +45,22 @@ public class OrderServiceImpl implements OrderService {
return
orderRepository
.
save
(
order
);
}
@Override
public
Mono
<
Order
>
findById
(
String
orderId
){
return
orderRepository
.
findById
(
orderId
);
}
/**
* Validate the order to either accept or reject
* @param orderJson The order json
*/
@Override
public
Mono
<
Order
>
validateOrder
(
String
orderJson
)
{
// try {
return
orderRepository
.
findById
(
"1"
)
.
flatMap
(
dbOrder
->
{
dbOrder
.
setOrderStatus
(
OrderStatus
.
REJECTED
);
return
orderRepository
.
save
(
dbOrder
);
});
// ObjectMapper mapper = new ObjectMapper();
// Order order = mapper.readValue(orderJson,Order.class);
// double amount = order.getAmount();
// double balance = userService.findById(order.getId()).block().getBalance();
// OrderStatus orderStatus;
// if(amount>balance) {
// orderStatus=OrderStatus.REJECTED;
// } else {
// orderStatus=OrderStatus.ACCEPTED;
// }
// Order dbOrder = orderRepository.findById(order.getId()).block();
// if(dbOrder!=null)
// {
// dbOrder.setOrderStatus(orderStatus);
// }
// orderRepository.save(dbOrder);
// } catch(JsonProcessingException e) {
// log.error("Caught Exception {}:: ",e);
// }
public
void
validateOrder
(
String
orderJson
)
{
try
{
ObjectMapper
mapper
=
new
ObjectMapper
();
Order
order
=
mapper
.
readValue
(
orderJson
,
Order
.
class
);
userService
.
findById
(
order
.
getId
()).
flatMap
(
user
->
{
order
.
setOrderStatus
(
order
.
getAmount
()<=
user
.
getBalance
()
?
OrderStatus
.
ACCEPTED
:
OrderStatus
.
REJECTED
);
return
orderRepository
.
save
(
order
);
}).
subscribe
();
}
catch
(
JsonProcessingException
e
)
{
log
.
error
(
"Caught Exception :: {} "
,
e
);
}
}
}
src/main/java/com/example/kafka/service/UserService.java
View file @
db8e0602
package
com
.
example
.
kafka
.
service
;
import
com.example.kafka.entity.Order
;
import
com.example.kafka.entity.User
;
import
reactor.core.publisher.Flux
;
import
reactor.core.publisher.Mono
;
/**
* UserService
*/
public
interface
UserService
{
Mono
<
User
>
createUser
(
User
user
);
...
...
@@ -12,5 +16,5 @@ public interface UserService {
Mono
<
User
>
findById
(
String
userId
);
Mono
<
User
>
updateUser
(
String
userId
,
User
user
);
Mono
<
User
>
updateUser
(
String
userId
,
User
user
);
}
src/main/java/com/example/kafka/service/UserServiceImpl.java
View file @
db8e0602
package
com
.
example
.
kafka
.
service
;
import
com.example.kafka.entity.Order
;
import
com.example.kafka.entity.User
;
import
com.example.kafka.enumerator.OrderStatus
;
import
com.example.kafka.repository.OrderRepository
;
import
com.example.kafka.repository.UserRepository
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
reactor.core.publisher.Flux
;
import
reactor.core.publisher.Mono
;
/**
* UserServiceImpl is implementation of UserService class
*/
@Service
public
class
UserServiceImpl
implements
UserService
{
@Autowired
private
UserRepository
userRepository
;
@Autowired
private
OrderRepository
orderRepository
;
/**
* Create user into DB
* @param user The user
* @return The mono user
*/
@Override
public
Mono
<
User
>
createUser
(
User
user
){
return
userRepository
.
save
(
user
);
}
/**
* Get all users
* @return The flux user
*/
@Override
public
Flux
<
User
>
getAllUsers
(){
return
userRepository
.
findAll
();
}
/**
* Find user by id
* @param userId The userId
* @return The mono user
*/
@Override
public
Mono
<
User
>
findById
(
String
userId
){
return
userRepository
.
findById
(
userId
);
}
/**
* Update the User details
* @param userId The userId
* @param user The user
* @return The mono user
*/
@Override
public
Mono
<
User
>
updateUser
(
String
userId
,
User
user
){
return
userRepository
.
findById
(
userId
)
.
flatMap
(
dbUser
->
{
public
Mono
<
User
>
updateUser
(
String
userId
,
User
user
){
return
userRepository
.
findById
(
userId
).
flatMap
(
dbUser
->
{
dbUser
.
setAge
(
user
.
getAge
());
dbUser
.
setBalance
(
user
.
getBalance
());
return
userRepository
.
save
(
dbUser
);
...
...
target/classes/com/example/kafka/controller/UserController.class
View file @
db8e0602
No preview for this file type
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment