package com.rp.assignment; import com.rp.coureutil.Util; import reactor.core.publisher.Flux; import java.util.concurrent.atomic.AtomicInteger; public class Assignment03EmitWIthCondition { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(0); Flux.generate((fluxSinkSynchronousSink) -> { // fluxSink.next(1); String country = Util.getFaker().country().name(); fluxSinkSynchronousSink.next(country); if (country.equalsIgnoreCase("india") || atomicInteger.incrementAndGet() > 10) { fluxSinkSynchronousSink.complete(); } }) //.take(3) .subscribe(Util.subscriber()); //other way to System.out.println("..............."); Flux.generate( () -> { System.out.println("starting the counter with " + 1); return 1; }, (state, fluxSinkSynchronousSink) -> { state++; fluxSinkSynchronousSink.next(1); if(state>10){ fluxSinkSynchronousSink.complete(); } return state; }, (s) -> { System.out.println("consumed: "+s); } ).subscribe(Util.subscriber()); } }