Commit 3fa0f01c authored by vikram singh's avatar vikram singh

did the required changes based on second review

parent 335be7ee
......@@ -20,8 +20,6 @@ import java.util.Map;
@Configuration
@Component
public class KafkaProducerConfig {
@Autowired
private KafkaProperties kafkaProperties;
......
......@@ -9,7 +9,7 @@ import org.springframework.stereotype.Service;
public class KafkaServer {
@Autowired
KafkaTemplate<String, String> template;
private KafkaTemplate<String, String> template;
@Value("${com.kafka.topic}")
private String topic;
......
......@@ -13,13 +13,13 @@ import reactor.core.publisher.Mono;
public class EmployeeController {
@Autowired
IEmployeeService employeeService;
private IEmployeeService employeeService;
@PostMapping("/employee")
public Mono<Employee> save(@RequestBody
Employee employee) {
log.info("employee obj ##################################################"+employee);
log.info("employee obj "+employee);
return employeeService.save(employee);
}
......@@ -38,28 +38,28 @@ public class EmployeeController {
@GetMapping("/employee/byCity")
public Flux<Employee> findByCity(@RequestParam
String city){
return employeeService.findByCity(city);
}
@GetMapping("/employee/byHobby")
public Flux<Employee> findByHobbyName(@RequestParam
String hobbyName){
return employeeService.findByHobbyName(hobbyName);
}
}
@GetMapping("/employee/byNameAndHobby")
public Flux<Employee> findByNameAndHobbyName(@RequestParam
String name, @RequestParam
String hobbyName){
return employeeService.findByNameAndHobbyName(name,hobbyName);
String name, @RequestParam
String hobbyName){
return employeeService.findByNameAndHobbyName(name,hobbyName);
}
@GetMapping("/employee/byNameAndCityAndHobby")
public Flux<Employee> findByNameAndCityAndHobby(@RequestParam
String name, @RequestParam
String cityName, @RequestParam
String hobbyName){
String name, @RequestParam
String cityName, @RequestParam
String hobbyName){
return employeeService.findByNameAndCityAndHobby(name,cityName,hobbyName);
}
}
......@@ -25,10 +25,10 @@ public class FruitController {
private String topic;
@Autowired
IFruitService fruitService;
private IFruitService fruitService;
@Autowired
OperationService operationService;
private OperationService operationService;
@Qualifier("simpleProducer")
......
......@@ -12,7 +12,7 @@ import org.springframework.web.bind.annotation.RestController;
//basic kafka
public class SampleController {
@Autowired
KafkaServer kafkaServer;
private KafkaServer kafkaServer;
@GetMapping("/send")
public String producer(@RequestParam
......
......@@ -10,7 +10,6 @@ import java.util.List;
@Setter
@Getter
@ToString
//@ToString(exclude= {"id","dob"})
@NoArgsConstructor
@AllArgsConstructor
@Document
......
......@@ -9,7 +9,6 @@ import org.springframework.data.annotation.Id;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Hobby {
@Id
private String hobbyId;
......
......@@ -20,7 +20,7 @@ import java.util.List;
@Slf4j
public class EmployeeService implements IEmployeeService{
@Autowired
EmployeeRepo empRepo;
private EmployeeRepo empRepo;
@Override
public Mono<Employee> save(Employee employee) {
......@@ -49,19 +49,19 @@ public class EmployeeService implements IEmployeeService{
@Override
public Flux<Employee> findByName(String name) {
return empRepo.findByName(name);
}
@Override
public Flux<Employee> findByCity(String city) {
return empRepo.findByCity(city);
}
@Override
public Flux<Employee> findByHobbyName(String hobbyName) {
return empRepo.findbyHobbyName(hobbyName);
}
......@@ -72,7 +72,7 @@ public class EmployeeService implements IEmployeeService{
@Override
public Flux<Employee> findByNameAndCityAndHobby(String name, String cityName, String hobbyName) {
return empRepo.findByNameAndCityAndHobby(name,cityName,hobbyName);
}
......
......@@ -23,7 +23,7 @@ import java.util.stream.Collectors;
public class FruitService implements IFruitService {
@Autowired
FruitRepository fruitRepository;
private FruitRepository fruitRepository;
@Override
public Mono<Fruit> save(Fruit fruit) {
......
package com.nisum.webflux.service.impl;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
@Aspect
public class Logging {
@Pointcut("execution(* com.nisum.*.*(..))")
private void selectAll(){}
@Before("selectAll()")
public void beforeAdvice(){
System.out.println("Going to Call desired Method");
}
}
......@@ -11,7 +11,7 @@ import reactor.core.publisher.Mono;
public class OperationService {
@Autowired
NameService nameService;
private NameService nameService;
public Mono<Fruit> operate(Fruit fruit) {
log.info("Into operate method of OperationService" + fruit);
......
package com.nisum.webflux.basicOperators;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.logging.Logger;
//link:http://www.vinsguru.com/reactive-programming-schedulers/
/**
......@@ -13,10 +16,13 @@ import reactor.core.scheduler.Schedulers;
* • boundedElastic(): Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped
* • immediate(): to immediately run submitted Runnable instead of scheduling them (somewhat of a no-op or "null object" Scheduler)
*/
@Service
public class AdvancedConcept_Schedular_test {
// SimplePublisher
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
// SimplePublisher
public static void main(String args[]) {
......@@ -87,7 +93,7 @@ public class AdvancedConcept_Schedular_test {
private static void simplePublisher() {
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
......@@ -98,7 +104,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
......@@ -107,7 +113,7 @@ public class AdvancedConcept_Schedular_test {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.immediate())
.map(i -> {
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
......@@ -118,7 +124,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
......@@ -126,7 +132,7 @@ public class AdvancedConcept_Schedular_test {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.single())
.map(i -> {
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
......@@ -137,7 +143,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
......@@ -153,7 +159,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
......@@ -169,7 +175,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
......@@ -185,24 +191,24 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void multiplePublishOnMethods(){
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
System.out.println("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
Runnable r=getRunnable(flux);
......@@ -210,7 +216,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
......@@ -219,17 +225,17 @@ public class AdvancedConcept_Schedular_test {
private static void multiplePublishOnMethodsUsingSubscribeOn(){
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
System.out.println("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
log.info("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
......@@ -242,7 +248,7 @@ public class AdvancedConcept_Schedular_test {
Runnable r = () -> flux
.subscribeOn(Schedulers.single())
.subscribe(s -> {
System.out.println("Received " + s + " via " + Thread.currentThread().getName());
log.info("Received " + s + " via " + Thread.currentThread().getName());
});
......@@ -253,7 +259,7 @@ public class AdvancedConcept_Schedular_test {
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
log.info("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
......@@ -262,7 +268,7 @@ public class AdvancedConcept_Schedular_test {
private static Runnable getRunnable(Flux<Integer> flux) {
return () -> flux.subscribe(s -> {
System.out.println("Received " + s + " via " + Thread.currentThread().getName());
log.info("Received " + s + " via " + Thread.currentThread().getName());
});
}
......
......@@ -5,8 +5,13 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.logging.Logger;
//check java controllers also to understand it more clearly;
public class Basic_Flux_Mono_Test {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
/**
* flux with error
*
......@@ -29,8 +34,8 @@ public class Basic_Flux_Mono_Test {
stringFlux
.subscribe(System.out::println,
(e) -> System.err.println("Exception is -> " + e)
, () -> System.out.println("Completed"));
(e) -> log.info("Exception is -> " + e)
, () -> log.info("Completed"));
}
@Test
......
......@@ -4,12 +4,16 @@ import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.logging.Logger;
/**
* Examples on Cold and hot publisher
* reference:::: http://www.vinsguru.com/reactive-programming-publisher-types-cold-vs-hot/
*/
public class ColdAndHotPublisherTest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
/**
* cold publishers generates new data every time a new subscriber subscribes to them.
* Publishers by default do not produce any value unless at least 1 observer subscribes to it.
......@@ -25,11 +29,11 @@ public class ColdAndHotPublisherTest {
Flux<String> stringFlux = Flux.just("A","B","C","D","E","F")
.delayElements(Duration.ofSeconds(1));
stringFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s)); //emits the value from beginning
stringFlux.subscribe(s -> log.info("Subscriber 1 : " + s)); //emits the value from beginning
Thread.sleep(2000);
stringFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s));//emits the value from beginning
stringFlux.subscribe(s -> log.info("Subscriber 2 : " + s));//emits the value from beginning
Thread.sleep(4000);
......@@ -51,10 +55,10 @@ public class ColdAndHotPublisherTest {
ConnectableFlux<String> connectableFlux = stringFlux.publish();
connectableFlux.connect();
connectableFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s));
connectableFlux.subscribe(s -> log.info("Subscriber 1 : " + s));
Thread.sleep(3000);
connectableFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s)); // does not emit the values from beginning
connectableFlux.subscribe(s -> log.info("Subscriber 2 : " + s)); // does not emit the values from beginning
Thread.sleep(4000);
}
......
......@@ -5,6 +5,8 @@ import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.logging.Logger;
/**
* link : https://www.reactiveprogramming.be/project-reactor-backpressure/
* Understanding BackPressure concept
......@@ -15,6 +17,9 @@ import reactor.test.StepVerifier;
* 2.cancel
*/
public class FluxAndMonoBackPressureTest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
/**
* verifies the backPressure
*/
......@@ -45,9 +50,9 @@ public class FluxAndMonoBackPressureTest {
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();
finiteFlux.subscribe((element) -> System.out.println("Element is : " + element)
, (e) -> System.err.println("Exception is : " + e)
, () -> System.out.println("Done")
finiteFlux.subscribe((element) -> log.info("Element is : " + element)
, (e) -> log.info("Exception is : " + e)
, () -> log.info("Done")
, (subscription -> subscription.request(2)));
}
......@@ -61,9 +66,9 @@ public class FluxAndMonoBackPressureTest {
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();
finiteFlux.subscribe((element) -> System.out.println("Element is : " + element)
, (e) -> System.err.println("Exception is : " + e)
, () -> System.out.println("Done")
finiteFlux.subscribe((element) -> log.info("Element is : " + element)
, (e) -> log.info("Exception is : " + e)
, () -> log.info("Done")
, (subscription -> subscription.cancel()));
}
......@@ -82,7 +87,7 @@ public class FluxAndMonoBackPressureTest {
@Override
protected void hookOnNext(Integer value) {
request(1);
System.out.println("Value received is : " + value);
log.info("Value received is : " + value);
if(value == 4){
cancel();
}
......
......@@ -5,6 +5,7 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.logging.Logger;
/**
*
......@@ -13,6 +14,9 @@ import java.time.Duration;
* understanding error operators
*/
public class FluxAndMonoErrorTest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
/**
*
* onErrorResume method
......@@ -26,7 +30,7 @@ public class FluxAndMonoErrorTest {
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorResume((e) -> { // this block gets executed
System.out.println("Exception is : " + e);
log.info("Exception is : " + e);
return Flux.just("default", "default1");
});
......
......@@ -8,6 +8,7 @@ import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.logging.Logger;
/**
* check java controllers/services also to understand it more clearly;
......@@ -15,6 +16,9 @@ import java.util.function.Supplier;
*/
public class FluxAndMonoFactoryTest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
List<String> names = Arrays.asList("adam","anna","jack","jenny");
/**
......@@ -83,7 +87,7 @@ public class FluxAndMonoFactoryTest {
Mono<String> stringMono = Mono.fromSupplier(stringSupplier);
System.out.println(stringSupplier.get());
log.info(stringSupplier.get());
StepVerifier.create(stringMono.log())
.expectNext("adam")
......
......@@ -5,11 +5,14 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.logging.Logger;
/**
* Playing with flux with timestamp and delays;
*/
public class FluxAndMonoWithTimeTest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
/**
* uses
* to create infinite sequence of the flux;
......@@ -22,7 +25,7 @@ public class FluxAndMonoWithTimeTest {
.log(); // starts from 0 --> ......
infiniteFlux
.subscribe((element) -> System.out.println("Value is : " + element));
.subscribe((element) -> log.info("Value is : " + element));
Thread.sleep(3000);
......
......@@ -40,10 +40,10 @@ public class EmployeeControllerComponentTest {
@Autowired
WebTestClient webTestClient;
private WebTestClient webTestClient;
@Autowired
EmployeeRepo empRepo;
private EmployeeRepo empRepo;
@Before
public void setup() {
......
......@@ -34,10 +34,10 @@ public class EmployeeControllerIntegrationTest {
@Autowired
WebTestClient webTestClient;
private WebTestClient webTestClient;
@InjectMocks
EmployeeController employeeController;
private EmployeeController employeeController;
@Mock
private IEmployeeService employeeService;
......@@ -52,7 +52,6 @@ public class EmployeeControllerIntegrationTest {
webTestClient.post().uri("/employee").contentType(MediaType.APPLICATION_JSON).body(Mono.just(getMockEmployee().blockFirst()), Employee.class)
.exchange()
.expectStatus().isOk().expectBody().jsonPath("$.id").isEqualTo("1");
// .jsonPath("$.description").isEqualTo("xx").jsonPath("$.price").isEqualTo(34.4);
}
@Test
public void getAll_UsingWebClientComponentTestCase() {
......@@ -74,14 +73,14 @@ public class EmployeeControllerIntegrationTest {
StepVerifier.create(employeeFlux.log()).expectSubscription()
.expectNextCount(1).verifyComplete();
}
@Test
public void getAll_UsingMockito(){
public void getAll_UsingMockito_JunitTestCase(){
Flux<Employee> employeeFlux=getMockEmployee();
when(employeeService.getAllEmployee()).thenReturn(employeeFlux);
assertEquals(employeeController.getAll(),employeeFlux);
}
private Flux<Employee> getMockEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
......
......@@ -24,10 +24,10 @@ import static org.mockito.Mockito.when;
@ContextConfiguration(classes = TestConfig.class)
public class FruitControllerComponentTest {
@Autowired
WebTestClient webTestClient;
private WebTestClient webTestClient;
@Autowired
FruitRepository fruitRepo;
private FruitRepository fruitRepo;
@Test
public void save() {
......
......@@ -24,9 +24,7 @@ import static org.mockito.Mockito.when;
public class FruitControllerIntegrationTest {
@Autowired
WebTestClient webTestClient;
private WebTestClient webTestClient;
@Test
public void save() {
......
......@@ -27,10 +27,9 @@ import static org.mockito.Mockito.when;
@RunWith(SpringRunner.class)
public class EmployeeServiceTest {
@Mock
EmployeeRepo empRepo;
private EmployeeRepo empRepo;
@InjectMocks
EmployeeService employeeService;
private EmployeeService employeeService;
@Test
public void getAllEmployeeTestUsingStepVerifier() {
......
......@@ -9,8 +9,8 @@ import org.springframework.boot.test.mock.mockito.MockBean;
public class TestConfig {
@MockBean
EmployeeRepo empRepo;
private EmployeeRepo empRepo;
@MockBean
FruitRepository fruitRepo;
private FruitRepository fruitRepo;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment