Commit 940b786c authored by Sarika Sama's avatar Sarika Sama

integrating kafka

parent 19c74fb6
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<name>webflux-mongodb-order-management</name> <name>webflux-mongodb-order-management</name>
<description>Demo project for Spring Boot</description> <description>Demo project for Spring Boot</description>
<properties> <properties>
<java.version>11</java.version> <java.version>1.8</java.version>
</properties> </properties>
<dependencies> <dependencies>
<dependency> <dependency>
...@@ -51,6 +51,29 @@ ...@@ -51,6 +51,29 @@
<artifactId>springdoc-openapi-webflux-ui</artifactId> <artifactId>springdoc-openapi-webflux-ui</artifactId>
<version>1.6.14</version> <version>1.6.14</version>
</dependency> </dependency>
<!-- <dependency>-->
<!-- <groupId>org.springdoc</groupId>-->
<!-- <artifactId>springdoc-openapi-starter-webflux-ui</artifactId>-->
<!-- <version>1.6.14</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springdoc</groupId> <groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-webflux-core</artifactId> <artifactId>springdoc-openapi-webflux-core</artifactId>
...@@ -63,9 +86,21 @@ ...@@ -63,9 +86,21 @@
<dependency> <dependency>
<groupId>de.flapdoodle.embed</groupId> <groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId> <artifactId>de.flapdoodle.embed.mongo</artifactId>
<version>2.2.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.1</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId> <artifactId>spring-boot-devtools</artifactId>
...@@ -107,8 +142,8 @@ ...@@ -107,8 +142,8 @@
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>9</source> <source>1.8</source>
<target>9</target> <target>1.8</target>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>
......
package com.nisum.webfluxmongodbordermanagement.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFatory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "orders_group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(config);
log.info("consumer config is ready {}", consumerFactory.getConfigurationProperties());
return consumerFactory;
}
@Bean(name = "consumer_factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFatory());
concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
return concurrentKafkaListenerContainerFactory;
}
}
\ No newline at end of file
package com.nisum.webfluxmongodbordermanagement.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaListeners {
@Value("${order_topic}")
private String topicName;
@KafkaListener(topics = "${order_topic}",
groupId = "orders_group",
containerFactory="consumer_factory")
public void consume(ConsumerRecord<String, String> record){
log.info("record consumed to topic: {}, offset {}, partitionkey {}, message {}",
record.topic(), record.offset(), record.key(), record.value());
}
}
package com.nisum.webfluxmongodbordermanagement.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String partitionKey, String message) {
kafkaTemplate.send(topic, partitionKey, message)
.addCallback(result -> {
RecordMetadata record = result.getRecordMetadata();
log.info("record sent to topic {}, partition {}, offset {}, partitionkey {}, record {}",
record.topic(), record.partition(), record.offset(), partitionKey, message);
},
ex -> log.error("failed to send message to topic {}, partitionkey {}, message {}",
topic, partitionKey, message)
);
}
}
package com.nisum.webfluxmongodbordermanagement.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaProducerConfig {
@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(config);
log.info("producer config is ready {}",producerFactory.getConfigurationProperties());
return producerFactory;
}
@Bean
KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
\ No newline at end of file
...@@ -3,7 +3,7 @@ package com.nisum.webfluxmongodbordermanagement.controller; ...@@ -3,7 +3,7 @@ package com.nisum.webfluxmongodbordermanagement.controller;
import com.nisum.webfluxmongodbordermanagement.entity.Order; import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.service.OrdersService; import com.nisum.webfluxmongodbordermanagement.service.OrdersService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
...@@ -13,9 +13,10 @@ public class OrdersController { ...@@ -13,9 +13,10 @@ public class OrdersController {
@Autowired @Autowired
private OrdersService ordersService; private OrdersService ordersService;
@GetMapping("/orders") @PostMapping("/publish-orders")
public Flux<Order> getOrders(){ public Flux<Order> publishToKafka() {
return ordersService.getOrders(); Flux<Order> orders = ordersService.fetchOrdersAndPublish();
return orders;
} }
} }
...@@ -27,16 +27,3 @@ public class UserController { ...@@ -27,16 +27,3 @@ public class UserController {
//
//@PostMapping("/userDetails")
// public Mono<User> saveUser(@RequestBody User user){
// return userService.addUser(user);
// }
// @GetMapping("/{userId}")
// public Mono<ResponseEntity<User>> getUserWithOrders(@PathVariable String userId) {
// return userService.getUserWithOrders(userId)
// .map(user -> ResponseEntity.ok(user))
// .defaultIfEmpty(ResponseEntity.notFound().build());
// }
\ No newline at end of file
package com.nisum.webfluxmongodbordermanagement.entity; package com.nisum.webfluxmongodbordermanagement.entity;
import lombok.*; import lombok.Data;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
@Data @Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Document(collection = "orders") @Document(collection = "orders")
public class Order { public class Order {
@Id @Id
private String _id; private String _id;
private String orderId; private String orderId;
private String userId; private String userId;
private String transactionId; private String transactionId;
private String trackingId; private String trackingId;
private String status; private String status;
} }
package com.nisum.webfluxmongodbordermanagement.service; package com.nisum.webfluxmongodbordermanagement.service;
import com.nisum.webfluxmongodbordermanagement.config.KafkaProducer;
import com.nisum.webfluxmongodbordermanagement.entity.Order; import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository; import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
@Service @Service
@Slf4j
public class OrdersService { public class OrdersService {
@Autowired @Autowired
private OrderRepository orderRepository; private OrderRepository orderRepository;
public Flux<Order> getOrders(){ @Autowired
return orderRepository.findAll(); private KafkaProducer kafkaProducer;
@Value("${order_topic}")
private String topicName;
public Flux<Order> fetchOrdersAndPublish(){
Flux<Order> orders = orderRepository.findAll();
Mono<List<Order>> ordersMono = orders.collectList()
.subscribeOn(Schedulers.boundedElastic());
ordersMono.subscribe(
resultList -> {
log.info("sending orders to kafka");
for (Order order : resultList) {
kafkaProducer.sendMessage( topicName, "order-"+order.getOrderId(), order.toString());
}
},
throwable -> log.error("Error occurred: {}", throwable.getMessage())
);
return orders;
} }
} }
#server.port=9090
#spring.data.mongodb.user-orders-db.uri=mongodb://localhost:27017/user-orders-db
#spring.data.mongodb.user-orders-db.database=user-orders-db
server.port=9090 server.port=9090
#spring.data.mongodb.host=localhost spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017 spring.data.mongodb.port=27017
spring.data.mongodb.database=user-orders-db spring.data.mongodb.database=user-orders-db
order_topic=order_topic
#spring.data.mongodb.orders-db.uri=mongodb://localhost:27017/orders-db
#spring.data.mongodb.orders-db.database=orders-db
package com.nisum.webfluxmongodbordermanagement.controller; //package com.nisum.webfluxmongodbordermanagement.controller;
//
import com.nisum.webfluxmongodbordermanagement.entity.Order; //import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.service.OrdersService; //import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository;
import org.junit.jupiter.api.BeforeEach; //import com.nisum.webfluxmongodbordermanagement.service.OrdersService;
import org.junit.jupiter.api.Disabled; //import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; //import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith; //import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; //import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate; //import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.test.context.ActiveProfiles; //import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.test.context.junit.jupiter.SpringExtension; //import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.test.web.reactive.server.WebTestClient; //import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Flux; //import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.test.StepVerifier; //import org.springframework.test.web.reactive.server.WebTestClient;
//import reactor.core.publisher.Flux;
import java.io.IOException; //import reactor.test.StepVerifier;
//
@DataMongoTest //import java.io.IOException;
@ExtendWith(SpringExtension.class) //
@ActiveProfiles("test") ////@DataMongoTest
@Disabled //@WebFluxTest
public class OrderControllerIntTest { //@ExtendWith(SpringExtension.class)
//@ActiveProfiles("test")
static { //public class OrderControllerIntTest {
System.setProperty("spring.mongodb.embedded.version","5.0.0"); //
} // static {
// System.setProperty("spring.mongodb.embedded.version","5.0.0");
@Autowired // }
WebTestClient webTestClient; //
@Autowired(required=true) // @Autowired
OrdersService orderService; // OrdersService ordersService;
//
@BeforeEach // @Autowired
public void setup() throws IOException { // OrderRepository orderRepository;
//
// @Autowired
// WebTestClient webTestClient ;
//// .bindToServer()
//// .baseUrl("http://localhost:9090")
//// .build();
//
//
// @BeforeEach
// public void setup() throws IOException {
//// mongoTemplate.getCollection("orders").drop();
//// //mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456","1"));
//// mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
//
// }
//
// @Test
// public void getOrderControllerTest(@Autowired MongoTemplate mongoTemplate) {
// mongoTemplate.getCollection("orders").drop(); // mongoTemplate.getCollection("orders").drop();
// //mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456","1"));
// mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
}
@Test
public void getOrderControllerTest(@Autowired MongoTemplate mongoTemplate) {
mongoTemplate.getCollection("orders").drop();
// Flux<Order> result = orderService.getOrders();
// //
// mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
// Flux<Order> result = ordersService.getOrders();
// StepVerifier.create(result) // StepVerifier.create(result)
// .expectNextMatches(order -> order.getOrderId().equals("01")) // .expectNextMatches(order -> order.getOrderId().equals("01"))
// .expectComplete() // .expectComplete()
// .verify(); // .verify();
// } // }
//// mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess")); ////
Flux<Order> result = orderService.getOrders(); //// Flux<Order> responseBody = webTestClient.get().uri("/orders")
Flux<Order> responseBody = webTestClient.get().uri("/orders") //// .exchange()
.exchange() //// .expectStatus().isOk()
.expectStatus().isOk() //// .returnResult(Order.class)
.returnResult(Order.class) //// .getResponseBody();
.getResponseBody(); //////
//// StepVerifier.create(responseBody)
//// .expectSubscription()
//// .expectNextMatches(order -> order.getUserId().equals("1"))
//// .verifyComplete();
////}
//
// //
StepVerifier.create(responseBody) //}
.expectSubscription()
.expectNextMatches(order -> order.getUserId().equals("1"))
.verifyComplete();
}
}
//package com.nisum.webfluxmongodbordermanagement.controller;
//
//import com.nisum.webfluxmongodbordermanagement.entity.Order;
//import com.nisum.webfluxmongodbordermanagement.service.OrdersService;
//import de.flapdoodle.embed.mongo.MongodExecutable;
//import de.flapdoodle.embed.mongo.MongodStarter;
//import de.flapdoodle.embed.mongo.config.IMongodConfig;
//import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
//import de.flapdoodle.embed.mongo.config.Net;
//import de.flapdoodle.embed.mongo.distribution.Version;
//import de.flapdoodle.embed.process.runtime.Network;
//import org.junit.jupiter.api.BeforeEach;
//import org.junit.jupiter.api.Test;
//import org.junit.jupiter.api.extension.ExtendWith;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
//import org.springframework.boot.test.context.SpringBootTest;
//import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
//import org.springframework.test.context.ActiveProfiles;
//import org.springframework.test.context.junit.jupiter.SpringExtension;
//import org.springframework.test.util.TestSocketUtils;
//import org.springframework.test.web.reactive.server.WebTestClient;
//
//import java.io.IOException;
//
////@SpringJUnitConfig
//@ExtendWith(SpringExtension.class)
//@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
//@AutoConfigureWebTestClient(timeout = "30000")
//@ActiveProfiles("test")
//public class OrderControllerIntegrationTest {
// static int port = TestSocketUtils.findAvailableTcpPort();
//
//
//
// public static final String MONGODB_URL = "mongodb://localhost:" + port;
// static MongodExecutable mongodExecutable = null;
// @Autowired
// ReactiveMongoTemplate mongoTemplate;
//
// @Autowired
// WebTestClient webTestClient;
//
// @Autowired
// OrdersService ordersService;
//
// @BeforeEach
// public void setup() throws IOException {
// MongodStarter starter = MongodStarter.getDefaultInstance();
// IMongodConfig mongodConfig = new MongodConfigBuilder().version(Version.Main.PRODUCTION)
// .net(new Net(port, Network.localhostIsIPv6())).build();
// mongodExecutable = starter.prepare(mongodConfig);
// mongodExecutable.start();
// }
//
// @Test
// public void getOrderControllerTest() {
//
// Order order = new Order("2356781", "01", "1", "ooppo122356", "oppo123456", "sucess");
// mongoTemplate.insert(order, "orders").block();
// webTestClient.get().uri("/orders")
// .exchange()
// .expectStatus().isOk()
// .returnResult(Order.class)
// .getResponseBody();
// }
//
//}
//// public void testGetUserByEmailId() {
//// UserDto userDto = ReadJson.read("userDetails.json", UserDto.class);
//// reactiveMongoTemplate.insert(userDto, USER_COLLECTION).block();
////
//// webTestClient.get().uri("/api/user/v1/login/amit@gmail.com")
//// .exchange()
//// .expectStatus().isOk()
//// .expectBody(UserDto.class)
//// .value(userDto1 ->
//// assertEquals("amit@gmail.com", userDto.getEmailId())
//// );
//// }
\ No newline at end of file
package com.nisum.webfluxmongodbordermanagement.service; //package com.nisum.webfluxmongodbordermanagement.service;
//
import com.nisum.webfluxmongodbordermanagement.entity.Order; //import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository; //import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository;
import org.junit.jupiter.api.BeforeEach; //import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; //import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; //import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; //import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.data.mongodb.core.MongoTemplate; //import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.test.context.ActiveProfiles; //import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension; //import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Flux; //import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; //import reactor.test.StepVerifier;
//
import java.io.IOException; //import java.io.IOException;
@DataMongoTest //@DataMongoTest
@ExtendWith(SpringExtension.class) //@ExtendWith(SpringExtension.class)
@ActiveProfiles("test") //@ActiveProfiles("test")
public class OrdersServiceIntTest { //public class OrdersServiceIntTest {
//
static { // static {
System.setProperty("spring.mongodb.embedded.version","5.0.0"); // System.setProperty("spring.mongodb.embedded.version","5.0.0");
} // }
//
/*@Autowired // /*@Autowired
MongoTemplate mongoTemplate;*/ // MongoTemplate mongoTemplate;*/
@Autowired // @Autowired
OrderRepository orderRepository; // OrderRepository orderRepository;
//
@BeforeEach // @BeforeEach
public void setup() throws IOException { // public void setup() throws IOException {
//
// mongoTemplate.getCollection("orders").drop(); // // mongoTemplate.getCollection("orders").drop();
//mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456","1")); // //mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456","1"));
//mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess")); // //mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
//
} // }
//
@Test // @Test
public void testGetOrders(@Autowired MongoTemplate mongoTemplate) { // public void testGetOrders(@Autowired MongoTemplate mongoTemplate) {
mongoTemplate.getCollection("orders").drop(); // mongoTemplate.getCollection("orders").drop();
Flux<Order> result = orderRepository.findAll(); // Flux<Order> result = orderRepository.findAll();
mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess")); // mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
mongoTemplate.insert(new Order("2356782", "02", "2","ooppo122356","oppo123456","sucess")); // mongoTemplate.insert(new Order("2356782", "02", "2","ooppo122356","oppo123456","sucess"));
StepVerifier.create(result) // StepVerifier.create(result)
.expectNextMatches(order -> order.getUserId().equals("1")) // .expectNextMatches(order -> order.getUserId().equals("1"))
.expectNextMatches(o -> o.getUserId().equals("2")) // .expectNextMatches(o -> o.getUserId().equals("2"))
.expectComplete() // .expectComplete()
.verify(); // .verify();
} // }
//
} //}
package com.nisum.webfluxmongodbordermanagement.service; //package com.nisum.webfluxmongodbordermanagement.service;
//
import com.nisum.webfluxmongodbordermanagement.entity.Order; //import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.entity.User; //import com.nisum.webfluxmongodbordermanagement.entity.User;
import com.nisum.webfluxmongodbordermanagement.entity.UserOrders; //import com.nisum.webfluxmongodbordermanagement.entity.UserOrders;
import com.nisum.webfluxmongodbordermanagement.repository.UserRepository; //import com.nisum.webfluxmongodbordermanagement.repository.UserRepository;
import org.junit.jupiter.api.BeforeEach; //import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; //import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; //import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; //import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.data.mongodb.core.MongoTemplate; //import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate; //import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.test.context.ActiveProfiles; //import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension; //import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Flux; //import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; //import reactor.test.StepVerifier;
//
import java.io.IOException; //import java.io.IOException;
//
//
@DataMongoTest //@DataMongoTest
@ExtendWith(SpringExtension.class) //@ExtendWith(SpringExtension.class)
@ActiveProfiles("test") //@ActiveProfiles("test")
public class UsersOrderServiceIntTest { //public class UsersOrderServiceIntTest {
//
static { // static {
System.setProperty("spring.mongodb.embedded.version","5.0.0"); // System.setProperty("spring.mongodb.embedded.version","5.0.0");
} // }
//
UserOrdersService userOrdersService; // UserOrdersService userOrdersService;
//
@Autowired // @Autowired
private ReactiveMongoTemplate reactiveMongoTemplate; // private ReactiveMongoTemplate reactiveMongoTemplate;
//
@Autowired // @Autowired
MongoTemplate mongoTemplate; // MongoTemplate mongoTemplate;
@Autowired // @Autowired
UserRepository userRepository; // UserRepository userRepository;
//
@BeforeEach // @BeforeEach
public void setup() throws IOException { // public void setup() throws IOException {
userOrdersService = new UserOrdersService(); // userOrdersService = new UserOrdersService();
userOrdersService.reactiveMongoTemplate = reactiveMongoTemplate; // userOrdersService.reactiveMongoTemplate = reactiveMongoTemplate;
mongoTemplate.getCollection("user").drop(); // mongoTemplate.getCollection("user").drop();
mongoTemplate.getCollection("orders").drop(); // mongoTemplate.getCollection("orders").drop();
//
//mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456","1")); // //mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456","1"));
mongoTemplate.insert(new Order("2356781", "01", "user1","ooppo122356","oppo123456","sucess")); // mongoTemplate.insert(new Order("2356781", "01", "user1","ooppo122356","oppo123456","sucess"));
mongoTemplate.insert(new Order("2356782", "02", "user1","vivo122356","vivo123456","sucess")); // mongoTemplate.insert(new Order("2356782", "02", "user1","vivo122356","vivo123456","sucess"));
mongoTemplate.insert(new Order("2356783", "03", "user2","sam122356","sam123456","sucess")); // mongoTemplate.insert(new Order("2356783", "03", "user2","sam122356","sam123456","sucess"));
//
mongoTemplate.insert(new User("2356781", "user1", "sarika","sarika@123456")); // mongoTemplate.insert(new User("2356781", "user1", "sarika","sarika@123456"));
mongoTemplate.insert(new User("2356782", "user2", "sama","sama@123456")); // mongoTemplate.insert(new User("2356782", "user2", "sama","sama@123456"));
mongoTemplate.insert(new User("2356783", "user3", "samasarika","sama@123456")); // mongoTemplate.insert(new User("2356783", "user3", "samasarika","sama@123456"));
//
//
//
} // }
//
//
@Test // @Test
public void testGetUsers(@Autowired MongoTemplate mongoTemplate) { // public void testGetUsers(@Autowired MongoTemplate mongoTemplate) {
//
// Flux<User> result = userRepository.findAll(); // // Flux<User> result = userRepository.findAll();
Flux<UserOrders> result = userOrdersService.getAllUsersWithOrders(); // Flux<UserOrders> result = userOrdersService.getAllUsersWithOrders();
//
//
StepVerifier.create(result) // StepVerifier.create(result)
.expectNextMatches(uo -> uo.getUserId().equals("user1") && uo.getOrder().size()==2) // .expectNextMatches(uo -> uo.getUserId().equals("user1") && uo.getOrder().size()==2)
.expectNextMatches(uo -> uo.getUserId().equals("user2") && uo.getOrder().size() == 1) // .expectNextMatches(uo -> uo.getUserId().equals("user2") && uo.getOrder().size() == 1)
.expectNextMatches(uo -> uo.getUserId().equals("user3") && uo.getOrder().size() == 0) // .expectNextMatches(uo -> uo.getUserId().equals("user3") && uo.getOrder().size() == 0)
.expectComplete() // .expectComplete()
.verify(); // .verify();
} // }
//
} //}
//
//
//
//
//
package com.nisum.webfluxmongodbordermanagement.service; //package com.nisum.webfluxmongodbordermanagement.service;
//
import com.nisum.webfluxmongodbordermanagement.entity.User; //import com.nisum.webfluxmongodbordermanagement.entity.User;
import com.nisum.webfluxmongodbordermanagement.repository.UserRepository; //import com.nisum.webfluxmongodbordermanagement.repository.UserRepository;
import org.junit.jupiter.api.BeforeEach; //import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; //import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; //import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; //import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.data.mongodb.core.MongoTemplate; //import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.test.context.ActiveProfiles; //import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension; //import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Flux; //import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; //import reactor.test.StepVerifier;
//
import java.io.IOException; //import java.io.IOException;
//
//
@DataMongoTest //@DataMongoTest
@ExtendWith(SpringExtension.class) //@ExtendWith(SpringExtension.class)
@ActiveProfiles("test") //@ActiveProfiles("test")
public class UsersServiceIntTest { //public class UsersServiceIntTest {
//
static { // static {
System.setProperty("spring.mongodb.embedded.version","5.0.0"); // System.setProperty("spring.mongodb.embedded.version","5.0.0");
} // }
//
//
@Autowired // @Autowired
MongoTemplate mongoTemplate; // MongoTemplate mongoTemplate;
@Autowired // @Autowired
UserRepository userRepository; // UserRepository userRepository;
//
@BeforeEach // @BeforeEach
public void setup() throws IOException { // public void setup() throws IOException {
//
mongoTemplate.getCollection("user").drop(); // mongoTemplate.getCollection("user").drop();
//mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456")); // //mongoTemplate.insert(new User("23563456", "01", "samab12344","sarika@123456"));
mongoTemplate.insert(new User("2356781", "01", "samasarika","sama@123456")); // mongoTemplate.insert(new User("2356781", "01", "samasarika","sama@123456"));
//
} // }
//
//
@Test // @Test
public void testGetUsers(@Autowired MongoTemplate mongoTemplate) { // public void testGetUsers(@Autowired MongoTemplate mongoTemplate) {
//
Flux<User> result = userRepository.findAll(); // Flux<User> result = userRepository.findAll();
//
StepVerifier.create(result) // StepVerifier.create(result)
.expectNextMatches(user -> user.getUserId().equals("01")) // .expectNextMatches(user -> user.getUserId().equals("01"))
.expectComplete() // .expectComplete()
.verify(); // .verify();
} // }
//
} // }
//
//
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment