Commit ef1c38bb authored by vikram singh's avatar vikram singh

worked on the comment addded

parent f41cdfca
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
......@@ -18,78 +18,39 @@ import java.util.logging.Logger;
*/
@Service
public class AdvancedConcept_Schedular_test {
public class AdvancedConceptSchedulartest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
// SimplePublisher
public static void main(String args[]) {
simplePublisher();
/**
* If you notice the above output of simplePublisher carefully, for each subscription (cold subscription), our Publisher publishes values. The map and consumption operations are getting executed in the respective threads where subscription happens. This is the default behavior.
*
* However Reactor provides an easy way to switch the task execution in the reactive chain using below methods.
*
* publishOn
* subscribeOn
*
* PublishOn:
* PublishOn accepts a Scheduler which changes the task execution context for the operations in the downstream. (for all the operations or until another PublishOn switches the context in the chain). Lets take a look at the below examples.
*/
//Scheulers.immediate():To keep the execution in the current thread.
publisherOn_with_immidiateScheduler();
//Schedulers.single():
//A single reusable thread. When we use this thread, all the operations of the reactive chain are executed using this thread by all the callers.
publisherOn_with_singleScheduler();
/**
* Schedulers.newSingle():
* Same as above. But a dedicated single thread just for the caller.
*/
publisherOn_with_newSingleScheduler();
/**
* Schedulers.elastic():
* This is a thread pool with unlimited threads which is no longer preferred. So DO NOT USE this option.
*/
/**
* Schedulers.boundedElastic():
* This is a preferred one instead of above elastic. This thread pool contains 10 * number of CPU cores you have. Good choice for IO operations.
*/
publisherOn_with_boundedElastic_Scheduler();
/**
* Schedulers.parallel():
* A fixed pool of workers that is tuned for parallel work. It creates as many workers as you have CPU cores.
*/
publisherOn_with_parallel_Scheduler();
/**
* Multiple PublishOn Methods:
* parallel/boundedElastic_Schedular
*/
multiplePublishOnMethods();
/**
* SubscribeOn:
* SubscribeOn method affects the context of the source emission. That is, as we had said earlier, nothing happens in the reactive chain until we subscribe! Once subscribed, the pipeline is getting executed by default on the thread which subscribed.
* When the publishOn method is encountered, it switches the context for the downstream operations. But the source which is the Flux / Mono / or any publisher, is always executed on the current thread which subscribed.
* This SubscribeOn method will change the behavior.
*
*/
multiplePublishOnMethodsUsingSubscribeOn();
}
// SimplePublisher
private static void simplePublisher() {
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
......@@ -109,6 +70,18 @@ public class AdvancedConcept_Schedular_test {
t2.start();
}
/**
* If you notice the above output of simplePublisher carefully, for each subscription (cold subscription), our Publisher publishes values. The map and consumption operations are getting executed in the respective threads where subscription happens. This is the default behavior.
* *
* * However Reactor provides an easy way to switch the task execution in the reactive chain using below methods.
* *
* publishOn
* subscribeOn
*
* PublishOn:
* PublishOn accepts a Scheduler which changes the task execution context for the operations in the downstream. (for all the operations or until another PublishOn switches the context in the chain). Lets take a look at the below examples.
*/
//Scheulers.immediate():To keep the execution in the current thread.
private static void publisherOn_with_immidiateScheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.immediate())
......@@ -128,6 +101,9 @@ public class AdvancedConcept_Schedular_test {
t1.start();
t2.start();
}
//Schedulers.single():
//A single reusable thread. When we use this thread, all the operations of the reactive chain are executed using this thread by all the callers.
private static void publisherOn_with_singleScheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.single())
......@@ -148,6 +124,10 @@ public class AdvancedConcept_Schedular_test {
t2.start();
}
/**
* Schedulers.newSingle():
* Same as above. But a dedicated single thread just for the caller.
*/
private static void publisherOn_with_newSingleScheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.newSingle("newScheduler"));
......@@ -164,6 +144,16 @@ public class AdvancedConcept_Schedular_test {
t2.start();
}
/**
* Schedulers.elastic():
* This is a thread pool with unlimited threads which is no longer preferred. So DO NOT USE this option.
*/
/**
* Schedulers.boundedElastic():
* This is a preferred one instead of above elastic. This thread pool contains 10 * number of CPU cores you have. Good choice for IO operations.
*/
private static void publisherOn_with_boundedElastic_Scheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.boundedElastic());
......@@ -180,6 +170,10 @@ public class AdvancedConcept_Schedular_test {
t2.start();
}
/**
* Schedulers.parallel():
* A fixed pool of workers that is tuned for parallel work. It creates as many workers as you have CPU cores.
*/
private static void publisherOn_with_parallel_Scheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.parallel());
......@@ -195,6 +189,10 @@ public class AdvancedConcept_Schedular_test {
t1.start();
t2.start();
}
/**
* Multiple PublishOn Methods:
* parallel/boundedElastic_Schedular
*/
private static void multiplePublishOnMethods(){
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
......@@ -222,6 +220,14 @@ public class AdvancedConcept_Schedular_test {
}
/**
* SubscribeOn:
* SubscribeOn method affects the context of the source emission. That is, as we had said earlier, nothing happens in the reactive chain until we subscribe! Once subscribed, the pipeline is getting executed by default on the thread which subscribed.
* When the publishOn method is encountered, it switches the context for the downstream operations. But the source which is the Flux / Mono / or any publisher, is always executed on the current thread which subscribed.
* This SubscribeOn method will change the behavior.
*
*/
private static void multiplePublishOnMethodsUsingSubscribeOn(){
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......@@ -8,7 +8,7 @@ import reactor.test.StepVerifier;
import java.util.logging.Logger;
//check java controllers also to understand it more clearly;
public class Basic_Flux_Mono_Test {
public class BasicFluxMonoTest {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
/**
* Custom class to handle the exception exteding throwable
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.BaseSubscriber;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......
package com.nisum.webflux.basicOperators;
package com.nisum.webflux.basicoperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment