package com.example.kafka.service; import com.example.kafka.entity.Order; import com.example.kafka.entity.User; import com.example.kafka.enumerator.OrderStatus; import com.example.kafka.repository.OrderRepository; import com.example.kafka.repository.UserRepository; import com.mongodb.reactivestreams.client.MongoClients; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.ReactiveMongoOperations; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** * UserServiceImpl is implementation of UserService class */ @Service public class UserServiceImpl implements UserService { @Autowired private UserRepository userRepository; @Autowired private OrderRepository orderRepository; /** * Create user into DB * @param user The user * @return The mono user */ @Override public Mono createUser(User user){ return userRepository.save(user); } /** * Get all users * @return The flux user */ @Override public Flux getAllUsers(){ return userRepository.findAll(); } /** * Find user by id * @param userId The userId * @return The mono user */ @Override public Mono findById(String userId){ return userRepository.findById(userId); } /** * Update the User details * @param userId The userId * @param user The user * @return The mono user */ @Override public Mono updateUser(String userId, User user){ return userRepository.findById(userId).flatMap(dbUser -> { if(!ObjectUtils.isEmpty(user.getAge()) && user.getAge()>0) { dbUser.setAge(user.getAge()); } if(!ObjectUtils.isEmpty(user.getBalance()) && user.getBalance()>0) { dbUser.setBalance(user.getBalance()); } if(!ObjectUtils.isEmpty(user.getDepartmentName())) { dbUser.setDepartmentName(user.getDepartmentName()); } return userRepository.save(dbUser); }); } @Override public Flux fetchUserByDepartmentName(String department) { ReactiveMongoOperations ops = new ReactiveMongoTemplate(MongoClients.create(), "test"); Criteria c = Criteria.where("departmentName").is(department); Query qry = new Query(c); return ops.find(qry, User.class); } }