Commit 489d44c2 authored by Kenil Mavani's avatar Kenil Mavani

updating logic

parent 1ef7a4cd
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.6/apache-maven-3.8.6-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar
......@@ -30,6 +30,10 @@
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
......
package com.example.kafka.consumer;
import com.example.kafka.MessageStore;
import com.example.kafka.serviceImpl.UserServiceImpl;
import com.example.kafka.service.OrderServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
......@@ -9,15 +8,11 @@ import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@Autowired
private MessageStore store;
@Autowired
private UserServiceImpl userService;
private OrderServiceImpl orderService;
@KafkaListener(topics = "${app.topic.name}",groupId = "group1")
public void readMessage(String message){
userService.validateOrder(message);
store.addMessage(message);
public void readMessage(String order){
orderService.validateOrder(order);
}
}
package com.example.kafka.controller;
import com.example.kafka.entity.Order;
import com.example.kafka.serviceImpl.OrderServiceImpl;
import com.example.kafka.service.OrderServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
......
package com.example.kafka.controller;
import com.example.kafka.entity.User;
import com.example.kafka.serviceImpl.UserServiceImpl;
import com.example.kafka.service.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
......
package com.example.kafka.entity;
import com.example.kafka.enumerator.OrderStatus;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
......@@ -10,14 +12,19 @@ import org.springframework.data.mongodb.core.mapping.Document;
@EqualsAndHashCode(of={"id","name"})
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@Document(value = "orders")
public class Order {
@Id
@JsonProperty(value = "id")
private String id;
@JsonProperty(value = "name")
private String name;
@JsonProperty(value = "amount")
private double amount;
@JsonProperty(value = "orderStatus")
private OrderStatus orderStatus;
}
package com.example.kafka.producer;
import com.example.kafka.entity.Order;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......
package com.example.kafka.service;
import com.example.kafka.entity.Order;
import com.fasterxml.jackson.core.JsonProcessingException;
import reactor.core.publisher.Mono;
public interface OrderService {
Mono<Order> saveOrderIntoDB(Order order);
Mono<Order> saveOrderIntoDB(Order order) throws JsonProcessingException;
Mono<Order> findById(String userId);
void validateOrder(String order);
}
package com.example.kafka.service;
import com.example.kafka.entity.Order;
import com.example.kafka.enumerator.OrderStatus;
import com.example.kafka.producer.MessageProducer;
import com.example.kafka.repository.OrderRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Mono;
import java.util.Map;
@Service
public class OrderServiceImpl implements OrderService {
private static final Logger log = LoggerFactory.getLogger(OrderServiceImpl.class.getName());
@Autowired
private OrderRepository orderRepository;
@Autowired
private UserServiceImpl userService;
@Autowired
private MessageProducer messageProducer;
@Override
public Mono<Order> saveOrderIntoDB(Order order){
try {
ObjectMapper objectMapper = new ObjectMapper();
messageProducer.publishOrder(objectMapper.writeValueAsString(order));
} catch(JsonProcessingException e) {
log.error("Caught Exception {}:: ",e);
}
return orderRepository.save(order);
}
@Override
public Mono<Order> findById(String orderId){
return orderRepository.findById(orderId);
}
@Override
public void validateOrder(String orderJson) {
try {
ObjectMapper mapper = new ObjectMapper();
Order order = mapper.readValue(orderJson,Order.class);
double amount = order.getAmount();
double balance = userService.findById(order.getId()).block().getBalance();
OrderStatus orderStatus;
if(amount>balance) {
orderStatus=OrderStatus.REJECTED;
} else {
orderStatus=OrderStatus.ACCEPTED;
}
Order dbOrder = orderRepository.findById(order.getId()).block();
if(dbOrder!=null)
{
dbOrder.setOrderStatus(orderStatus);
}
orderRepository.save(dbOrder);
} catch(JsonProcessingException e) {
log.error("Caught Exception {}:: ",e);
}
}
}
package com.example.kafka.service;
import com.example.kafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface UserService {
public Mono<User> createUser(User user);
Mono<User> createUser(User user);
public Flux<User> getAllUsers();
Flux<User> getAllUsers();
public Mono<User> findById(String userId);
Mono<User> findById(String userId);
public Mono<User> updateUser(String userId, User user);
public void validateOrder(String message) throws JsonProcessingException;
Mono<User> updateUser(String userId, User user);
}
package com.example.kafka.serviceImpl;
package com.example.kafka.service;
import com.example.kafka.entity.Order;
import com.example.kafka.entity.User;
import com.example.kafka.repository.UserRepository;
import com.example.kafka.service.UserService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Map;
@Service
public class UserServiceImpl implements UserService {
......@@ -43,15 +37,4 @@ public class UserServiceImpl implements UserService {
return userRepository.save(dbUser);
});
}
@Override
public void validateOrder(String message) {
try {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> map = mapper.readValue(message, Map.class);
} catch (Exception e) {
System.out.println(e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment