package com.example.springWebFluxByDoms.service; import com.example.springWebFluxByDoms.dto.UserDto; import com.example.springWebFluxByDoms.entity.Users; import com.example.springWebFluxByDoms.utils.Apputils; import com.mongodb.client.result.UpdateResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.FindAndModifyOptions; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.web.server.ResponseStatusException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.List; import java.util.Map; @Service public class UserService { @Autowired private final MongoTemplate mongoTemplate; @Autowired public UserService(MongoTemplate mongoTemplate) { this.mongoTemplate = mongoTemplate; } public Mono<UserDto> saveUser(Mono<UserDto> usersMono) { return usersMono .map(Apputils::DtoToentity) .flatMap(user -> { Users panExistsValue = mongoTemplate.findOne(Query.query(Criteria.where("permanentAccountNumber").is(user.getPermanentAccountNumber())), Users.class); Users aadhaarExistsValue = mongoTemplate.findOne(Query.query(Criteria.where("adharNumber").is(user.getAdharNumber())), Users.class); if (panExistsValue != null && user.getPermanentAccountNumber().equals(panExistsValue.getPermanentAccountNumber()) || aadhaarExistsValue != null && user.getAdharNumber().equals(aadhaarExistsValue.getAdharNumber())) { // String errorMessage = panExistsValue != null ? "PAN already exists" : "Aadhaar already exists"; return Mono.error(new ResponseStatusException(HttpStatus.CONFLICT)); } else { return Mono.fromCallable(() -> mongoTemplate.insert(user)) .map(result -> Apputils.entityToDto(result)); } }); } public Flux<UserDto> getUsers(Flux<UserDto> userDtoFlux) { return userDtoFlux .flatMap(userDto -> { Criteria criteria = new Criteria(); boolean isQueryEmpty = true; if (userDto.getFirstName() != null) { criteria.and("firstName").is(userDto.getFirstName()); isQueryEmpty = false; } if (userDto.getLastName() != null) { criteria.and("lastName").is(userDto.getLastName()); isQueryEmpty = false; } if (isQueryEmpty) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "At least one query parameter is required"); } Query query = Query.query(criteria); return Flux.fromIterable(mongoTemplate.find(query, Users.class)) .map(Apputils::entityToDto); }) .switchIfEmpty(Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "No users found with the given criteria"))); } public Mono<UserDto> updateTheUser(Mono<UserDto> userDtoMono, Integer id) { return userDtoMono .flatMap(userDto -> { Query query = Query.query(Criteria.where("id").is(id)); Update update = new Update().set("firstName", userDto.getFirstName()); return Mono.fromCallable(() -> mongoTemplate.findAndModify(query, update, FindAndModifyOptions.options().returnNew(true), Users.class)) .flatMap(updatedUser -> { if (updatedUser != null) { return Mono.just(Apputils.entityToDto(updatedUser)); } else { throw new ResponseStatusException(HttpStatus.NOT_FOUND, "User not found"); } }); }); } public Mono<Void> deleteTheUser(Integer id) { return Mono.fromCallable(() -> { Query query = Query.query(Criteria.where("id").is(id)); return mongoTemplate.remove(query, Users.class); }).then(); } }