Commit 07b7deba authored by Divisha Agarwal's avatar Divisha Agarwal

added log

parent aa9d9fae
package com.nisum; package com.nisum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@Slf4j
@SpringBootApplication @SpringBootApplication
public class FlowApiBasicApplication { public class FlowApiBasicApplication {
......
...@@ -19,7 +19,7 @@ public class MyController { ...@@ -19,7 +19,7 @@ public class MyController {
@GetMapping("/home") @GetMapping("/home")
public void getHome(){ public void getHome(){
System.out.println("Welcome home ...."); log.info("Welcome home ....");
} }
......
package com.nisum; package com.nisum;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class OrderSubscriber< T > implements Flow.Subscriber < T > { public class OrderSubscriber< T > implements Flow.Subscriber < T > {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -13,8 +14,8 @@ public class OrderSubscriber< T > implements Flow.Subscriber < T > { ...@@ -13,8 +14,8 @@ public class OrderSubscriber< T > implements Flow.Subscriber < T > {
@Override @Override
public void onNext(T order) { public void onNext(T order) {
System.out.println("Subscriber : " + Thread.currentThread().getName() + " ORDER " +order); log.info("Subscriber : " + Thread.currentThread().getName() + " ORDER " +order);
System.out.println("Received--ORDER::" + order); log.info("Received--ORDER::" + order);
subscription.request(1); subscription.request(1);
} }
...@@ -25,6 +26,6 @@ public class OrderSubscriber< T > implements Flow.Subscriber < T > { ...@@ -25,6 +26,6 @@ public class OrderSubscriber< T > implements Flow.Subscriber < T > {
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("ON COMPLETE ORDER"); log.info("ON COMPLETE ORDER");
} }
} }
\ No newline at end of file
package com.nisum; package com.nisum;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class ProductSubscriber< T > implements Flow.Subscriber < T > { public class ProductSubscriber< T > implements Flow.Subscriber < T > {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -13,8 +16,8 @@ public class ProductSubscriber< T > implements Flow.Subscriber < T > { ...@@ -13,8 +16,8 @@ public class ProductSubscriber< T > implements Flow.Subscriber < T > {
@Override @Override
public void onNext(T product) { public void onNext(T product) {
System.out.println("Subscriber : " + Thread.currentThread().getName() + " PRODUCT " +product); log.info("Subscriber : " + Thread.currentThread().getName() + " PRODUCT " +product);
System.out.println("Received--PRODUCT::" + product); log.info("Received--PRODUCT::" + product);
subscription.request(1); subscription.request(1);
} }
......
...@@ -5,12 +5,14 @@ import com.nisum.OrderSubscriber; ...@@ -5,12 +5,14 @@ import com.nisum.OrderSubscriber;
import com.nisum.ProductSubscriber; import com.nisum.ProductSubscriber;
import com.nisum.model.Order; import com.nisum.model.Order;
import com.nisum.model.Product; import com.nisum.model.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.adapter.JdkFlowAdapter; import reactor.adapter.JdkFlowAdapter;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@Service @Service
@Slf4j
public class FlowAppService { public class FlowAppService {
@Autowired @Autowired
...@@ -29,7 +31,7 @@ public class FlowAppService { ...@@ -29,7 +31,7 @@ public class FlowAppService {
public Flux<Order> getOrderUsingPublisher() throws InterruptedException { public Flux<Order> getOrderUsingPublisher() throws InterruptedException {
OrderSubscriber<Order> orderSubscriber = new OrderSubscriber<>(); OrderSubscriber<Order> orderSubscriber = new OrderSubscriber<>();
System.out.println("Publishing OrderItems........................................................................"); log.info("Publishing OrderItems........................................................................");
orderData=myServiceImpl.getAllOrdersFromService(); orderData=myServiceImpl.getAllOrdersFromService();
JdkFlowAdapter.publisherToFlowPublisher(orderData).subscribe(orderSubscriber); JdkFlowAdapter.publisherToFlowPublisher(orderData).subscribe(orderSubscriber);
Thread.sleep(1000); Thread.sleep(1000);
...@@ -38,7 +40,7 @@ public class FlowAppService { ...@@ -38,7 +40,7 @@ public class FlowAppService {
public Flux<Product> getProductsUsingPublisher() throws InterruptedException { public Flux<Product> getProductsUsingPublisher() throws InterruptedException {
ProductSubscriber<Product> productSubscriber = new ProductSubscriber<>(); ProductSubscriber<Product> productSubscriber = new ProductSubscriber<>();
System.out.println("Publishing ProductItems....................................................................."); log.info("Publishing ProductItems.....................................................................");
productData=myServiceImpl.getAllProductsFromService(); productData=myServiceImpl.getAllProductsFromService();
JdkFlowAdapter.publisherToFlowPublisher(productData).subscribe(productSubscriber); JdkFlowAdapter.publisherToFlowPublisher(productData).subscribe(productSubscriber);
Thread.sleep(1000); Thread.sleep(1000);
......
...@@ -14,6 +14,8 @@ repositories { ...@@ -14,6 +14,8 @@ repositories {
dependencies { dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') { testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
} }
......
package com.nisum.backpressureStrategy; package com.nisum.backpressureStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer>{ public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer>{
private Flow.Subscription subscription; private Flow.Subscription subscription;
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed"); log.info("Subscribed");
this.subscription = subscription; this.subscription = subscription;
subscription.request(1); subscription.request(1);
} }
@Override @Override
public void onNext(Integer item) { public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " | Received Inside Backpressure Subscriber = " + item); log.info(Thread.currentThread().getName() + " | Received Inside Backpressure Subscriber = " + item);
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -25,12 +28,12 @@ public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer> ...@@ -25,12 +28,12 @@ public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer>
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " | ERROR = " log.error(Thread.currentThread().getName() + " | ERROR = "
+ throwable.getClass().getSimpleName() + " | " + throwable.getMessage()); + throwable.getClass().getSimpleName() + " | " + throwable.getMessage());
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("Completed"); log.info("Completed");
} }
} }
package com.nisum.backpressureStrategy; package com.nisum.backpressureStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class BackpressureUsingBufferStrategy { public class BackpressureUsingBufferStrategy {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
...@@ -14,7 +17,7 @@ public class BackpressureUsingBufferStrategy { ...@@ -14,7 +17,7 @@ public class BackpressureUsingBufferStrategy {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " Publishing = " + i); log.info(Thread.currentThread().getName() + " Publishing = " + i);
((SubmissionPublisher<Integer>) backPressurePublisher).submit(i); ((SubmissionPublisher<Integer>) backPressurePublisher).submit(i);
} }
......
package com.nisum.backpressureStrategy; package com.nisum.backpressureStrategy;
import java.util.concurrent.Flow; import lombok.extern.slf4j.Slf4j;
import java.util.stream.IntStream;
import java.util.concurrent.Flow;
@Slf4j
public class ElementSubscriber implements Flow.Subscriber<Integer> { public class ElementSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription; private Flow.Subscription subscription;
private int nextElementExpected; private int nextElementExpected;
...@@ -24,25 +24,25 @@ public class ElementSubscriber implements Flow.Subscriber<Integer> { ...@@ -24,25 +24,25 @@ public class ElementSubscriber implements Flow.Subscriber<Integer> {
/* if (elementNumber != nextElementExpected) { /* if (elementNumber != nextElementExpected) {
IntStream.range(nextElementExpected, elementNumber).forEach( IntStream.range(nextElementExpected, elementNumber).forEach(
(itemNumber) -> (itemNumber) ->
System.out.println(Thread.currentThread().getName()+"==="+"I missed the element " + itemNumber) log.info(Thread.currentThread().getName()+"==="+"I missed the element " + itemNumber)
); );
nextElementExpected = elementNumber; nextElementExpected = elementNumber;
}*/ }*/
System.out.println(Thread.currentThread().getName()+"==="+" I got a new element: " + elementNumber+" :Now wait till I give next Instruction"); log.info(Thread.currentThread().getName()+"==="+" I got a new element: " + elementNumber+" :Now wait till I give next Instruction");
sleep(); sleep();
nextElementExpected++; nextElementExpected++;
System.out.println(Thread.currentThread().getName()+"==="+"Get next element "); log.info(Thread.currentThread().getName()+"==="+"Get next element ");
subscription.request(1); subscription.request(1);
} }
@Override @Override
public void onError(final Throwable throwable) { public void onError(final Throwable throwable) {
System.out.println("Oops I got an error from the Publisher: " + throwable.getMessage()); log.error("Oops I got an error from the Publisher: " + throwable.getMessage());
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("Completed"); log.info("Completed");
} }
private void sleep() { private void sleep() {
......
package com.nisum.backpressureStrategy; package com.nisum.backpressureStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class EndSubscriber<T> implements Flow.Subscriber<T> { public class EndSubscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription; private Flow.Subscription subscription;
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription; this.subscription = subscription;
...@@ -21,7 +17,7 @@ public class EndSubscriber<T> implements Flow.Subscriber<T> { ...@@ -21,7 +17,7 @@ public class EndSubscriber<T> implements Flow.Subscriber<T> {
@Override @Override
public void onNext(T item) { public void onNext(T item) {
System.out.println("Got in End Subscriber : " + item ); log.info("Got in End Subscriber : " + item );
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -31,12 +27,12 @@ public class EndSubscriber<T> implements Flow.Subscriber<T> { ...@@ -31,12 +27,12 @@ public class EndSubscriber<T> implements Flow.Subscriber<T> {
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " | ERROR = " log.error(Thread.currentThread().getName() + " | ERROR = "
+ throwable.getClass().getSimpleName() + " | " + throwable.getMessage()); + throwable.getClass().getSimpleName() + " | " + throwable.getMessage());
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("Completed"); log.info("Completed");
} }
} }
package com.nisum.backpressureStrategy; package com.nisum.backpressureStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class SubscriberEndBackpressreExample { public class SubscriberEndBackpressreExample {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> sb=new SubmissionPublisher<>(); SubmissionPublisher<Integer> sb=new SubmissionPublisher<>();
sb.subscribe(new ElementSubscriber()); sb.subscribe(new ElementSubscriber());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " Publishing = " + i); log.info(Thread.currentThread().getName() + " Publishing = " + i);
sb.submit(i); sb.submit(i);
} }
sb.close(); sb.close();
......
package com.nisum.basicFlowExampleUsingProcessor; package com.nisum.basicFlowExampleUsingProcessor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class MySubscriber < T > implements Flow.Subscriber < T > { public class MySubscriber < T > implements Flow.Subscriber < T > {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -13,8 +16,8 @@ public class MySubscriber < T > implements Flow.Subscriber < T > { ...@@ -13,8 +16,8 @@ public class MySubscriber < T > implements Flow.Subscriber < T > {
@Override @Override
public void onNext(T item) { public void onNext(T item) {
System.out.println("Subscriber : " + Thread.currentThread().getName() + " item " + item); log.info("Subscriber : " + Thread.currentThread().getName() + " item " + item);
System.out.println("Received::" + item); log.info("Received::" + item);
subscription.request(1); subscription.request(1);
} }
...@@ -25,6 +28,6 @@ public class MySubscriber < T > implements Flow.Subscriber < T > { ...@@ -25,6 +28,6 @@ public class MySubscriber < T > implements Flow.Subscriber < T > {
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("Done"); log.info("Done");
} }
} }
\ No newline at end of file
package com.nisum.processors; package com.nisum.processors;
import com.nisum.basicFlowExampleUsingProcessor.MySubscriber; import com.nisum.basicFlowExampleUsingProcessor.MySubscriber;
import com.nisum.processors.MyFilterProcessor; import lombok.extern.slf4j.Slf4j;
import com.nisum.processors.MyTransformProcessor;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class App { public class App {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
...@@ -29,7 +27,7 @@ public class App { ...@@ -29,7 +27,7 @@ public class App {
filterProcessor.subscribe(transformProcessor); filterProcessor.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber); transformProcessor.subscribe(subscriber);
System.out.println("Publishing Items..."); log.info("Publishing Items...");
String[] items = {"100", "x", "200", "x", "300", "x"}; String[] items = {"100", "x", "200", "x", "300", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i)); Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
......
package com.nisum.processors; package com.nisum.processors;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate; import java.util.function.Predicate;
@Slf4j
public class MyFilterProcessor < T, R > extends SubmissionPublisher<R> implements Flow.Processor< T , R > { public class MyFilterProcessor < T, R > extends SubmissionPublisher<R> implements Flow.Processor< T , R > {
private Predicate<T> predicate; private Predicate<T> predicate;
...@@ -21,7 +24,7 @@ public class MyFilterProcessor < T, R > extends SubmissionPublisher<R> imp ...@@ -21,7 +24,7 @@ public class MyFilterProcessor < T, R > extends SubmissionPublisher<R> imp
} }
public void onNext(T item) { public void onNext(T item) {
System.out.println("Filter Processor: " + Thread.currentThread().getName() + " item " + item); log.info("Filter Processor: " + Thread.currentThread().getName() + " item " + item);
if (predicate.test(item)) { if (predicate.test(item)) {
submit((R) item); submit((R) item);
} }
......
package com.nisum.processors; package com.nisum.processors;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function; import java.util.function.Function;
@Slf4j
public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> { public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function; private Function function;
...@@ -22,7 +25,7 @@ public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements ...@@ -22,7 +25,7 @@ public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements
@Override @Override
public void onNext(T item) { public void onNext(T item) {
System.out.println("Transform Processor: " + Thread.currentThread().getName() + " item " + item); log.info("Transform Processor: " + Thread.currentThread().getName() + " item " + item);
submit((R) function.apply(item)); submit((R) function.apply(item));
subscription.request(1); subscription.request(1);
} }
......
package com.nisum.publishers; package com.nisum.publishers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class SimplePublisherMain { public class SimplePublisherMain {
public static void main(String[] args) { public static void main(String[] args) {
new SimplePublisher(10).subscribe(new Flow.Subscriber<>() { new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {
...@@ -10,14 +13,14 @@ public class SimplePublisherMain { ...@@ -10,14 +13,14 @@ public class SimplePublisherMain {
} }
@Override @Override
public void onNext(Integer item) { public void onNext(Integer item) {
System.out.println("item = [" + item + "]"); log.info("item = [" + item + "]");
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("complete"); log.info("complete");
} }
}); });
} }
......
package com.nisum.publishers; package com.nisum.publishers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*; import java.util.concurrent.*;
@Slf4j
public class SubmissionPublisherConsumeMethod { public class SubmissionPublisherConsumeMethod {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
...@@ -13,7 +16,7 @@ public class SubmissionPublisherConsumeMethod { ...@@ -13,7 +16,7 @@ public class SubmissionPublisherConsumeMethod {
}); });
Thread.sleep(13000); Thread.sleep(13000);
submissionPublisher.close(); submissionPublisher.close();
System.out.println("Status::"+status); log.info("Status::"+status);
} }
......
package com.nisum.publishers; package com.nisum.publishers;
import com.nisum.subscribers.CustomerSubscriber; import com.nisum.subscribers.CustomerSubscriber;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class SubmissionPublisherOfferMethod { public class SubmissionPublisherOfferMethod {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
......
...@@ -2,12 +2,13 @@ package com.nisum.sampleCode; ...@@ -2,12 +2,13 @@ package com.nisum.sampleCode;
import com.nisum.subscribers.MagazineSubscriber; import com.nisum.subscribers.MagazineSubscriber;
import com.nisum.subscribers.PrintSubscriber; import com.nisum.subscribers.PrintSubscriber;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class Sample { public class Sample {
public static void main(String[] args) throws InterruptedException, ExecutionException { public static void main(String[] args) throws InterruptedException, ExecutionException {
...@@ -17,7 +18,7 @@ public class Sample { ...@@ -17,7 +18,7 @@ public class Sample {
feed.subscribe(magazineSubscriber); feed.subscribe(magazineSubscriber);
feed.subscribe(printSubscriber); feed.subscribe(printSubscriber);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " Publishing = " + i); log.info(Thread.currentThread().getName() + " Publishing = " + i);
feed.submit(i); feed.submit(i);
Thread.sleep(1000); Thread.sleep(1000);
} }
......
package com.nisum.subscribers; package com.nisum.subscribers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
public class CustomerSubscriber<Integer> implements Flow.Subscriber<Integer> { @Slf4j
public class CustomerSubscriber<Integer> implements Flow.Subscriber<Integer> {
private Flow.Subscription sub; private Flow.Subscription sub;
@Override @Override
public void onSubscribe(Flow.Subscription sub) { public void onSubscribe(Flow.Subscription sub) {
System.out.println("Subscription done"); log.info("Subscription done");
this.sub = sub; this.sub = sub;
sub.request(5); sub.request(5);
} }
...@@ -16,7 +20,7 @@ import java.util.concurrent.Flow; ...@@ -16,7 +20,7 @@ import java.util.concurrent.Flow;
@Override @Override
public void onNext(Integer arg0) { public void onNext(Integer arg0) {
System.out.println("Got in Customer Subscriber : " + arg0 ); log.info("Got in Customer Subscriber : " + arg0 );
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -33,7 +37,7 @@ import java.util.concurrent.Flow; ...@@ -33,7 +37,7 @@ import java.util.concurrent.Flow;
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("onComplete"); log.info("onComplete");
} }
......
package com.nisum.subscribers; package com.nisum.subscribers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class EventSubscriber<Integer> implements Flow.Subscriber<Integer> { public class EventSubscriber<Integer> implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -13,7 +16,7 @@ public class EventSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -13,7 +16,7 @@ public class EventSubscriber<Integer> implements Flow.Subscriber<Integer> {
@Override @Override
public void onNext(Integer item) { public void onNext(Integer item) {
System.out.println("ES ::"+item); log.info("ES ::"+item);
subscription.request(1); subscription.request(1);
if(item.equals(3)) if(item.equals(3))
subscription.cancel(); subscription.cancel();
...@@ -21,12 +24,12 @@ public class EventSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -21,12 +24,12 @@ public class EventSubscriber<Integer> implements Flow.Subscriber<Integer> {
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
System.out.println("ERROR "+throwable); log.error("ERROR "+throwable);
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("DONE"); log.info("DONE");
} }
} }
package com.nisum.subscribers; package com.nisum.subscribers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class MagazineSubscriber<Integer> implements Flow.Subscriber<Integer> { public class MagazineSubscriber<Integer> implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -13,7 +16,7 @@ public class MagazineSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -13,7 +16,7 @@ public class MagazineSubscriber<Integer> implements Flow.Subscriber<Integer> {
} }
@Override @Override
public void onNext(Integer item) { public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() +"Magazine ::" + item); log.info(Thread.currentThread().getName() +"Magazine ::" + item);
subscription.request(1); subscription.request(1);
if(item.equals(5)) if(item.equals(5))
subscription.cancel(); subscription.cancel();
...@@ -21,10 +24,10 @@ public class MagazineSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -21,10 +24,10 @@ public class MagazineSubscriber<Integer> implements Flow.Subscriber<Integer> {
} }
@Override @Override
public void onError(Throwable error) { public void onError(Throwable error) {
System.out.println("RECEIVED AN ERROR IN MAGAZINE SUBSCRIBER: " + error.getMessage()); log.error("RECEIVED AN ERROR IN MAGAZINE SUBSCRIBER: " + error.getMessage());
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("MagazineSubscription is complete"); log.info("MagazineSubscription is complete");
} }
} }
package com.nisum.subscribers; package com.nisum.subscribers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class MessageSubscriber<Integer> implements Flow.Subscriber<Integer> { public class MessageSubscriber<Integer> implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -9,13 +12,13 @@ public class MessageSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -9,13 +12,13 @@ public class MessageSubscriber<Integer> implements Flow.Subscriber<Integer> {
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
this.subscription=subscription; this.subscription=subscription;
System.out.println("Subscription done"); log.info("Subscription done");
subscription.request(1); subscription.request(1);
} }
@Override @Override
public void onNext(Integer item) { public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + "MS ::"+item); log.info(Thread.currentThread().getName() + "MS ::"+item);
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -27,11 +30,11 @@ public class MessageSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -27,11 +30,11 @@ public class MessageSubscriber<Integer> implements Flow.Subscriber<Integer> {
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
System.out.println("RECEIVED AN ERROR IN MESSAGE SUBSCRIBER:::"+throwable); log.error("RECEIVED AN ERROR IN MESSAGE SUBSCRIBER:::"+throwable);
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("DONE"); log.info("DONE");
} }
} }
package com.nisum.subscribers; package com.nisum.subscribers;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
@Slf4j
public class PrintSubscriber<Integer> implements Flow.Subscriber<Integer> { public class PrintSubscriber<Integer> implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription; private Flow.Subscription subscription;
...@@ -13,16 +16,16 @@ public class PrintSubscriber<Integer> implements Flow.Subscriber<Integer> { ...@@ -13,16 +16,16 @@ public class PrintSubscriber<Integer> implements Flow.Subscriber<Integer> {
} }
@Override @Override
public void onNext(Integer item) { public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() +":Print ::" + item); log.info(Thread.currentThread().getName() +":Print ::" + item);
subscription.request(1); subscription.request(1);
} }
@Override @Override
public void onError(Throwable error) { public void onError(Throwable error) {
System.out.println("RECEIVED AN ERROR IN PRINT SUBSCRIBER: " + error.getMessage()); log.error("RECEIVED AN ERROR IN PRINT SUBSCRIBER: " + error.getMessage());
} }
@Override @Override
public void onComplete() { public void onComplete() {
System.out.println("PrintSubscriber is complete"); log.info("PrintSubscriber is complete");
} }
} }
...@@ -16,9 +16,6 @@ public class SubscriptionBehaviour { ...@@ -16,9 +16,6 @@ public class SubscriptionBehaviour {
sleep(1000); sleep(1000);
} }
private static Boolean sleep(int ms) { private static Boolean sleep(int ms) {
try{ try{
Thread.sleep(ms); Thread.sleep(ms);
......
...@@ -15,5 +15,5 @@ public class Order { ...@@ -15,5 +15,5 @@ public class Order {
private int orderId; private int orderId;
private String orderName; private String orderName;
private List<Product> products; private List<Product> products;
//private int productCount=products.size();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment