Commit 9bf16a22 authored by Divisha Agarwal's avatar Divisha Agarwal

added Backpressure

parent a03eb81e
package com.nisum.backpressureStrategy;
import java.util.concurrent.Flow;
public class BackpressureSubscriber<Integer> implements Flow.Subscriber<Integer>{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed");
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " | Received = " + item);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " | ERROR = "
+ throwable.getClass().getSimpleName() + " | " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
}
package com.nisum.backpressureStrategy;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
public class BackpressureUsingBufferStrategy {
public static void main(String[] args) throws InterruptedException {
final int BUFFER = 4;
Flow.Publisher<Integer> backPressurePublisher = new SubmissionPublisher<>(ForkJoinPool.commonPool(), BUFFER);
BackpressureSubscriber<Integer> backpressureSubscriber = new BackpressureSubscriber<>();
backPressurePublisher.subscribe(backpressureSubscriber);
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " Publishing = " + i);
((SubmissionPublisher<Integer>) backPressurePublisher).submit(i);
}
((SubmissionPublisher<Integer>) backPressurePublisher).close();
Thread.sleep(10000);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment