Commit 6169a3d6 authored by Divisha Agarwal's avatar Divisha Agarwal
Browse files

final

parent 07b7deba
No related merge requests found
Showing with 69 additions and 47 deletions
+69 -47
......@@ -18,11 +18,7 @@ public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer>
@Override
public void onNext(Integer item) {
log.info(Thread.currentThread().getName() + " | Received Inside Backpressure Subscriber = " + item);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
processingRequest(item);
subscription.request(1);
}
......@@ -36,4 +32,13 @@ public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer>
public void onComplete() {
log.info("Completed");
}
private void processingRequest(Integer number) {
try {
log.info("Processing my number ::"+number);
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
......@@ -11,18 +11,14 @@ public class BackpressureUsingBufferStrategy {
public static void main(String[] args) throws InterruptedException {
final int BUFFER = 4;
Flow.Publisher<Integer> backPressurePublisher = new SubmissionPublisher<>(ForkJoinPool.commonPool(), BUFFER);
SubmissionPublisher<Integer> backPressurePublisher = new SubmissionPublisher<>(ForkJoinPool.commonPool(), BUFFER);
BackpressureSubscriber<Integer> backpressureSubscriber = new BackpressureSubscriber<>();
backPressurePublisher.subscribe(backpressureSubscriber);
for (int i = 0; i < 10; i++) {
log.info(Thread.currentThread().getName() + " Publishing = " + i);
((SubmissionPublisher<Integer>) backPressurePublisher).submit(i);
backPressurePublisher.submit(i);
}
((SubmissionPublisher<Integer>) backPressurePublisher).close();
backPressurePublisher.close();
Thread.sleep(10000);
}
}
......@@ -6,12 +6,6 @@ import java.util.concurrent.Flow;
@Slf4j
public class ElementSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
private int nextElementExpected;
ElementSubscriber() {
this.nextElementExpected = 1;
}
@Override
public void onSubscribe(final Flow.Subscription subscription) {
......@@ -21,16 +15,8 @@ public class ElementSubscriber implements Flow.Subscriber<Integer> {
@Override
public void onNext(final Integer elementNumber) {
/* if (elementNumber != nextElementExpected) {
IntStream.range(nextElementExpected, elementNumber).forEach(
(itemNumber) ->
log.info(Thread.currentThread().getName()+"==="+"I missed the element " + itemNumber)
);
nextElementExpected = elementNumber;
}*/
log.info(Thread.currentThread().getName()+"==="+" I got a new element: " + elementNumber+" :Now wait till I give next Instruction");
sleep();
nextElementExpected++;
log.info(Thread.currentThread().getName()+"==="+" I got a new element in ElementSubscriber: " + elementNumber+" :Now wait till I give next Instruction");
processingRequest(elementNumber);
log.info(Thread.currentThread().getName()+"==="+"Get next element ");
subscription.request(1);
}
......@@ -45,8 +31,9 @@ public class ElementSubscriber implements Flow.Subscriber<Integer> {
log.info("Completed");
}
private void sleep() {
private void processingRequest(int number) {
try {
log.info("Processing my number inside ElementSubscriber::"+number);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
......
......@@ -17,12 +17,9 @@ public class EndSubscriber<T> implements Flow.Subscriber<T> {
@Override
public void onNext(T item) {
log.info("Got in End Subscriber : " + item );
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(Thread.currentThread().getName()+"==="+" I got a new element in EndSubscriber: " + item+" :Now wait till I give next Instruction");
processingRequest(item);
subscription.request(1);
}
@Override
......@@ -35,4 +32,13 @@ public class EndSubscriber<T> implements Flow.Subscriber<T> {
public void onComplete() {
log.info("Completed");
}
private void processingRequest(T number) {
try {
log.info("Processing my number inside EndSubscriber::"+number);
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
......@@ -8,6 +8,7 @@ public class SubscriberEndBackpressreExample {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> sb=new SubmissionPublisher<>();
sb.subscribe(new ElementSubscriber());
//sb.subscribe(new EndSubscriber<>());
for (int i = 0; i < 10; i++) {
log.info(Thread.currentThread().getName() + " Publishing = " + i);
sb.submit(i);
......
......@@ -9,7 +9,8 @@ public class SimplePublisher implements Flow.Publisher<Integer> {
private final Iterator<Integer> iterator;
SimplePublisher(int count) {
SimplePublisher(int count)
{
this.iterator = IntStream.rangeClosed(1, count).iterator();
}
......
......@@ -12,10 +12,11 @@ public class SubmissionPublisherConsumeMethod {
SubmissionPublisher<Integer> submissionPublisher=new SubmissionPublisher<>();
CompletableFuture<Void> status;
status=submissionPublisher.consume(data->{
throw new IllegalArgumentException();
System.out.println(data.toString());
});
Thread.sleep(13000);
submissionPublisher.close();
Thread.sleep(2000);
//submissionPublisher.close();
submissionPublisher.closeExceptionally(new RuntimeException());
log.info("Status::"+status);
}
......
......@@ -12,14 +12,11 @@ public class SubmissionPublisherOfferMethod {
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 16);
publisher.subscribe(new CustomerSubscriber<>());
publisher.subscribe(new CustomerSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 10; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
try {
......@@ -27,15 +24,15 @@ public class SubmissionPublisherOfferMethod {
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
return false;
});
if(result < 0) {
System.err.println("dropped: " + -result);
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
Thread.sleep(10000);
publisher.close();
}
}
package com.nisum.publishers;
;
import com.nisum.subscribers.PrintSubscriber;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class SubmissionPublisherSubmitMethod {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SubmissionPublisher<Integer> feed = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
PrintSubscriber<Integer> printSubscriber=new PrintSubscriber();
feed.subscribe(printSubscriber);
for (int i = 0; i < 10; i++) {
log.info(Thread.currentThread().getName() + " Publishing = " + i);
feed.submit(i);
Thread.sleep(4000);
}
feed.close();
}
}
......@@ -22,10 +22,11 @@ public class CustomerSubscriber<Integer> implements Flow.Subscriber<Integer> {
public void onNext(Integer arg0) {
log.info("Got in Customer Subscriber : " + arg0 );
try {
Thread.sleep(1000);
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment