Commit 73a34db4 authored by Sumit Kumar's avatar Sumit Kumar

Initial commit

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>reactiveProgramming</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!-- test data -->
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.4.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>8</release>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.rp.assignment;
import com.rp.coureutil.Util;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class Assignment02StockPrice {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(1);
getStockPrice().subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
subscription=s;
}
@Override
public void onNext(Integer integer) {
if(integer<-5 || integer>15){
System.out.println("Received invalid val so cancelling the subs: "+integer);
subscription.cancel();
countDownLatch.countDown();
}else{
System.out.println("received:: "+integer);
}
}
@Override
public void onError(Throwable t) {
countDownLatch.countDown();
}
@Override
public void onComplete() {
countDownLatch.countDown();
}
});
countDownLatch.await();
}
public static Flux<Integer> getStockPrice(){
AtomicInteger atomicInteger = new AtomicInteger(10);
return Flux.interval(Duration.ofSeconds(1))
.map(i-> atomicInteger.accumulateAndGet(Util.getFaker().random().nextInt(-5,5),Integer::sum));
}
}
package com.rp.assignment;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;
public class Assignment03EmitWIthCondition {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
Flux.generate((fluxSinkSynchronousSink) -> {
// fluxSink.next(1);
String country = Util.getFaker().country().name();
fluxSinkSynchronousSink.next(country);
if (country.equalsIgnoreCase("india") || atomicInteger.incrementAndGet() > 10) {
fluxSinkSynchronousSink.complete();
}
})
//.take(3)
.subscribe(Util.subscriber());
//other way to
System.out.println("...............");
Flux.generate(
() -> {
System.out.println("starting the counter with " + 1);
return 1;
},
(state, fluxSinkSynchronousSink) -> {
state++;
fluxSinkSynchronousSink.next(1);
if(state>10){
fluxSinkSynchronousSink.complete();
}
return state;
},
(s) -> {
System.out.println("consumed: "+s);
}
).subscribe(Util.subscriber());
}
}
package com.rp.assignment;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.function.Consumer;
public class Assignment04ReadFile {
public static final Path path= Paths.get("src\\main\\resources\\assignment");
public static final String rootFolder=System.getProperty("user.dir");
public static final String resouceFolder="\\src\\main\\resources\\assignment";
public static Flux<String> stringFlux(){
return Flux.generate(()->{
FileReader fileReader=new FileReader(rootFolder+resouceFolder+"\\sec01");
Scanner sc=new Scanner(fileReader);
//String s = new String(Files.readAllBytes(path.resolve("")));
return sc;
},
(scanner, synchronousSink) -> {
if(scanner.hasNextLine()){
synchronousSink.next(scanner.nextLine());
}else{
synchronousSink.complete();
}
return scanner;
},
(s)-> {
System.out.println("Reading file is completed: "+s);
s.close();
}
);
}
public static void main(String[] args) {
/*stringFlux().subscribe(
(s)->{
System.out.println(s);
},
Util.onError(),
Util.onComplete()
);*/
Flux.generate(callable(path),biFunction(),consumer());
}
public static Flux<String> readFileLineByLine(Path path){
return Flux.generate(callable(path),biFunction(),consumer());
}
private static Callable<BufferedReader> callable(Path path){
return ()->Files.newBufferedReader(path);
}
private static BiFunction<BufferedReader, SynchronousSink<String>,BufferedReader> biFunction(){
return (bufferedReader, synchronousSink)->{
try {
String str=bufferedReader.readLine();
if(!Objects.isNull(str)){
synchronousSink.next(str);
}else{
synchronousSink.complete();
}
} catch (IOException e) {
synchronousSink.error(e);
//e.printStackTrace();
}
return bufferedReader;
};
}
private static Consumer<BufferedReader> consumer(){
return (s)->{
try {
s.close();
} catch (IOException e) {
e.printStackTrace();
}
};
}
}
package com.rp.assignment;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Scanner;
public class FileService {
public static final String rootFolder=System.getProperty("user.dir");
public static final String resouceFolder="\\src\\main\\resources\\assignment";
public static final Path path= Paths.get("src\\main\\resources\\assignment");
public static void main(String[] args) {
System.out.println(System.getProperty("user.dir"));
System.out.println(readFile("sec01"));
//createAndWrite("sec01");
//deleteFile("sec04");
}
public static String readFile(String fileName){
StringBuilder output= new StringBuilder();
try {
File fileReader=new File(rootFolder+resouceFolder+"\\"+fileName+"");
Scanner sc=new Scanner(fileReader);
while (sc.hasNextLine()){
//System.out.println(sc.nextLine());
output.append(sc.nextLine());
output.append("\n");
}
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
return output.toString();
}
public static String readFile2(String filename){
String out=null;
try {
out= new String(Files.readAllBytes(path.resolve(filename)));
} catch (IOException e) {
e.printStackTrace();
}
return out;
}
public static void writeFile2(String filename,String content){
try {
Files.write(path.resolve(filename),content.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
public static void deleteFile2(String filename){
try {
Files.delete(path.resolve(filename));
} catch (IOException e) {
e.printStackTrace();
}
}
public static String readFile(File file){
StringBuilder output= new StringBuilder();
try {
File fileReader=new File(rootFolder+resouceFolder+"\\"+file.getName()+"");
Scanner sc=new Scanner(fileReader);
while (sc.hasNextLine()){
//System.out.println(sc.nextLine());
output.append(sc.nextLine());
output.append("\n");
}
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
//System.out.println(output);
return output.toString();
}
public static String createAndWrite(String fileName){
File file=new File(rootFolder+resouceFolder+"\\"+fileName);
try {
file.createNewFile();
System.out.println("File is created %s"+file.getName());
FileWriter fileWriter=new FileWriter(file);
for(int i=0;i<5;i++){
fileWriter.write("Line no "+i+" random texts END:::\n");
}
fileWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
return file.getName();
}
public static void deleteFile(String fileName){
File file=new File(rootFolder+resouceFolder+"\\"+fileName);
file.delete();
System.out.println("file is deleted successfully :"+fileName);
}
}
package com.rp.assignment;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import java.io.File;
import java.nio.file.Paths;
import static com.rp.assignment.FileService.resouceFolder;
import static com.rp.assignment.FileService.rootFolder;
public class MainClass {
public static void main(String[] args) {
//FileService fileService=new FileService();
//Mono<ReadFileService> readFileServiceMono=Mono.fromSupplier(()-> (s)->new File(rootFolder+resouceFolder+"\\"+s));
//read("sec01").subscribe((s)-> System.out.println(s));
write("sec02","abcdmnop\nhh").subscribe(
(s)-> System.out.println("n"),
(throwable)-> System.out.println(throwable),
()-> System.out.println("completed----")
);
read("sec02").subscribe((s)-> System.out.println(s));
delete("sec02").subscribe(
(s)-> System.out.println("n"),
(throwable)-> System.out.println(throwable),
()-> System.out.println("deleted...")
);
System.out.println(">>>>>>>>>>>");
Assignment04ReadFile.readFileLineByLine(Paths.get("src\\main\\resources\\assignment\\sec01"))
.take(2)
.subscribe(Util.subscriber());
}
public static Mono<String> read (String fileName){
return Mono.fromSupplier(
()->FileService.readFile2(fileName)
);
}
public static Mono<Void> write (String fileName,String content){
return Mono.fromRunnable(
()->FileService.writeFile2(fileName,content)
);
}
public static Mono<Void> delete (String fileName){
return Mono.fromRunnable(
()->FileService.deleteFile2(fileName)
);
}
}
package com.rp.assignment;
import java.io.File;
@FunctionalInterface
public interface ReadFileService {
public File readFile(String filename);
}
package com.rp.coureutil;
import reactor.core.publisher.FluxSink;
import java.util.function.Consumer;
public class DefaultConsumerSink implements Consumer<FluxSink<Object>> {
private FluxSink<Object> fluxSink;
/* public DefaultConsumerSink(){
}*/
@Override
public void accept(FluxSink<Object> fluxSink) {
this.fluxSink=fluxSink;
}
public void produce(){
String s = Util.getFaker().name().fullName();
this.fluxSink.next(Thread.currentThread().getName()+" : "+s);
}
}
package com.rp.coureutil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class DefaultSubscriber implements Subscriber<Object> {
private Subscription subscription;
private String name;
public DefaultSubscriber(String name){
this.name=name;
}
public DefaultSubscriber(){
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object t) {
System.out.println(name+" Received: "+t);
}
@Override
public void onError(Throwable t) {
System.out.println(name+" Error: "+t.getMessage());
}
@Override
public void onComplete() {
System.out.println(name+" Completed: ");
}
}
package com.rp.coureutil;
import com.github.javafaker.Faker;
import org.reactivestreams.Subscriber;
import java.util.function.Consumer;
public class Util implements Runnable {
private static final Faker FAKER=Faker.instance();
public static Consumer<Object> onNext(){
return o -> System.out.println("Received: "+o);
}
public static Consumer<Throwable> onError(){
return e -> System.out.println("Error: "+e.getMessage());
}
public static Runnable onComplete(){
return ()->System.out.println("Completed ");
}
public static Faker getFaker(){
return FAKER;
}
public static void sleep(int sec){
try {
Thread.sleep(sec*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void sleepMili(int mili){
try {
Thread.sleep(mili);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Subscriber subscriber(){
return new DefaultSubscriber();
}
public static Subscriber subscriber(String name){
return new DefaultSubscriber(name);
}
@Override
public void run() {
System.out.println("Completed ");
}
}
package com.rp.helper;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
public class NameGenerator {
public static void main(String[] args) {
//System.out.println(getNames(5));
// getNamesFlux(5).subscribe(Util.onNext());
Flux.interval(Duration.ofSeconds(1)).subscribe(Util.onNext());
Util.sleep(5);
}
public static Flux<String> getNamesFlux(int count){
return Flux.range(1,count)
.map((integer)->getName());
}
public static List<String> getNames(int count){
List<String> list=new ArrayList<>(count);
for(int i=0;i<count;i++){
list.add(getName());
}
return list;
}
private static String getName(){
Util.sleep(1);
return Util.getFaker().name().nameWithMiddle();
}
}
package com.rp.helper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class OrderService {
private static Map<Integer, List<PurchaseOrder>> db=new HashMap<>();//map of userId and list of purchase order
static {
List<PurchaseOrder> list1= Arrays.asList(
new PurchaseOrder(1),
new PurchaseOrder(1),
new PurchaseOrder(1)
);
List<PurchaseOrder> list2= Arrays.asList(
new PurchaseOrder(2),
new PurchaseOrder(2),
new PurchaseOrder(2)
);
db.put(1,list1);
db.put(2,list2);
}
public static Flux<PurchaseOrder> getOrder(int userId){
return Flux.fromIterable(db.get(userId));
}
public static Flux<PurchaseOrder> getOrder2(int userId){
return Flux.create(sink->{
db.get(userId).forEach(sink::next);
sink.complete();
});
}
public static Flux<PurchaseOrder> getOrderWithDelay(int userId){
return Flux.create((FluxSink<PurchaseOrder> sink)->{
db.get(userId).forEach(sink::next);
sink.complete();
}).delayElements(Duration.ofSeconds(1));
}
public static Flux<PurchaseOrder> getOrder(){
return Flux.fromIterable(
db.entrySet().stream()
//.map(integerListEntry -> integerListEntry.getValue())
.flatMap(purchaseOrders -> purchaseOrders.getValue().stream())
.collect(Collectors.toList())
);
}
}
package com.rp.helper;
import com.rp.coureutil.Util;
import lombok.*;
@Data
@ToString
public class Person {
private String name;
private int age;
public Person(){
this.name= Util.getFaker().name().nameWithMiddle();
this.age=Util.getFaker().random().nextInt(1,80);
}
}
package com.rp.helper;
import com.rp.coureutil.Util;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class PurchaseOrder {
private String item;
private String price;
private int userId;
public PurchaseOrder(int userId) {
this.userId = userId;
this.item= Util.getFaker().commerce().productName();
this.price=Util.getFaker().commerce().price();
}
}
package com.rp.helper;
import com.rp.coureutil.Util;
import lombok.Data;
import lombok.ToString;
import java.util.Arrays;
import java.util.List;
@Data
@ToString
public class Revenu {
private List<Integer> price;
private int orderId;
public Revenu(int orderId){
this.orderId=orderId;
this.price= Arrays.asList(
Util.getFaker().random().nextInt(10,200),
Util.getFaker().random().nextInt(20,80),
Util.getFaker().random().nextInt(100,200)
);
}
}
package com.rp.helper;
import com.rp.coureutil.Util;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class User {
private int userId;
private String name;
public User(int userId){
this.userId=userId;
this.name= Util.getFaker().name().fullName();
}
}
package com.rp.helper;
import reactor.core.publisher.Flux;
public class UserService {
public static Flux<User> getUser(){
return Flux.range(1,2)
.map(User::new);
}
}
package com.rp.sec;
import com.github.javafaker.Faker;
public class FakerDemo {
public static void main(String[] args) {
for(int i=0;i<10;i++){
System.out.println(Faker.instance().name().fullName());
}
}
}
package com.rp.sec;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
public class MonoClass {
public static void main(String[] arg){
//publisher
Mono<Integer> mono=Mono.just(1);
//System.out.println(mono);//unless we subscribe the publisher it won't print anything
mono.subscribe(num->{
System.out.println("Recieved "+num);
});
//here mono.subscribe means subscriber are requesting data
//and consumer(block of code written) is the onNext, so publisher(here mono) will call consumer(onNext) and data will start emitting
//since we don't have onComplete and onError method so when publisher require to call these it will simple do nothing as subscriber
//don't have these method.
/*Mono subscribe*/
Mono<String> mono2=Mono.just("ball");
mono2.subscribe();//publisher will emit the data but since we didn't provide any onNext method so subscriber will not recieve any data
mono2.subscribe(
item ->System.out.println(item) ,//on next,
throwable -> System.out.println(throwable.getMessage()), //onError
() -> System.out.println("Data processing is completed") //onComplete
);
Mono<Integer> mono3=Mono.just("ball")
.map(String::length)
.map(len->len/1);
//if we don't provide the onError it will give nasty exception and exception stacktrace.
/*mono3.subscribe(
item ->System.out.println(item) ,//on next,
throwable -> System.out.println(throwable.getMessage()), //onError if
() -> System.out.println("Data processing is completed") //onComplete
);*/
/*mono3.subscribe(
Util.onNext(),
Util.onError(),
Util.onComplete()
);*/
System.out.println("....................");
userRepository(3)
.subscribe(
Util.onNext(),
Util.onError(),
Util.onComplete()
);
}
private static Mono<String> userRepository(int userId){
if(userId==1) {
return Mono.just(Util.getFaker().name().fullName());
}else if(userId==2){
return Mono.empty();//don;t return null if data is not available.
}else{
return Mono.error(new RuntimeException("Not in range"));
}
}
}
package com.rp.sec;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
public class MonoFromFuture {
public static void main(String[] args) {
Mono.fromFuture(MonoFromFuture::getName)
.subscribe(Util.onNext());
Util.sleep(1);
System.out.println("..");
Mono.fromRunnable(()-> System.out.println(Util.getFaker().name().fullName()) ).subscribe(Util.onNext(),Util.onError(),Util.onComplete());
}
private static CompletableFuture<String> getName(){
return CompletableFuture.supplyAsync(()-> Util.getFaker().name().fullName());
}
}
package com.rp.sec;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
public class MonoFromSupplier {
public static void main(String[] args) throws InterruptedException {
//use just method only when you have already data.Below line is wrong.It will excute getNmae mehtod.So just is not meant to be use like below
//Mono<String> mono= Mono.just(getName());
//when data is not available then use other ways to create mono like supplier
Mono<String> mono=Mono.fromSupplier(()-> getName());
mono.subscribe(
Util.onNext()
);
Supplier<String> supplier=()->getName();
//Even we can create mono from callable.
Callable<String> callable=()->getName();
Mono.fromCallable(callable).subscribe(Util.onNext());
System.out.println(".......");
getNameMono();
getNameMono()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(Util.onNext());
getNameMono();
Util.sleep(3);
}
public static String getName() {
//Thread.sleep(2000);
System.out.println("Generating name: ");
return Util.getFaker().name().nameWithMiddle();
}
public static Mono<String> getNameMono() {
System.out.println("get name: ");
return Mono.fromSupplier(()->{
System.out.println("Generating name: ");
Util.sleep(1);
return Util.getFaker().name().nameWithMiddle();
}).map(String::toUpperCase);
}
}
package com.rp.sec;
import java.util.stream.Stream;
public class Stream1 {
public static void main(String[] arg){
Stream<Integer> stream=Stream.of(1)
.map(num->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return num*2;
});
//System.out.println(stream); stream is lazy so it will not return anything
stream.forEach(System.out::println);
}
}
package com.rp.sec03Flux;
import com.rp.coureutil.DefaultConsumerSink;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class FluxCreate {
public static void main(String[] args) {
/* Flux<Object> objectFlux = Flux.create((fluxSink)-> {
// fluxSink.next(1);
while(!Util.getFaker().country().name().equalsIgnoreCase("india")){
fluxSink.next(Util.getFaker().country().name());
}
fluxSink.complete();
});*/
//objectFlux.subscribe(Util.subscriber("ss"));
System.out.println(Thread.currentThread().getName()+">>>>>>>");
DefaultConsumerSink sink=new DefaultConsumerSink();
Flux<Object> objectFlux1 = Flux.create(sink);
objectFlux1.subscribe(Util.subscriber());
// sink.produce();
//here we are publishing after subscribe.
/* Runnable runnable= sink::produce;
for(int i=0;i<20;i++){
new Thread(runnable).start();
}*/
System.out.println(Thread.currentThread().getName()+">>>>>>>");
//Util.sleep(2);
//whenever we are done with the emitting the data we should cancell the downstream pipelines
//so above flux is not doing the same. we should fix the issue.
Flux.create((fluxSink)-> {
// fluxSink.next(1);
String country=Util.getFaker().country().name();
/* while(!country.equalsIgnoreCase("india")){
System.out.println("Emiting: "+country);
fluxSink.next(country);
}*/
fluxSink.next(country);
//fluxSink.complete();
})
//.take(3)
.subscribe(Util.subscriber());
}
}
package com.rp.sec03Flux;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class FluxGenerate {
public static void main(String[] args) {
Flux.generate(synchronousSink -> {
synchronousSink.next(Util.getFaker().ancient().hero());
synchronousSink.complete();
//synchronousSink is single instances, it can be called only one time.so each time it give new instance of synchronousSink
//At most one next call be there
//synchronousSink.next(Util.getFaker().ancient().hero());
})
.take(3)
.subscribe(Util.subscriber());
}
}
package com.rp.sec03Flux;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class FluxTake {
public static void main(String[] args) {
Flux.create((fluxSink)-> {
// fluxSink.next(1);
String country= Util.getFaker().country().name();
while(!country.equalsIgnoreCase("india") && !fluxSink.isCancelled()){
System.out.println("Emiting: "+country);
fluxSink.next(country);
country= Util.getFaker().country().name();
}
fluxSink.complete();
})
.take(1)
.log()
.subscribe(Util.subscriber());
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class DefaultIfEmptyOperator {
public static void main(String[] args) {
getOrderNumber()
.filter(i->i>10)
.defaultIfEmpty(-100)
.subscribe(Util.subscriber());
}
private static Flux<Integer> getOrderNumber2(){
return Flux.generate(integerFluxSink -> {
//Util.sleep(Util.getFaker().random().nextInt(1,4));
integerFluxSink.next(Util.getFaker().random().nextInt(1,100));
})
;
}
private static Flux<Integer> getOrderNumber(){
return Flux.range(1,10);
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class DelayOperator {
public static void main(String[] args) {
//we can emit data after some interval than we can use Flux.interval also
//it will be scheduled in different thread pool
//So by default initial request will be 32 or it will look in the system propery "reactor.bufferSize.x"
// and if it find it will take the max of 8 and set in the system property.
System.setProperty("reactor.bufferSize.x","40");
Flux.range(1,100)
.log()
.delayElements(Duration.ofSeconds(1))
.subscribe(Util.subscriber());
Util.sleep(100);
}
}
package com.rp.sec04Operators;
import reactor.core.publisher.Flux;
public class DoCallBack {
public static void main(String[] args) {
Flux.create(fluxSink -> {
System.out.println("inside create");
for(int i=0;i<5;i++){
fluxSink.next(i);
}
fluxSink.complete();
System.out.println("completed");
})
.doFirst(()-> System.out.println("doFirst: ")) //it will excuted after first1
.doOnNext((i)-> System.out.println("doOnNext: "+i))
.doOnComplete(()-> System.out.println("doOnComplete: "))
.doOnSubscribe((s)-> System.out.println("doOnSubscribe: "+s))
.doOnRequest(n-> System.out.println("doOnRequest : "+n))
.doOnError(throwable -> System.out.println("doOnError: "+throwable.getMessage()))
.doOnTerminate(()-> System.out.println("doOnTerminate: "))
.doOnCancel(()-> System.out.println("doOnCancel: "))
.doFirst(()-> System.out.println("doFirst: ")) //it will first excuted.
.doFinally(signalType -> System.out.println("doFinally: signal type is: "+signalType))
.doOnDiscard(Object.class,o-> System.out.println("doOnDiscard: "+o))
.subscribe();
//order will be
// doFirst:
// doOnSubscribe:
// doOnRequest :
// inside create
// doOnNext: 0
// doOnComplete:
// doOnTerminate:
// doFinally:
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class LecFLuxImmutable {
public static void main(String[] args) {
Flux.range(1,10)
.map(i->i*10)
.subscribe(Util.subscriber());
//flux is immutable so
Flux<Integer> flux=Flux.range(1,10);
flux.map(i->i*10);
flux.filter(i->i>5);
flux.subscribe(Util.subscriber());
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import com.rp.helper.OrderService;
import com.rp.helper.UserService;
public class LecFlatMap {
//
public static void main(String[] args) {
//if i use map than it will emit flux.
UserService.getUser()
.map(user->OrderService.getOrder(user.getUserId()))
.subscribe(Util.subscriber());
//flat map..extracting the information and publishing it
System.out.println(".....flat map........");
UserService.getUser()
.flatMap(user->OrderService.getOrderWithDelay(user.getUserId()))
.subscribe(Util.subscriber());
Util.sleep(10);
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import com.rp.helper.Person;
import reactor.core.publisher.Flux;
import java.util.Locale;
import java.util.function.Function;
public class LecSwitchOnFirst {
public static void main(String[] args) {
getPersonFlux()
.switchOnFirst((signal, flux) -> {
if(signal.isOnNext() && signal.get().getAge()>30){
return flux;
}else {
return fluxFluxFunction().apply(flux);
}
})
.subscribe(Util.subscriber());
}
public static Flux<Person> getPersonFlux(){
return Flux.range(1,10)
.map(integer -> new Person());
}
public static Function<Flux<Person>,Flux<Person>> fluxFluxFunction(){
return (flux)->flux
.filter((person)->person.getAge()>10)
/*.map(person -> {
person.getName().toUpperCase(Locale.ROOT);
return person;
}) */
.doOnNext(person->person.setName(person.getName().toUpperCase(Locale.ROOT)))
.doOnDiscard(Person.class,p-> System.out.println("discarding object "+p));
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class LimitRate {
public static void main(String[] args) {
Flux.range(1,10000)
.log()
.limitRate(100,50) //by default it is 75 but we can adjust also
//if you want to drain all data than only you will reques than use
//.limitRate(100,0) ->It is confusing.
.subscribe(Util.subscriber());
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class OnErrorOperator {
public static void main(String[] args) {
//onErrorReturn
Flux.range(1,100)
.log()
.map(integer -> 10/(5-integer))
.onErrorReturn(-1)
.subscribe(Util.subscriber());
//onErrorResume
System.out.println("..........onErrorResume.........");
Flux.range(1,100)
.log()
.map(integer -> 10/(5-integer))
//.log()
.onErrorResume((throwabl)->fallBack())
//.log()
.subscribe(Util.onNext());
//onErrorContinue
System.out.println(".........OnErrorContinue.........");
Flux.range(1,100)
.log()
.map(integer ->10/(integer%10-5))
.onErrorContinue((throwabl,i)->fallBack())
.subscribe(Util.onNext());
//Mono.just(2).map(i->i/0).onErrorResume(throwable -> fallBack()).subscribe(Util.subscriber());
}
private static Mono<Integer> fallBack(){
return Mono.just(-1);
}
private static Flux<Integer> fallBack2(){
return Flux.range(1,10);
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class OperatorHandle {
public static void main(String[] args) {
//handle =filter+map
Flux.range(1,20)
.handle((integer, synchronousSink) -> {
if(integer%2==0){
synchronousSink.next(integer); //filter
}else{
synchronousSink.next("Odd no :"+integer); //map
}
}).subscribe(Util.subscriber());
System.out.println("...........");
Flux.create(fluxSink -> {
while (!fluxSink.isCancelled()){
fluxSink.next(Util.getFaker().animal().name());
}
}).handle((o, synchronousSink) -> {
synchronousSink.next(o);
if(o.toString().equalsIgnoreCase("zebra")){
synchronousSink.complete();
};
}).subscribe(Util.subscriber());
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class SwitchIfEmptyOperator {
public static void main(String[] args) {
getOrderNumber()
.filter(i->i>10)
.switchIfEmpty(fallBack())
.subscribe(Util.subscriber());
}
private static Flux<Integer> getOrderNumber(){
return Flux.range(1,10);
}
private static Flux<Integer> fallBack(){
return Flux.range(100,200);
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class TimeoutOperator {
public static void main(String[] args) {
//timeout for each emission not all the data it requested.It is time between two signal/
getOrderNumber()
.timeout(Duration.ofSeconds(2),fallBack())
.subscribe(Util.subscriber());
Util.sleep(50);
}
private static Flux<Integer> getOrderNumber(){
return Flux.generate(integerFluxSink -> {
Util.sleep(Util.getFaker().random().nextInt(1,4));
integerFluxSink.next(Util.getFaker().random().nextInt(1,100));
})
;
}
private static Flux<Integer> fallBack(){
return Flux.range(100,200).delayElements(Duration.ofSeconds(1));
}
}
package com.rp.sec04Operators;
import com.rp.coureutil.Util;
import com.rp.helper.Person;
import reactor.core.publisher.Flux;
import java.util.Locale;
import java.util.function.Function;
public class TransformOperator {
public static void main(String[] args) {
getPersonFlux()
.transform(fluxFluxFunction())
.subscribe(Util.subscriber());
}
public static Flux<Person> getPersonFlux(){
return Flux.range(1,10)
.map(integer -> new Person());
}
public static Function<Flux<Person>,Flux<Person>> fluxFluxFunction(){
return (flux)->flux
.filter((person)->person.getAge()>10)
/*.map(person -> {
person.getName().toUpperCase(Locale.ROOT);
return person;
}) */
.doOnNext(person->person.setName(person.getName().toUpperCase(Locale.ROOT)))
.doOnDiscard(Person.class,p-> System.out.println("discarding object "+p));
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;
public class ColdPublisher {
public static void main(String[] args) {
Flux<String> movieStream = Flux.fromStream(() -> getMovies())
.delayElements(Duration.ofSeconds(1));
movieStream.subscribe(Util.subscriber("sumit"));
Util.sleep(5);
movieStream.subscribe(Util.subscriber("sam"));
Util.sleep(20);
}
private static Stream<String> getMovies(){
System.out.println("Got he movies streaming request");
return Stream.of(
"Scene1",
"Scene2",
"Scene3",
"Scene4",
"Scene5"
);
}
}
package com.rp.sec05HotColdPublisher;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
public class InventoryService {
private Map<String, Integer> db =new HashMap<>();
public InventoryService() {
db.put("Kids",100);
db.put("Automotive",100);
}
public Consumer<PurchaseOrder> subscribeOrderStream(){
return purchaseOrder->db.computeIfPresent(purchaseOrder.getCategory(),(K,V)-> V-purchaseOrder.getQuantity());
}
public Flux<String> inventoryStream(){
return Flux.interval(Duration.ofSeconds(2))
.map(i->db.toString());
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;
public class Lec02HotShare {
public static void main(String[] args) {
Flux<String> movieStream = Flux.fromStream(() -> getMovies())
.delayElements(Duration.ofSeconds(1))
.share();//it convert from cold to hot publisher
movieStream.subscribe(Util.subscriber("sumit"));
Util.sleep(2);
movieStream.subscribe(Util.subscriber("sam"));
Util.sleep(20);
}
private static Stream<String> getMovies(){
System.out.println("Got he movies streaming request");
return Stream.of(
"Scene1",
"Scene2",
"Scene3",
"Scene4",
"Scene5"
);
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;
public class Lec03HotPub {
public static void main(String[] args) {
//share=
Flux<String> movieStream = Flux.fromStream(() -> getMovies())
.delayElements(Duration.ofSeconds(1))
.publish()
.refCount(2);//min subscriber need so publisher will start emitting.
movieStream.subscribe(Util.subscriber("sumit"));
Util.sleep(2);
movieStream.subscribe(Util.subscriber("sam"));
test();
Util.sleep(20);
}
private static Stream<String> getMovies() {
System.out.println("Got he movies streaming request");
return Stream.of(
"Scene1",
"Scene2",
"Scene3",
"Scene4",
"Scene5"
);
}
private static void test(){
System.out.println("..........test.........");
Flux<Object> movieStream = Flux.create(fluxSink -> {
System.out.println("created");
for(int i=0;i<5;i++){
fluxSink.next(i);
}
})
.delayElements(Duration.ofSeconds(1))
.publish()
.refCount(2);//min subscriber need so publisher will start emitting.
movieStream.subscribe(System.out::println);
movieStream.subscribe(System.out::println);
movieStream.subscribe(System.out::println);
/* movieStream.subscribe(Util.subscriber("sumit"));
Util.sleep(2);
movieStream.subscribe(Util.subscriber("sam"));*/
Util.sleep(20);
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;
public class Lec04HotPubAutoConnect {
public static void main(String[] args) {
//share=
Flux<String> movieStream = Flux.fromStream(() -> getMovies())
.delayElements(Duration.ofSeconds(1))
.publish()
.autoConnect(1);//if value is zero..it will start emitting immediatly even no one subscribe it
movieStream.subscribe(Util.subscriber("sumit"));
Util.sleep(3);
System.out.println("Sam is about to join");
movieStream.subscribe(Util.subscriber("sam"));
Util.sleep(20);
}
private static Stream<String> getMovies() {
System.out.println("Got he movies streaming request");
return Stream.of(
"Scene1",
"Scene2",
"Scene3",
"Scene4",
"Scene5"
);
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;
public class Lec04HotPublishCache {
public static void main(String[] args) {
//cache->publish()+replay()//params->how many last no of items it should remeber
Flux<String> movieStream = Flux.fromStream(() -> getMovies())
.delayElements(Duration.ofSeconds(1))
.cache(3);
movieStream.subscribe(Util.subscriber("sumit"));
Util.sleep(10);
System.out.println("Sam is about to join");
movieStream.subscribe(Util.subscriber("sam"));
Util.sleep(20);
}
private static Stream<String> getMovies() {
System.out.println("Got he movies streaming request");
return Stream.of(
"Scene1",
"Scene2",
"Scene3",
"Scene4",
"Scene5"
);
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
public class Lec06Assignment {
public static void main(String[] args) {
OrderService orderService=new OrderService();
RevenusService revenusService=new RevenusService();
InventoryService inventoryService =new InventoryService();
//revenus and inventory observe the order service
orderService.orderStream().subscribe(revenusService.subscribeOrderStream());
orderService.orderStream().subscribe(inventoryService.subscribeOrderStream());
inventoryService.inventoryStream().subscribe(Util.subscriber("Inventory"));
revenusService.revenuStream().subscribe(Util.subscriber("Revenue "));
Util.sleep(60);
}
}
package com.rp.sec05HotColdPublisher;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Objects;
public class OrderService {
private static Flux<PurchaseOrder> flux;
public Flux<PurchaseOrder> orderStream(){
if (Objects.isNull(flux)){
flux=getOrderStream();
}
return flux;
}
private static Flux<PurchaseOrder> getOrderStream(){
return Flux.interval(Duration.ofSeconds(1))
.map(i->new PurchaseOrder())
.publish()
.refCount(2);
}
}
package com.rp.sec05HotColdPublisher;
import com.rp.coureutil.Util;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class PurchaseOrder {
private String item;
private double price;
private String category;
private int quantity;
public PurchaseOrder() {
this.item= Util.getFaker().commerce().productName();
this.price= Double.parseDouble(Util.getFaker().commerce().price());
this.category=Util.getFaker().commerce().department();
this.quantity=Util.getFaker().random().nextInt(1,10);
}
}
package com.rp.sec05HotColdPublisher;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
public class RevenusService {
private Map<String, Double> revenueDb =new HashMap<>();
public RevenusService() {
revenueDb.put("Kids",0.0);
revenueDb.put("Automotive",0.0);
}
public Consumer<PurchaseOrder> subscribeOrderStream(){
return purchaseOrder->revenueDb.computeIfPresent(purchaseOrder.getCategory(),(K,V)-> V+purchaseOrder.getPrice());
}
public Flux<String> revenuStream(){
return Flux.interval(Duration.ofSeconds(2))
.map(i->revenueDb.toString());
}
}
package com.rp.sec06PublishOnSubscribeON;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class Flux07FluxInterval {
public static void main(String[] args) {
Flux.interval(Duration.ofSeconds(1))
.subscribe();
//it will exit immediatly because it internally uses scheduler.parallel execution.
//Also we cannot change the schedular from parralel to any other schedular like boundelastic
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec06PublishOnSubscribeON;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class Lec01ThreadDemo {
public static void main(String[] args) {
Flux<Object> create = Flux.create(sink -> {
Util.sleep(2);
printThreadName("create");
sink.next(1);
})
.doOnNext(i -> printThreadName("next" + i));
Runnable runnable=()->create.subscribe(v->printThreadName("subscription: "+v));
for(int i=0;i<2;i++){
System.out.println("for loop"+"\tThread:\t."+Thread.currentThread().getName());
new Thread(runnable).start();
}
System.out.println("end"+"\tThread:\t."+Thread.currentThread().getName());
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec06PublishOnSubscribeON;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec02SubscribeOnDemo {
public static void main(String[] args) {
Flux<Object> create = Flux.create(sink -> {
Util.sleep(2);
printThreadName("create");
sink.next(1);
})
.doOnNext(i -> printThreadName("next" + i))
;
/*create
.doFirst(()->printThreadName("first2"))
.subscribeOn(Schedulers.boundedElastic())
.doFirst(()->printThreadName("first1"))
.subscribe(v->printThreadName("subscription: "+v));*/
Runnable runnable=()->create
.doFirst(()->printThreadName("first2"))
.subscribeOn(Schedulers.boundedElastic())
.doFirst(()->printThreadName("first1"))
.subscribe(v->printThreadName("subscription: "+v));
for (int i = 0; i <2 ; i++) {
new Thread(runnable).start();
}
Util.sleep(2);
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec06PublishOnSubscribeON;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec03SubscribeOnMultiplems {
public static void main(String[] args) {
Flux<Object> create = Flux.create(sink -> {
printThreadName("create");
for (int i = 0; i <2 ; i++) {
sink.next(i);
Util.sleep(1);
}
sink.complete();
}).doOnNext(i -> printThreadName("next" + i));
Runnable runnable=()->create.doFirst(()->printThreadName("first2"))
.subscribeOn(Schedulers.boundedElastic())
.doFirst(()->printThreadName("first1"))
.subscribe(v->printThreadName("subscription: "+v));
for (int i = 0; i <5 ; i++) {
new Thread(runnable).start();
}
Util.sleep(20);
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec06PublishOnSubscribeON;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec04PublishOn {
public static void main(String[] args) {
Flux<Object> create = Flux.create(sink -> {
printThreadName("create ");
for (int i = 0; i <3 ; i++) {
sink.next(i);
Util.sleep(1);
}
sink.complete();
}).doOnNext(i -> printThreadName("next1 " + i));
create.publishOn(Schedulers.boundedElastic())
.doOnNext(i->printThreadName("next2 "+i))
.publishOn(Schedulers.parallel())
.subscribe(o -> printThreadName("subscribe "+o))
;
Util.sleep(20);
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec06PublishOnSubscribeON;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec05PubSubOn {
public static void main(String[] args) {
Flux<Object> create = Flux.create(sink -> {
printThreadName("create ");
for (int i = 0; i <3 ; i++) {
sink.next(i);
Util.sleep(1);
}
sink.complete();
}).doOnNext(i -> printThreadName("next1 " + i));
create.publishOn(Schedulers.parallel())
.doOnNext(i->printThreadName("next2 "+i))
.subscribeOn(Schedulers.boundedElastic())
.subscribe(o -> printThreadName("subscribe "+o))
;
Util.sleep(20);
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec06PublishOnSubscribeON;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.List;
public class Lec06Parallel {
public static void main(String[] args) {
List<Integer> list=new ArrayList<>();
Flux.range(1,20)
.parallel(2)
.runOn(Schedulers.parallel())
.doOnNext(i->printThreadName("next: "+i))
//.sequential() it will make things sequential by thread wise
.subscribe(list::add);
Util.sleep(5);
System.out.println(list.size());
}
private static void printThreadName(String msg){
System.out.println(msg+"\tThread:\t."+Thread.currentThread().getName());
}
}
package com.rp.sec07Buffer;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec01Demo {
public static void main(String[] args) {
Flux.create(fluxSink -> {
for (int i = 0; i <501 ; i++) {
fluxSink.next(i);
System.out.println(" Pushed: "+i);
}
fluxSink.complete();
}).publishOn(Schedulers.boundedElastic())
.doOnNext(o -> {
Util.sleepMili(10);
})
.subscribe(Util.subscriber());
Util.sleep(20);
}
}
package com.rp.sec07Buffer;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.List;
public class Lec02Drop {
public static void main(String[] args) {
System.setProperty("reactor.bufferSize.small", "16");
//75 % 16=12
//output will be first 16 element will be printed than 119(whenever 75% of buffer size is empty it will store the latest pushed element)
// ,120 like that it will be printed as it is the latest element in
//the queue
List<Object> list=new ArrayList<>();
Flux.create(fluxSink -> {
for (int i = 0; i <201 ; i++) {
fluxSink.next(i);
System.out.println(" Pushed: "+i);
Util.sleepMili(1);
}
fluxSink.complete();
})
.onBackpressureDrop(list::add) //will add the dropped value in the list
.publishOn(Schedulers.boundedElastic())
.doOnNext(o -> {
Util.sleepMili(10);
})
.subscribe(Util.subscriber());
Util.sleep(20);
System.out.println(list);
}
}
package com.rp.sec07Buffer;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec03Latest {
public static void main(String[] args) {
System.setProperty("reactor.bufferSize.small", "16");
//75 % 16=12
//output will be first 16 element will be printed than 88(whenever 75% of buffer size is empty it will store the one latest onet)
// ,89 like that it will be printed as it is the latest element in
//the queue
Flux.create(fluxSink -> {
for (int i = 0; i <201 ; i++) {
fluxSink.next(i);
System.out.println(" Pushed: "+i);
Util.sleepMili(1);
}
fluxSink.complete();
})
.onBackpressureLatest()
.publishOn(Schedulers.boundedElastic())
.doOnNext(o -> {
Util.sleepMili(10);
})
.subscribe(Util.subscriber());
Util.sleep(20);
}
}
package com.rp.sec07Buffer;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec04Error {
public static void main(String[] args) {
System.setProperty("reactor.bufferSize.small", "16");
//75 % 16=12
// when queue is full null Error: The receiver is overrun by more signals than expected (bounded queue...)
Flux.create(fluxSink -> {
for (int i = 0; i <201 && !fluxSink.isCancelled(); i++) {
fluxSink.next(i);
System.out.println(" Pushed: "+i);
Util.sleepMili(1);
}
fluxSink.complete();
})
.onBackpressureError()
.publishOn(Schedulers.boundedElastic())
.doOnNext(o -> {
Util.sleepMili(10);
})
.subscribe(Util.subscriber());
Util.sleep(20);
}
}
package com.rp.sec07Buffer;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Lec05BufferWithSize {
public static void main(String[] args) {
System.setProperty("reactor.bufferSize.small", "16");
//75 % 16=12
Flux.create(fluxSink -> {
for (int i = 0; i <201 && !fluxSink.isCancelled(); i++) {
fluxSink.next(i);
System.out.println(" Pushed: "+i);
Util.sleepMili(1);
}
fluxSink.complete();
})
.onBackpressureBuffer(20,(o)-> System.out.println("Dropped: "+o))
.publishOn(Schedulers.boundedElastic())
.doOnNext(o -> {
Util.sleepMili(10);
})
.subscribe(Util.subscriber());
Util.sleep(20);
}
}
package com.rp.sec08MergePublisher.Assignement;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class Car {
static AtomicInteger integer=new AtomicInteger(50);
public static Flux<Integer> getPrice(){
return Flux.interval(Duration.ofSeconds(1))
.map(aLong -> integer.getAndDecrement());
}
}
package com.rp.sec08MergePublisher.Assignement;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class CarFactor {
public static Flux<Double> getDemand(){
return Flux.interval(Duration.ofSeconds(1))
.map(i-> Util.getFaker().random().nextInt(8,12))
.map(i->Double.valueOf(i/10));
}
}
package com.rp.sec08MergePublisher.Assignement;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class MainClass {
public static void main(String[] args) {
/* Flux.combineLatest(Car.getPrice(),CarFactor.getDemand(),((integer, aDouble) -> Double.valueOf(integer*aDouble)))
.subscribe(Util.subscriber("latest car prise\t"));*/
System.out.println("............");
final int intialPrice=10000;
Flux.combineLatest(monthStream(),demandStream(),(month,demand)-> (intialPrice-100*month)*demand)
.subscribe(Util.subscriber());
Util.sleep(30);
}
private static Flux<Long> monthStream(){
return Flux.interval(Duration.ZERO,Duration.ofSeconds(1));
}
private static Flux<Double> demandStream(){
return Flux.interval(Duration.ofSeconds(3))
.map(i->Util.getFaker().random().nextInt(80,120)/100d)
.startWith(1d);
}
}
package com.rp.sec08MergePublisher;
import com.rp.coureutil.Util;
import com.rp.sec08MergePublisher.helper.NameGenerator;
public class Lec01StartWith {
public static void main(String[] args) {
NameGenerator nameGenerator=new NameGenerator();
nameGenerator.generateNames()
.take(2)
.subscribe(Util.subscriber());
nameGenerator.generateNames()
.take(3)
.subscribe(Util.subscriber());
//he might get from cache or from freshly created
nameGenerator.generateNames()
.filter(n->n.startsWith("A"))
.take(1)
.subscribe(Util.subscriber());
}
}
package com.rp.sec08MergePublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class Lec02ConcatWith {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("a", "b");
Flux<String> flux2=Flux.error(new RuntimeException("oops"));
Flux<String> flux3 = Flux.just("c", "d", "e");
Flux<String> flux=flux1.concatWith(flux2);
Flux<String> stringFlux=Flux.concat(flux1,flux2,flux3);
Flux<String> stringFlux2=Flux.concatDelayError(flux1,flux2,flux3);
// flux2.startWith(flux1).subscribe(Util.subscriber());
System.out.println("............");
//flux.subscribe(Util.subscriber());
stringFlux2.subscribe(Util.subscriber());
}
}
package com.rp.sec08MergePublisher;
import com.rp.coureutil.Util;
import com.rp.sec08MergePublisher.helper.AmericanAirlines;
import com.rp.sec08MergePublisher.helper.EmiratesFlights;
import com.rp.sec08MergePublisher.helper.QatarFlights;
import reactor.core.publisher.Flux;
public class Lec03Merge {
public static void main(String[] args) {
//merge can publish form all the publisher concurrantly
Flux<String> merge = Flux.merge(QatarFlights.getFlights, EmiratesFlights.getFlights, AmericanAirlines.getFlights);
merge.subscribe(Util.subscriber());
Util.sleep(30);
System.out.println("........................");
//concat will publish sequentially
Flux<String> concat = Flux.concat(QatarFlights.getFlights, EmiratesFlights.getFlights, AmericanAirlines.getFlights);
concat.subscribe(Util.subscriber());
Util.sleep(30);
}
}
package com.rp.sec08MergePublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
public class Lec04ZIip {
public static void main(String[] args) {
Flux.zip(getBody(),getTyres(),getEngine())
.doOnNext(objects -> {
// System.out.println("onNext: "+objects.getT1());
})
.subscribe(Util.subscriber());
}
private static Flux<String > getBody(){
return Flux.range(1,5)
.map(i->{
System.out.println("body"+i);
return "body: "+i;
});
}
private static Flux<String > getEngine(){
return Flux.range(1,2)
.map(i->"engine: "+i);
}
private static Flux<String > getTyres(){
return Flux.range(1,10)
.map(i->"tyres: "+i);
}
}
package com.rp.sec08MergePublisher;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class Lec05CombineLatest {
public static void main(String[] args) {
Flux.combineLatest(getStrings(),getNo(),((s, integer) -> s+integer))
.subscribe(Util.subscriber());
Util.sleep(10);
}
private static Flux<String> getStrings(){
return Flux.just("a","b","c","d")
.delayElements(Duration.ofSeconds(1))
;
}
private static Flux<Integer> getNo(){
return Flux.just(1,2,3)
.delayElements(Duration.ofSeconds(3))
;
}
}
package com.rp.sec08MergePublisher.helper;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class AmericanAirlines {
public static Flux<String> getFlights=Flux.range(1,5)
.delayElements(Duration.ofSeconds(1))
.map(i-> "AAA: "+ Util.getFaker().random().nextInt(100,999))
.filter(i->Util.getFaker().random().nextBoolean());
}
package com.rp.sec08MergePublisher.helper;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class EmiratesFlights {
public static Flux<String> getFlights=Flux.range(1,10)
.delayElements(Duration.ofSeconds(1))
.map(i-> "Emirates: "+ Util.getFaker().random().nextInt(100,999))
.filter(i->Util.getFaker().random().nextBoolean());
}
package com.rp.sec08MergePublisher.helper;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.List;
public class NameGenerator {
private List<String> list=new ArrayList<>();
public Flux<String> generateNames(){
return Flux.generate(synchronousSink -> {
String name= Util.getFaker().name().fullName();
System.out.println("genrated frsh\t:"+name);
synchronousSink.next(name);
list.add(name);
Util.sleep(1);
})
.cast(String.class)
.startWith(getFromCache());
}
private Flux<String> getFromCache(){
return Flux.fromIterable(list);
}
}
package com.rp.sec08MergePublisher.helper;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class QatarFlights {
public static Flux<String> getFlights=Flux.range(1,5)
.delayElements(Duration.ofSeconds(1))
.map(i-> "Qatar: "+Util.getFaker().random().nextInt(100,999))
.filter(i->Util.getFaker().random().nextBoolean());
}
package com.rp.sec09Batching;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class Lec01Buffer {
public static void main(String[] args) {
eventStream()
//.buffer(Duration.ofSeconds(2))
.bufferTimeout(5,Duration.ofSeconds(2))
// .buffer(3)
//.collectList()
.subscribe(Util.subscriber());
Util.sleep(30);
}
private static Flux<String> eventStream(){
return Flux.interval(Duration.ofMillis(400))
//.take(30)
.map(i->"event: "+i);
}
}
package com.rp.sec09Batching;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class Lec02OverlapDropped {
public static void main(String[] args) {
eventStream()
//.buffer(Duration.ofSeconds(2))
.buffer(3,2) //consecutive ->maxSize+1-skip
.subscribe(Util.subscriber());
Util.sleep(30);
}
private static Flux<String> eventStream(){
return Flux.interval(Duration.ofMillis(400))
//.take(30)
.map(i->"event: "+i);
}
}
package com.rp.sec09Batching;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class Lec04Window {
private static AtomicInteger atomicInteger=new AtomicInteger(1);
public static void main(String[] args) {
eventStream()
//.window(3)
.window(Duration.ofSeconds(2))
//.map(Lec04Window::saveEvents) //it wil give the mono again .
.flatMap(Lec04Window::saveEvents)
//.flatMap(stringFlux -> Flux.just(stringFlux))
.subscribe( Util.subscriber()); //subsrcibing parant publsiher.
Util.sleep(30);
}
private static Flux<String> eventStream(){
return Flux.interval(Duration.ofMillis(400))
.take(1)
.map(i->"event: "+i);
}
private static Mono<Integer> saveEvents(Flux<String>stringFlux){
return stringFlux
.doOnNext(e-> System.out.println("saving this:\t"+e))
.doOnComplete(()-> {
System.out.println("Saved this batch");
System.out.println(".............");
})
.then(getAtomicInteger()) //it will invoke once it will get onComplete signal
;
}
private static Mono<Integer> getAtomicInteger(){
System.out.println("Returning getAtomic integer");
return Mono.just(atomicInteger.getAndIncrement());
}
}
package com.rp.sec09Batching;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class Lec05Group {
public static void main(String[] args) {
Flux.range(1,30)
.delayElements(Duration.ofSeconds(1))
.groupBy(i->i%2) //key here key will be even and odd.
.subscribe(gf->process(gf,gf.key()));
Flux.range(1,30)
.delayElements(Duration.ofSeconds(1))
.groupBy(i->i%2+"a")
.flatMap(gf -> gf)
.subscribe();
Util.sleep(60);
}
private static void process(Flux<Integer> integerFlux,int key){
System.out.println("called>>>>>>");
//it will be called the no of key can be possible.Here it will be two.
integerFlux
//.map()
.subscribe(i-> System.out.println("Key\t:"+key+"value\t:"+i));
}
}
package com.rp.sec09Batching.assignment;
import com.github.javafaker.Book;
import com.rp.coureutil.Util;
import lombok.Getter;
import lombok.ToString;
@Getter
@ToString
public class BookOrder {
private String title;
private String auther;
private String category;
private double price;
public BookOrder(){
Book book=Util.getFaker().book();
this.title=book.title();
this.auther= book.author();
this.category=book.genre();
this.price=Double.parseDouble(Util.getFaker().commerce().price());
}
}
package com.rp.sec09Batching.assignment;
import com.rp.coureutil.Util;
import com.rp.sec05HotColdPublisher.PurchaseOrder;
import reactor.core.publisher.Flux;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
public class Lec07Assignment {
public static void main(String[] args) {
Map<String, Function<Flux<PurchaseOrder>,Flux<PurchaseOrder>>> map=new HashMap<>();
map.put("Kids",ProcessProduct.processKids());
map.put("Automotive",ProcessProduct.processAutomotive());
Set<String> keySet = map.keySet();
ProcessProduct.orderStream()
.filter(p->keySet.contains(p.getCategory()))
.groupBy(PurchaseOrder::getCategory)//two keys
.flatMap(gf -> map.get(gf.key()).apply(gf))
.subscribe(Util.subscriber());
Util.sleep(60);
}
}
package com.rp.sec09Batching.assignment;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
public class MainClass {
public static void main(String[] args) {
Set<String> allowedCategory=new HashSet<String>(Arrays.asList("Science Fiction","Fantasy","Suspense/Thriller"));
bookStream()
.filter(bookOrder -> allowedCategory.contains(bookOrder.getCategory()))
.buffer(Duration.ofSeconds(5))
.map(MainClass::revenuCalculator)
.subscribe(Util.subscriber("Generated revenue:\t"));
Util.sleep(60);
}
private static Flux<BookOrder> bookStream(){
return Flux.interval(Duration.ofMillis(200))
.map(i-> new BookOrder());
}
private static RevenueReport revenuCalculator(List<BookOrder>bookOrders){
Map<String, Double> map = bookOrders.stream()
.collect(Collectors.groupingBy(
BookOrder::getCategory,
Collectors.summingDouble(BookOrder::getPrice)
));
return new RevenueReport(map);
}
}
package com.rp.sec09Batching.assignment;
import com.rp.coureutil.Util;
import com.rp.sec05HotColdPublisher.PurchaseOrder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
public class ProcessProduct {
private static List<String> allowedCategory= Arrays.asList("Kids","Automotive");
private static double kidsDiscount=0.5;
private static double tax=0.1;
public static void main(String[] args) {
groupOrder()
.flatMap(ProcessProduct::processOrder)
.subscribe(Util.subscriber("Processed order is:\t"));
Util.sleep(30);
}
public static Function<Flux<PurchaseOrder>, Flux<PurchaseOrder>> processAutomotive(){
return flux->flux
.doOnNext(p->p.setPrice(1.1*p.getPrice()))
.doOnNext(p->p.setItem("{{: "+p.getItem()+":}}"))
;
}
public static Function<Flux<PurchaseOrder>, Flux<PurchaseOrder>> processKids(){
return flux->flux
.doOnNext(p->p.setPrice(0.5*p.getPrice()))
.flatMap(p->Flux.concat(Flux.just(p),getFreeKidsOrder()))
;
}
private static Flux<PurchaseOrder> processOrder(GroupedFlux<Object,PurchaseOrder> flux){
return flux
.flatMap(purchaseOrder -> {
System.out.println(">>>>>>>>>."+purchaseOrder);
double price=purchaseOrder.getPrice();
if(purchaseOrder.getCategory().equals("Kids")){
purchaseOrder.setPrice(price*kidsDiscount);
return Flux.just(purchaseOrder).concatWith(Flux.just(freeOrder()));
}else {
purchaseOrder.setPrice(price+price*tax);
return Flux.just(purchaseOrder);
}
});
}
private static Flux<GroupedFlux<Object,PurchaseOrder>> groupOrder(){
return orderStream()
.filter(order->allowedCategory.contains(order.getCategory()))
.groupBy(order->order.getCategory())
;
}
public static Flux<PurchaseOrder> orderStream(){
return Flux.interval(Duration.ofSeconds(1))
.map(i->new PurchaseOrder());
}
private static PurchaseOrder freeOrder(){
PurchaseOrder purchaseOrder= new PurchaseOrder();
purchaseOrder.setCategory("Free");
purchaseOrder.setPrice(0.0);
return purchaseOrder;
}
private static Mono<PurchaseOrder> getFreeKidsOrder(){
return Mono.fromSupplier(()->{
PurchaseOrder purchaseOrder=new PurchaseOrder();
purchaseOrder.setPrice(0.0);
purchaseOrder.setItem("Free Product: "+purchaseOrder.getItem());
purchaseOrder.setCategory("Kids");
return purchaseOrder;
});
}
}
package com.rp.sec09Batching.assignment;
import lombok.ToString;
import java.time.LocalDateTime;
import java.util.Map;
@ToString
public class RevenueReport {
private LocalDateTime localDateTime=LocalDateTime.now();
private Map<String,Double> revenue;
public RevenueReport(Map<String,Double> revenue){
this.revenue=revenue;
}
}
package com.rp.sec10RetryRepeat;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;
public class Lec01Repeat {
private static AtomicInteger atomicInteger=new AtomicInteger(1);
public static void main(String[] args) {
integerFlux()
//.repeat(2)//repeat two times so total times producer will produce is three
.repeat(()->atomicInteger.get()<20)//repeat two times so total times producer will produce is three
.subscribe(Util.subscriber());
}
private static Flux<Integer> integerFlux(){
return Flux.range(1,3)
.doOnSubscribe(s-> System.out.println("---Subscribed"))
.doOnComplete(()-> System.out.println("---completed"))
.map(i->atomicInteger.getAndIncrement())
//.map(i->i/0)
;
}
}
package com.rp.sec10RetryRepeat;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;
public class Lec02Retry {
private static AtomicInteger atomicInteger=new AtomicInteger(1);
public static void main(String[] args) {
integerFlux()
//.repeat(2)//repeat two times so total times producer will produce is three
.retry(2)
.subscribe(Util.subscriber());
}
private static Flux<Integer> integerFlux(){
return Flux.range(1,30)
.doOnSubscribe(s-> System.out.println("---Subscribed"))
.doOnComplete(()-> System.out.println("---completed"))
.map(i->atomicInteger.getAndIncrement())
.map(i->i/(Util.getFaker().random().nextInt(1,10)>6?0:1))
.doOnError(i-> System.out.println("error: "+i))
;
}
}
package com.rp.sec10RetryRepeat;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class Lec03RetryWhen {
private static AtomicInteger atomicInteger=new AtomicInteger(1);
public static void main(String[] args) {
integerFlux()
//.repeat(2)//repeat two times so total times producer will produce is three
.retryWhen(Retry.fixedDelay(1, Duration.ofSeconds(10)))
.subscribe(Util.subscriber());
Util.sleep(10);
}
private static Flux<Integer> integerFlux(){
return Flux.range(1,30)
.doOnSubscribe(s-> System.out.println("---Subscribed"))
.doOnComplete(()-> System.out.println("---completed"))
.map(i->atomicInteger.getAndIncrement())
.map(i->i/(Util.getFaker().random().nextInt(1,10)>6?0:1))
.doOnError(i-> System.out.println("error: "+i))
;
}
}
package com.rp.sec10RetryRepeat;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
public class Lec04RetryWhenAdvanced {
public static void main(String[] args) {
orderService(Util.getFaker().business().creditCardNumber())
.doOnError(e-> System.out.println(e.getMessage()))
.retryWhen(Retry.from(
flux->flux
.doOnNext(rs->{
System.out.println(rs.totalRetries());
System.out.println(rs.failure());
})
.handle((retrySignal, synchronousSink) -> {
if(retrySignal.failure().getMessage().equals("500")){
synchronousSink.next(1);
}else{
synchronousSink.complete();
}
})
.delayElements(Duration.ofMillis(1000))
))
.subscribe(Util.subscriber());
Util.sleep(20);
}
//order service
private static Mono<String> orderService(String ccNumber){
return Mono.fromSupplier(()->{
processPayment();
return Util.getFaker().idNumber().valid();
});
}
//payment service
private static void processPayment(){
int random= Util.getFaker().random().nextInt(1,10);
if(random<8){
throw new RuntimeException("500");
}else if(random<10){
throw new RuntimeException("400");
}
}
}
package com.rp.sec11SinkTelecast;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
public class Lec01SinkOne {
public static void main(String[] args) {
//mono of one value
Sinks.One<Object> sink = Sinks.one();
//sink.tryEmitValue("Hi");
/* sink.emitValue("Hey",(signalType, emitResult) -> {
System.out.println(signalType.name());
System.out.println(emitResult.name());
return false;
});
sink.emitValue("Hey",(signalType, emitResult) -> {
System.out.println(signalType.name());
System.out.println(emitResult.name());
return false; //if it is true it will retry and if it again get failed than this loop will keep going
});*/
Mono<Object> mono = sink.asMono();
mono.subscribe(Util.subscriber("Sumit"));
mono.subscribe(Util.subscriber("Sam"));
sink.tryEmitValue("Hello");
//sink.tryEmitEmpty();
}
}
package com.rp.sec11SinkTelecast;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
public class Lec02SinkUnicast {
public static void main(String[] args) {
//hankde through which we will push items. It is exposed to publisher
Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
//handle through which subscriber will receive data. It is exposed to subscriber.
Flux<Object> flux = sink.asFlux();
flux.subscribe(Util.subscriber("sumit"));
flux.subscribe(Util.subscriber("sam"));//it will receive the error
sink.tryEmitNext("Hi Sumit");
sink.tryEmitNext("How are you ");
sink.tryEmitNext("??");
}
}
package com.rp.sec11SinkTelecast;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class Lec03SinkThreadSafety {
public static void main(String[] args) {
//hankde through which we will push items. It is exposed to publisher
Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();
//handle through which subscriber will receive data. It is exposed to subscriber.
Flux<Object> flux = sink.asFlux();
List<Object> list=new ArrayList<>();
flux.subscribe(list::add);
/*for (int i = 0; i < 1000; i++) {
final int j=i;
//Multiple thread will run and try to emit data in the sink.If it is thread safe than it size will be 1000
//It is thread safe.
CompletableFuture.runAsync(()->{
sink.tryEmitNext(j);
});
}*/
for (int i = 0; i < 1000; i++) {
final int j=i;
//Multiple thread will run and try to emit data in the sink.
CompletableFuture.runAsync(()->{
sink.emitNext(j,(s,e)->true);
});
}
Util.sleep(3);
System.out.println(list.size()); //by default sink is thread safe.
}
}
package com.rp.sec11SinkTelecast;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
public class Lec04MulticastSink {
public static void main(String[] args) {
//hankde through which we will push items. It is exposed to publisher
Sinks.Many<Object> sink = Sinks.many().multicast().onBackpressureBuffer();
//Sinks.Many<Object> sink = Sinks.many().multicast().directAllOrNothing();
//handle through which subscriber will receive data. It is exposed to subscriber.
Flux<Object> flux = sink.asFlux();
//flux.subscribe(Util.subscriber("sumit"));
sink.tryEmitNext("Hi Sumit");
sink.tryEmitNext("How are you ");
flux.subscribe(Util.subscriber("sumit"));
flux.subscribe(Util.subscriber("sam"));
sink.tryEmitNext("??");
}
}
package com.rp.sec11SinkTelecast;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.time.Duration;
public class Lec05SinkMultiDirectAll {
public static void main(String[] args) {
//hankde through which we will push items. It is exposed to publisher
Sinks.Many<Object> sink = Sinks.many().multicast().directBestEffort();
System.setProperty("reactor.bufferSize.small", "16");
//handle through which subscriber will receive data. It is exposed to subscriber.
Flux<Object> flux = sink.asFlux();
flux.subscribe(Util.subscriber("sumit"));//it is fast
flux.delayElements(Duration.ofMillis(100))
.subscribe(Util.subscriber("sam"));//it is slow
for (int i = 0; i < 100; i++) {
sink.tryEmitNext(i);
}
Util.sleep(5);
}
}
package com.rp.sec11SinkTelecast;
import com.rp.coureutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
public class Lec05SinkReply {
public static void main(String[] args) {
//hankde through which we will push items. It is exposed to publisher
Sinks.Many<Object> sink = Sinks.many().replay().all();
//Sinks.Many<Object> sink = Sinks.many().multicast().directAllOrNothing();
//handle through which subscriber will receive data. It is exposed to subscriber.
Flux<Object> flux = sink.asFlux();
//flux.subscribe(Util.subscriber("sumit"));
sink.tryEmitNext("Hi Sumit");
sink.tryEmitNext("How are you ");
flux.subscribe(Util.subscriber("sumit: "));
flux.subscribe(Util.subscriber("sam: "));
sink.tryEmitNext("??");
flux.subscribe(Util.subscriber("Jake: "));
sink.tryEmitNext("New message ");
}
}
package com.rp.sec11SinkTelecast.assignment;
import lombok.Data;
import lombok.ToString;
@ToString
@Data
public class Member {
private String name;
private String role;
public Member(String name){
this.name=name;
}
public void subscribe(){
}
}
package com.rp.sec11SinkTelecast.assignment;
import com.rp.coureutil.Util;
public class PublishMessage {
public static void main(String[] args) {
SlackRoom slackRoom=new SlackRoom("Offshore Meeting");
Member sumit=new Member("Sumit");
Member jack=new Member("Jack");
slackRoom
.joinRoom(sumit);
slackRoom.sendMessage(sumit,"Hii everyone");
Member sam=new Member("sam");
slackRoom.joinRoom(sam);
slackRoom.joinRoom(jack);
slackRoom.sendMessage(sam,"nice to see you all");
slackRoom.sendMessage(sumit,"welcome sam");
Util.sleep(20);
}
}
package com.rp.sec11SinkTelecast.assignment;
import com.rp.coureutil.Util;
public class SlackDemo {
public static void main(String[] args) {
SlackRoom2 slackRoom=new SlackRoom2("Reactor");
SlackMember sumit=new SlackMember("Sumit");
SlackMember sam=new SlackMember("Sam");
SlackMember jack=new SlackMember("Jack");
slackRoom.joinRoom(sumit);
slackRoom.joinRoom(sam);
sumit.says("Welcome All");
Util.sleep(2);
sam.says("Good Morning");
Util.sleep(2);
Util.sleep(2);
slackRoom.joinRoom(jack);
jack.says("Nice to meet you");
}
}
package com.rp.sec11SinkTelecast.assignment;
import java.util.function.Consumer;
public class SlackMember {
private String name;
private Consumer<String> consumer;
public SlackMember(String name){
this.name=name;
}
public String getName() {
return name;
}
public void receive(String message){
System.out.println(message);
}
public void says(String message){
consumer.accept(message);
}
public void setConsumer(Consumer<String> consumer) {
this.consumer = consumer;
}
}
package com.rp.sec11SinkTelecast.assignment;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class SlackMessage {
private static final String FORMAT="[%s->%s] : %s";
private String sender;
private String receiver;
private String message;
public String toString(){
return String.format(FORMAT,this.sender,this.receiver,this.message);
}
}
package com.rp.sec11SinkTelecast.assignment;
import lombok.Data;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@Data
public class SlackRoom {
private Sinks.Many<Message> sink;
private static Sinks.Many<Message> sink2=null;
private Flux<Message> flux;
private static List<String> members=new ArrayList<>();
private String name;
public SlackRoom(String name){
this.name=name;
this.sink=Sinks.many().multicast().directAllOrNothing();
this.flux=sink.asFlux();
}
public void joinRoom(Member member){
this.flux
.filter(message -> !member.getName().equals(message.getName()))
.subscribe(message->
System.out.println(member.getName()+" Received:\t From :"+message.getName().toUpperCase(Locale.ROOT)
+"-> "+message.getMessage())
);
}
/*public static Flux<Message> joinRoom(String name){
*//*if(sink==null){
sink=Sinks.many().multicast().directAllOrNothing();
}
System.out.println(name+ "\tjoined the room successfully");
members.add(name);
if(flux==null){
flux=sink.asFlux();
}
return flux;*//*
}*/
public void sendMessage(Member member,String msg){
sink.tryEmitNext(new Message(member.getName(),msg));
}
}
@Data
class Message{
private String name;
private String message;
Message(String name,String message){
this.name=name;
this.message=message;
}
}
package com.rp.sec11SinkTelecast.assignment;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
public class SlackRoom2 {
private String name;
private Sinks.Many<SlackMessage> sink;
private Flux<SlackMessage> flux;
public SlackRoom2(String name){
this.name=name;
this.sink=Sinks.many().replay().all();
this.flux=sink.asFlux();
}
public void joinRoom(SlackMember member){
System.out.println(member.getName()+".......... joined......: "+this.name);
this.subscribe(member);
member.setConsumer(
msg->this.postMessage(msg,member)
);
}
private void postMessage(String msg,SlackMember member){
SlackMessage message=new SlackMessage();
message.setMessage(msg);
message.setSender(member.getName());
this.sink.tryEmitNext(message);
}
private void subscribe(SlackMember member){
this.flux
.filter(msg->!msg.getSender().equals(member.getName()))
.doOnNext(sm->sm.setReceiver(member.getName()))
.map(SlackMessage::toString)
.subscribe(member::receive);
}
}
package com.rp.sec12Context;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.util.Locale;
public class Lec01Context {
public static void main(String[] args) {
getWelcomeMessage()
//can update context
.contextWrite(ctx->ctx.put("user",ctx.get("user").toString().toUpperCase(Locale.ROOT)))
.contextWrite(Context.of("user","sumit"))
.subscribe(Util.subscriber());
}
private static Mono<String> getWelcomeMessage(){
return Mono.deferContextual(contextView ->{
if(contextView.hasKey("user")){
return Mono.just("Welcome: "+contextView.get("user"));
}
return Mono.error(new RuntimeException("Unathenticated"));
});
}
}
package com.rp.sec12Context;
import com.rp.coureutil.Util;
import com.rp.sec12Context.helper.BookService;
import com.rp.sec12Context.helper.UserService;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
public class Lec02RateLimiterDemo {
public static void main(String[] args) {
BookService bookService=new BookService();
UserService userService=new UserService();
Mono<String> mono = BookService.getBook()
.contextWrite(ctx -> UserService.userCategoryContext().apply(ctx));
mono
.repeat(3)
.contextWrite(Context.of("user","sam"))
.subscribe(Util.subscriber("..."));
}
}
package com.rp.sec12Context.helper;
import com.rp.coureutil.Util;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
public class BookService {
private static Map<String,Integer> map=new HashMap<>();
static {
map.put("std",2);
map.put("prime",3);
}
public static Mono<String> getBook(){
return Mono.deferContextual(ctx->{
if(ctx.get("allow")){
return Mono.just(Util.getFaker().book().title());
}
return Mono.error(new RuntimeException("not-allowed"));
})
.contextWrite(rateLimiterContect());
}
private static Function<Context,Context> rateLimiterContect(){
return ctx->{
if(ctx.hasKey("category")){
String category = ctx.get("category").toString();
Integer allowedAttemts=map.get(category);
if(allowedAttemts>0){
map.put(category,allowedAttemts-1);
return ctx.put("allow",true);
}
}
return ctx.put("allow",false);
};
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment