Commit 1ef7a4cd authored by Kenil Mavani's avatar Kenil Mavani

m2

parent a01783bb
......@@ -52,7 +52,12 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
<build>
<plugins>
......
package com.example.kafka;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class MessageStore {
private final List<String> allMessages = new ArrayList<>();
public void addMessage(String message){
allMessages.add(message);
}
public String getAllMessages(){
return allMessages.toString();
}
}
package com.example.kafka.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Data
public class KafkaProducerConfig {
@Value("${bootstrap.server}")
private String bootstrapper;
}
package com.example.kafka.consumer;
import com.example.kafka.MessageStore;
import com.example.kafka.serviceImpl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@Autowired
private MessageStore store;
@Autowired
private UserServiceImpl userService;
@KafkaListener(topics = "${app.topic.name}",groupId = "group1")
public void readMessage(String message){
userService.validateOrder(message);
store.addMessage(message);
}
}
package com.example.kafka.controller;
import com.example.kafka.entity.User;
import com.example.kafka.serviceImpl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/v1/user")
public class UserController {
@Autowired
private UserServiceImpl userService;
@PostMapping("/create")
@ResponseStatus(HttpStatus.ACCEPTED)
public Mono<User> createUser(@RequestBody User user){
return userService.createUser(user);
}
@GetMapping("/all")
public Flux<User> getAllUsers(){
return userService.getAllUsers();
}
@GetMapping("/{userId}")
public Mono<ResponseEntity<User>> getUserById(@PathVariable String userId){
Mono<User> user = userService.findById(userId);
return user.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}
package com.example.kafka.entity;
import com.example.kafka.enumerator.OrderStatus;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@ToString
@EqualsAndHashCode(of={"id","name","amount"})
@EqualsAndHashCode(of={"id","name"})
@AllArgsConstructor
@NoArgsConstructor
@Document(value = "orders")
public class Order {
@Id
private String id;
private String name;
private double amount;
private OrderStatus orderStatus;
}
package com.example.kafka.entity;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@ToString
@EqualsAndHashCode(of = {"id","name"})
@AllArgsConstructor
@NoArgsConstructor
@Data
@Document(value = "users")
public class User {
@Id
private String id;
private String name;
private int age;
private double balance;
}
package com.example.kafka.enumerator;
public enum OrderStatus {
ACCEPTED,REJECTED;
}
package com.example.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component("/msgProducer")
public class MessageProducer {
private static final Logger log = LoggerFactory.getLogger(MessageProducer.class.getName());
@Autowired
KafkaTemplate<String, String> template;
@Value("${app.topic.name}")
private String topicName;
public void publishOrder(String order){
template.send(topicName,order);
log.info("published order {} ::",order);
}
}
package com.example.kafka.repository;
import com.example.kafka.entity.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface UserRepository extends ReactiveMongoRepository<User,String> {
}
package com.example.kafka.service;
import com.example.kafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface UserService {
public Mono<User> createUser(User user);
public Flux<User> getAllUsers();
public Mono<User> findById(String userId);
public Mono<User> updateUser(String userId, User user);
public void validateOrder(String message) throws JsonProcessingException;
}
package com.example.kafka.serviceImpl;
import com.example.kafka.entity.Order;
import com.example.kafka.producer.MessageProducer;
import com.example.kafka.repository.OrderRepository;
import com.example.kafka.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -12,8 +13,12 @@ public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageProducer messageProducer;
@Override
public Mono<Order> saveOrderIntoDB(Order order) {
messageProducer.publishOrder(order.toString());
return orderRepository.save(order);
}
}
package com.example.kafka.serviceImpl;
import com.example.kafka.entity.Order;
import com.example.kafka.entity.User;
import com.example.kafka.repository.UserRepository;
import com.example.kafka.service.UserService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Map;
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Override
public Mono<User> createUser(User user){
return userRepository.save(user);
}
@Override
public Flux<User> getAllUsers(){
return userRepository.findAll();
}
@Override
public Mono<User> findById(String userId){
return userRepository.findById(userId);
}
@Override
public Mono<User> updateUser(String userId, User user){
return userRepository.findById(userId)
.flatMap(dbUser -> {
dbUser.setAge(user.getAge());
dbUser.setBalance(user.getBalance());
return userRepository.save(dbUser);
});
}
@Override
public void validateOrder(String message) {
try {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> map = mapper.readValue(message, Map.class);
} catch (Exception e) {
System.out.println(e);
}
}
}
bootstrap.server=localhost:9092
kafka.clientId=rewards-generator
spring.webflux.base-path=/api
spring.application.name=spring-webflux-kafka
......@@ -10,6 +7,8 @@ data.mongodb.database=test
server.port:9000
app.topic.name=accept_order
logging.level.io.reflectoring: DEBUG
logging.level.org.springframework.web: INFO
logging.level.org.springframework.data.mongodb.core.ReactiveMongoTemplate=DEBUG
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment