package com.nisum.basicFlowExampleUsingProcessor;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

/**
 * A {@link Flow.Processor} that applies a mapping function to every item it
 * receives and re-publishes the result to its own subscribers.
 *
 * <p>The processor requests one item from upstream when it is subscribed and
 * one more after each item has been submitted downstream. Completion and
 * failure signals received from upstream are forwarded to downstream
 * subscribers by closing this publisher normally or exceptionally.
 *
 * @param <T> type of the items received from upstream
 * @param <R> type of the items published downstream
 */
public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {

    /** Mapping applied to each incoming item before it is re-published. */
    private final Function<? super T, ? extends R> function;

    /** Upstream subscription, assigned in {@link #onSubscribe}. */
    private Flow.Subscription subscription;

    /**
     * Creates a processor that publishes {@code function.apply(item)} for each received item.
     *
     * @param function mapping from received items to published items
     * @throws NullPointerException if {@code function} is null
     */
    public MyTransformProcessor(Function<? super T, ? extends R> function) {
        super();
        // Fail at construction instead of on the first onNext call.
        this.function = Objects.requireNonNull(function, "function");
    }

    /**
     * Stores the upstream subscription and requests the first item.
     *
     * @param subscription the subscription handed over by the upstream publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * Logs the item with the current thread name, transforms it, submits the
     * result downstream and requests the next item.
     *
     * <p>If the mapping function throws, the upstream subscription is cancelled
     * and the exception is routed through {@link #onError} so that downstream
     * subscribers are notified instead of the exception escaping this method.
     *
     * @param item the item received from upstream
     */
    @Override
    public void onNext(T item) {
        System.out.println("Transform Processor: " + Thread.currentThread().getName() + " item " + item);
        final R transformed;
        try {
            transformed = function.apply(item);
        } catch (RuntimeException ex) {
            // Stop upstream delivery, then terminate downstream with the same failure.
            subscription.cancel();
            onError(ex);
            return;
        }
        submit(transformed);
        subscription.request(1);
    }

    /**
     * Prints the stack trace and closes this publisher exceptionally so that
     * current subscribers receive {@code onError} with the same throwable.
     *
     * @param t the failure to report and propagate
     */
    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
        // Without this, downstream subscribers would never see a terminal signal.
        closeExceptionally(t);
    }

    /** Closes this publisher, which signals {@code onComplete} to its subscribers. */
    @Override
    public void onComplete() {
        close();
    }
}