Commit a7a7171d authored by Sarika Sama's avatar Sarika Sama

added spring cloud config

parent b8847239
spring-cloud-config:
spring cloud config provides server and client-side support for externalized configuration in a distributed system.
With the Config Server you have a central place to manage external properties for applications across all environments
\ No newline at end of file
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId> <artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version> <version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository --> <relativePath/> <!-- lookup parent from repository -->
</parent> </parent>
<groupId>com.nisum</groupId> <groupId>com.nisum</groupId>
...@@ -61,7 +61,19 @@ ...@@ -61,7 +61,19 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId> <artifactId>spring-boot-starter-webflux</artifactId>
</dependency> </dependency>
<dependency> <!-- <dependency>-->
<!-- <groupId>org.springframework.cloud</groupId>-->
<!-- <artifactId>spring-cloud-openfeign-core</artifactId>-->
<!-- <version>3.0.2</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-config-client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
</dependency> </dependency>
......
C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties
##C:\kafka\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 -topic nisum-stream
Run server properties of kafka:
-------------------------------
C:\kafka\bin\windows\kafka-server-start.bat .\config\server.properties
Run on cmd for zookeeper properties of kafka:
---------------------------------------------
C:\kafka\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
To create a Topic in new terminal
---------------------------------
C:\kafka\bin\windows\kafka-topics.bat --create --topic library-events1 --bootstrap-server localhost:9092
C:\kafka\bin\windows\kafka-topics.bat --create --topic order_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
C:\kafka\bin\windows\kafka-topics.bat --delete --topic order_topic --bootstrap-server localhost:9092
List of Topics:
----------------
C:\kafka\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Add messages into the topics in producer
---------------------------------------------
C:\kafka\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic library-events
To see messages from the topic in consumer
-------------------------------------------
c:\kafka\bin\windows\kafka-console-consumer.bat --topic library-events1 --from-beginning --bootstrap-server localhost:9092
c:\kafka\bin\windows\kafka-console-consumer.bat --topic order_topic --from-beginning --bootstrap-server localhost:9092
http://localhost:9090/webjars/swagger-ui/index.html
\ No newline at end of file
package com.nisum.webfluxmongodbordermanagement.controller;
import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.service.OrdersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class OrderConfigController {
@Value("${app.order}")
private String orderNumber;
@GetMapping("/getOrderNumber")
public Mono<String> getOrderNumbers() {
return Mono.just(orderNumber);
}
}
...@@ -3,6 +3,8 @@ package com.nisum.webfluxmongodbordermanagement.controller; ...@@ -3,6 +3,8 @@ package com.nisum.webfluxmongodbordermanagement.controller;
import com.nisum.webfluxmongodbordermanagement.entity.Order; import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.service.OrdersService; import com.nisum.webfluxmongodbordermanagement.service.OrdersService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
...@@ -13,7 +15,7 @@ public class OrdersController { ...@@ -13,7 +15,7 @@ public class OrdersController {
@Autowired @Autowired
private OrdersService ordersService; private OrdersService ordersService;
@PostMapping("/publish-orders") @GetMapping("/publish-orders")
public Flux<Order> publishToKafka() { public Flux<Order> publishToKafka() {
Flux<Order> orders = ordersService.fetchOrdersAndPublish(); Flux<Order> orders = ordersService.fetchOrdersAndPublish();
return orders; return orders;
......
...@@ -23,28 +23,10 @@ public class UserOrdersController { ...@@ -23,28 +23,10 @@ public class UserOrdersController {
@Autowired @Autowired
private UserOrdersService userOrdersService; private UserOrdersService userOrdersService;
@Autowired
private KafkaProducer kafkaProducer;
@Value("${order_topic}")
private String topicName;
@GetMapping("/user-orders") @GetMapping("/user-orders")
public Flux<UserOrders> fetchUserOrdersAndPublish(){ public Flux<UserOrders> fetchUserOrdersAndPublish(){
Flux<UserOrders> userOrdersFlux = userOrdersService.getAllUsersWithOrders(); Flux<UserOrders> userOrdersFlux = userOrdersService.getAllUsersWithOrders();
Mono<List<UserOrders>> usersMono = userOrdersFlux.collectList() userOrdersService.fetchUserOrdersAndPublish(userOrdersFlux);
.subscribeOn(Schedulers.boundedElastic());
usersMono
.subscribe(
resultList -> {
log.info("sending userOrders to kafka");
for (UserOrders user : resultList) {
kafkaProducer.sendMessage( topicName, "userOrders-"+user.getUserId(), user.toString());
}
},
throwable -> log.error("Error occurred: {}", throwable.getMessage())
);
return userOrdersFlux; return userOrdersFlux;
} }
......
package com.nisum.webfluxmongodbordermanagement.entity; package com.nisum.webfluxmongodbordermanagement.entity;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Document;
@Data @Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "orders") @Document(collection = "orders")
public class Order { public class Order {
@Id @Id
...@@ -15,4 +19,6 @@ public class Order { ...@@ -15,4 +19,6 @@ public class Order {
private String transactionId; private String transactionId;
private String trackingId; private String trackingId;
private String status; private String status;
} }
package com.nisum.webfluxmongodbordermanagement.service; package com.nisum.webfluxmongodbordermanagement.service;
import com.nisum.webfluxmongodbordermanagement.config.KafkaProducer;
import com.nisum.webfluxmongodbordermanagement.entity.UserOrders; import com.nisum.webfluxmongodbordermanagement.entity.UserOrders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
...@@ -9,14 +12,27 @@ import org.springframework.data.mongodb.core.aggregation.LookupOperation; ...@@ -9,14 +12,27 @@ import org.springframework.data.mongodb.core.aggregation.LookupOperation;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
@Service @Service
@Slf4j
public class UserOrdersService { public class UserOrdersService {
@Autowired @Autowired
ReactiveMongoTemplate reactiveMongoTemplate; ReactiveMongoTemplate reactiveMongoTemplate;
@Autowired
private KafkaProducer kafkaProducer;
@Value("${order_topic}")
private String topicName;
public Flux<UserOrders> getAllUsersWithOrders() { public Flux<UserOrders> getAllUsersWithOrders() {
LookupOperation lookupOperation = LookupOperation.newLookup(). LookupOperation lookupOperation = LookupOperation.newLookup().
...@@ -42,8 +58,24 @@ public class UserOrdersService { ...@@ -42,8 +58,24 @@ public class UserOrdersService {
return reactiveMongoTemplate.aggregate(aggregation, "user", UserOrders.class); return reactiveMongoTemplate.aggregate(aggregation, "user", UserOrders.class);
}
public void fetchUserOrdersAndPublish( Flux<UserOrders> userOrdersFlux){
Mono<List<UserOrders>> usersMono = userOrdersFlux.collectList()
.subscribeOn(Schedulers.boundedElastic());
usersMono
.subscribe(
resultList -> {
log.info("sending userOrders to kafka");
for (UserOrders user : resultList) {
kafkaProducer.sendMessage( topicName, "userOrders-"+user.getUserId(), user.toString());
}
},
throwable -> log.error("Error occurred: {}", throwable.getMessage())
);
} }
} }
......
...@@ -4,3 +4,5 @@ spring.data.mongodb.port=27017 ...@@ -4,3 +4,5 @@ spring.data.mongodb.port=27017
spring.data.mongodb.database=user-orders-db spring.data.mongodb.database=user-orders-db
order_topic=order_topic order_topic=order_topic
spring.cloud.config.uri=http://localhost:8888
...@@ -55,7 +55,7 @@ ...@@ -55,7 +55,7 @@
// mongoTemplate.getCollection("orders").drop(); // mongoTemplate.getCollection("orders").drop();
// //
// mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess")); // mongoTemplate.insert(new Order("2356781", "01", "1","ooppo122356","oppo123456","sucess"));
// Flux<Order> result = ordersService.getOrders(); // Flux<Order> result = ordersService.fetchOrdersAndPublish();
// StepVerifier.create(result) // StepVerifier.create(result)
// .expectNextMatches(order -> order.getOrderId().equals("01")) // .expectNextMatches(order -> order.getOrderId().equals("01"))
// .expectComplete() // .expectComplete()
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
//import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; //import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
//import org.springframework.boot.test.context.SpringBootTest; //import org.springframework.boot.test.context.SpringBootTest;
//import org.springframework.data.mongodb.core.ReactiveMongoOperations;
//import org.springframework.data.mongodb.core.ReactiveMongoTemplate; //import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
//import org.springframework.test.context.ActiveProfiles; //import org.springframework.test.context.ActiveProfiles;
//import org.springframework.test.context.junit.jupiter.SpringExtension; //import org.springframework.test.context.junit.jupiter.SpringExtension;
...@@ -30,13 +31,15 @@ ...@@ -30,13 +31,15 @@
//@ActiveProfiles("test") //@ActiveProfiles("test")
//public class OrderControllerIntegrationTest { //public class OrderControllerIntegrationTest {
// static int port = TestSocketUtils.findAvailableTcpPort(); // static int port = TestSocketUtils.findAvailableTcpPort();
//
//
//
// public static final String MONGODB_URL = "mongodb://localhost:" + port; // public static final String MONGODB_URL = "mongodb://localhost:" + port;
// static MongodExecutable mongodExecutable = null; // static MongodExecutable mongodExecutable = null;
//
// static {
// System.setProperty("spring.mongodb.embedded.version","5.0.0");
// }
//
// @Autowired // @Autowired
// ReactiveMongoTemplate mongoTemplate; // ReactiveMongoOperations mongoOperations;
// //
// @Autowired // @Autowired
// WebTestClient webTestClient; // WebTestClient webTestClient;
...@@ -57,12 +60,13 @@ ...@@ -57,12 +60,13 @@
// public void getOrderControllerTest() { // public void getOrderControllerTest() {
// //
// Order order = new Order("2356781", "01", "1", "ooppo122356", "oppo123456", "sucess"); // Order order = new Order("2356781", "01", "1", "ooppo122356", "oppo123456", "sucess");
// mongoTemplate.insert(order, "orders").block(); // //mongoTemplate.insert(order, "orders").block();
// webTestClient.get().uri("/orders") // webTestClient.get().uri("/orders")
// .exchange() // .exchange()
// .expectStatus().isOk() // .expectStatus().isOk()
// .returnResult(Order.class) // .returnResult(Order.class)
// .getResponseBody(); // .getResponseBody();
//
// } // }
// //
//} //}
......
...@@ -46,9 +46,9 @@ public class OrderControllerTest { ...@@ -46,9 +46,9 @@ public class OrderControllerTest {
// Mock data // Mock data
Flux<Order> order = Flux.just(new Order("1", "1123", "01", "ooppo01234","ooppo12345678","success")); Flux<Order> order = Flux.just(new Order("1", "1123", "01", "ooppo01234","ooppo12345678","success"));
when(ordersService.getOrders()).thenReturn(order); when(ordersService.fetchOrdersAndPublish()).thenReturn(order);
Flux<Order> responseBody = webTestClient.get().uri("/orders") Flux<Order> responseBody = webTestClient.get().uri("/publish-orders")
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
.returnResult(Order.class) .returnResult(Order.class)
......
...@@ -45,9 +45,9 @@ public class UserControllerTest { ...@@ -45,9 +45,9 @@ public class UserControllerTest {
// Mock data // Mock data
Flux<User> user = Flux.just(new User("1", "1", "sama", "sama@gmail.com")); Flux<User> user = Flux.just(new User("1", "1", "sama", "sama@gmail.com"));
when(userService.getUsers()).thenReturn(user); when(userService.fetchUsersAndPublish()).thenReturn(user);
Flux<User> responseBody = webTestClient.get().uri("/users") Flux<User> responseBody = webTestClient.get().uri("/publish-users")
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
.returnResult(User.class) .returnResult(User.class)
......
...@@ -8,6 +8,7 @@ import org.junit.jupiter.api.AfterEach; ...@@ -8,6 +8,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.MockBean;
...@@ -19,6 +20,8 @@ import reactor.test.StepVerifier; ...@@ -19,6 +20,8 @@ import reactor.test.StepVerifier;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ExtendWith(SpringExtension.class) @ExtendWith(SpringExtension.class)
...@@ -26,6 +29,8 @@ import static org.mockito.Mockito.when; ...@@ -26,6 +29,8 @@ import static org.mockito.Mockito.when;
public class UserOrderControllerTest { public class UserOrderControllerTest {
@InjectMocks
UserOrdersController controller;
@Autowired @Autowired
private WebTestClient webTestClient; private WebTestClient webTestClient;
...@@ -38,9 +43,6 @@ public class UserOrderControllerTest { ...@@ -38,9 +43,6 @@ public class UserOrderControllerTest {
void setUp() { void setUp() {
} }
@Test
void getAllUsers() {
}
@Test @Test
public void testGetUsersAndOrders() { public void testGetUsersAndOrders() {
...@@ -49,6 +51,7 @@ public class UserOrderControllerTest { ...@@ -49,6 +51,7 @@ public class UserOrderControllerTest {
Arrays.asList(new Order("1","123","01","oppo1234","oppo23bvn1","success"), Arrays.asList(new Order("1","123","01","oppo1234","oppo23bvn1","success"),
new Order("2","1234","01","vivo1234","vivo23bvn1","success")))); new Order("2","1234","01","vivo1234","vivo23bvn1","success"))));
when(userOrdersService.getAllUsersWithOrders()).thenReturn(order); when(userOrdersService.getAllUsersWithOrders()).thenReturn(order);
doNothing().when(userOrdersService).fetchUserOrdersAndPublish(any());
Flux<UserOrders> responseBody = webTestClient.get().uri("/user-orders") Flux<UserOrders> responseBody = webTestClient.get().uri("/user-orders")
.exchange() .exchange()
......
package com.nisum.webfluxmongodbordermanagement.service; package com.nisum.webfluxmongodbordermanagement.service;
import com.nisum.webfluxmongodbordermanagement.config.KafkaProducer;
import com.nisum.webfluxmongodbordermanagement.entity.Order; import com.nisum.webfluxmongodbordermanagement.entity.Order;
import com.nisum.webfluxmongodbordermanagement.entity.User; import com.nisum.webfluxmongodbordermanagement.entity.User;
import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository; import com.nisum.webfluxmongodbordermanagement.repository.OrderRepository;
...@@ -15,6 +16,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoTemplate; ...@@ -15,6 +16,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class OrdersServiceTest { public class OrdersServiceTest {
...@@ -26,12 +29,13 @@ public class OrdersServiceTest { ...@@ -26,12 +29,13 @@ public class OrdersServiceTest {
@InjectMocks @InjectMocks
OrdersService ordersService; OrdersService ordersService;
@Mock
KafkaProducer kafkaProducer;
@BeforeEach @BeforeEach
void setUp() { void setUp() {
MockitoAnnotations.openMocks(this); MockitoAnnotations.initMocks(this);
} }
...@@ -41,9 +45,9 @@ public class OrdersServiceTest { ...@@ -41,9 +45,9 @@ public class OrdersServiceTest {
Order order1 = new Order("1123v34","1", "1","oppo123445", "oppo123", "success"); Order order1 = new Order("1123v34","1", "1","oppo123445", "oppo123", "success");
Order order2 = new Order("112345","2", "1","vivo123445", "vivo123", "success"); Order order2 = new Order("112345","2", "1","vivo123445", "vivo123", "success");
when(orderRepository.findAll()).thenReturn(Flux.just(order1, order2)); when(orderRepository.findAll()).thenReturn(Flux.just(order1, order2));
doNothing().when(kafkaProducer).sendMessage(anyString(),anyString(),anyString());
// Call the method // Call the method
Flux<Order> result = ordersService.getOrders(); Flux<Order> result = ordersService.fetchOrdersAndPublish();
// Verify the output // Verify the output
StepVerifier.create(result) StepVerifier.create(result)
......
package com.nisum.webfluxmongodbordermanagement.service; package com.nisum.webfluxmongodbordermanagement.service;
import com.nisum.webfluxmongodbordermanagement.config.KafkaProducer;
import com.nisum.webfluxmongodbordermanagement.entity.User; import com.nisum.webfluxmongodbordermanagement.entity.User;
import com.nisum.webfluxmongodbordermanagement.repository.UserRepository; import com.nisum.webfluxmongodbordermanagement.repository.UserRepository;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
...@@ -13,6 +14,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoTemplate; ...@@ -13,6 +14,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
class UserServiceTest { class UserServiceTest {
...@@ -25,11 +28,14 @@ class UserServiceTest { ...@@ -25,11 +28,14 @@ class UserServiceTest {
@InjectMocks @InjectMocks
UserService userService; UserService userService;
@Mock
KafkaProducer kafkaProducer;
@BeforeEach @BeforeEach
void setUp() { void setUp() {
MockitoAnnotations.openMocks(this); MockitoAnnotations.initMocks(this);
} }
...@@ -39,9 +45,10 @@ class UserServiceTest { ...@@ -39,9 +45,10 @@ class UserServiceTest {
User user1 = new User("1","01", "John", "john@example.com"); User user1 = new User("1","01", "John", "john@example.com");
User user2 = new User("2","02","Jane", "jane@example.com"); User user2 = new User("2","02","Jane", "jane@example.com");
when(userRepository.findAll()).thenReturn(Flux.just(user1, user2)); when(userRepository.findAll()).thenReturn(Flux.just(user1, user2));
doNothing().when(kafkaProducer).sendMessage(anyString(),anyString(),anyString());
// Call the method // Call the method
Flux<User> result = userService.getUsers(); Flux<User> result = userService.fetchUsersAndPublish();
// Verify the output // Verify the output
StepVerifier.create(result) StepVerifier.create(result)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment