Commit 8c10364c authored by vikram singh's avatar vikram singh

added the reactive stream examples using java 9

parent 5dab544a
** Java 9 Feature ** #Java 9 Feature
1.ReactiveStreams
2.DiamondOperator ReactiveStreams
3.FactoryMethodsForUnmodifiableCollections DiamondOperator
4.PrivateMethods FactoryMethodsForUnmodifiableCollections
5.ProcessAPI PrivateMethods
6.StreamApI ProcessAPI
7.TryWithResourcesEnhancements StreamApI
\ No newline at end of file TryWithResourcesEnhancements
Java 9 is providing standard of the Reactive programming
___Publisher
Subscriber
Processor
Subscription___
asynchronous
coldpublisher
drop
error
hotpublisher
rxjavaflowable
slowsubscriber
submissionpublisher
synchronous
...@@ -11,8 +11,12 @@ repositories { ...@@ -11,8 +11,12 @@ repositories {
mavenCentral() mavenCentral()
} }
dependencies { dependencies {
compileOnly 'org.projectlombok:lombok:1.18.12' compileOnly 'org.projectlombok:lombok:1.18.12'
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.16'
testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'junit', name: 'junit', version: '4.12'
} }
package com.nisum.java9Features.DiamondOperator; package com.nisum.java9Features.diamondoperator;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.DiamondOperator; package com.nisum.java9Features.diamondoperator;
public class Test { public class Test {
public static void main(String[] args) { public static void main(String[] args) {
......
package com.nisum.java9Features.FactoryMethodsForUnmodifiableCollections; package com.nisum.java9Features.factorymethodsforunmodifiablecollections;
public class Employee { public class Employee {
private int eno; private int eno;
......
package com.nisum.java9Features.FactoryMethodsForUnmodifiableCollections; package com.nisum.java9Features.factorymethodsforunmodifiablecollections;
import java.util.List; import java.util.List;
......
package com.nisum.java9Features.FactoryMethodsForUnmodifiableCollections; package com.nisum.java9Features.factorymethodsforunmodifiablecollections;
......
package com.nisum.java9Features.FactoryMethodsForUnmodifiableCollections; package com.nisum.java9Features.factorymethodsforunmodifiablecollections;
import java.util.Set; import java.util.Set;
......
package com.nisum.java9Features.PrivateMethods; package com.nisum.java9Features.privatemethod;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.PrivateMethods; package com.nisum.java9Features.privatemethod;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.PrivateMethods; package com.nisum.java9Features.privatemethod;
public class PrivateStaticMethod implements Java9InterfStatic { public class PrivateStaticMethod implements Java9InterfStatic {
public static void main(String[] args) public static void main(String[] args)
......
package com.nisum.java9Features.ProcessAPI; package com.nisum.java9Features.processAPI;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.ProcessAPI; package com.nisum.java9Features.processAPI;
import lombok.extern.slf4j.Slf4j;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.ProcessAPI; package com.nisum.java9Features.processAPI;
public class NotepadFromProcessBuilder { public class NotepadFromProcessBuilder {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
......
package com.nisum.java9Features.ProcessAPI; package com.nisum.java9Features.processAPI;
import java.util.Optional; import java.util.Optional;
......
package com.nisum.java9Features.ProcessAPI; package com.nisum.java9Features.processAPI;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.reactiveapi.asynchronous;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.schedulers.Schedulers;
/**
* creating asynchronous subscriber
* by putting delay
* Here back pressure is handled using BackpressureStrategy.BUFFER
*/
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.BUFFER)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.observeOn(Schedulers.io())
.subscribe(Sample::printIt,
err -> System.out.println("ERROR: " + err),
() -> System.out.println("DONE"));
}
private static void printIt(Double value) throws InterruptedException {
System.out.println(value + " -- " + Thread.currentThread());
Thread.sleep(1000);
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
System.out.println("emitting " + count + " --" + Thread.currentThread());
emitter.onNext(count++);
Thread.sleep(500);
}
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.coldpublisher;
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
/**
* Cold publisher will have defferent different session for different subscribers;
*/
public class Sample {
public static void main(String[] args) throws InterruptedException {
Flowable<Long> interval =
Flowable.interval(1, TimeUnit.SECONDS);
interval.subscribe(data -> printMessage("s1:" + data));
Thread.sleep(5000);
interval.subscribe(data -> printMessage("s2:" + data));
Thread.sleep(10000);
}
private static void printMessage(String message) {
System.out.println(message);
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.drop;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.schedulers.Schedulers;
/**
* BackpressureStrategy.DROP
* here all the previous data produced by the publisher is droped.
*
*/
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.DROP)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.observeOn(Schedulers.io(), false, 2)
.subscribe(Sample::printIt,
err -> System.out.println("ERROR: " + err),
() -> System.out.println("DONE"));
}
private static void printIt(Double value) throws InterruptedException {
System.out.println(value + " -- " + Thread.currentThread());
Thread.sleep(2000);
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
System.out.println("emitting " + count + " --" + Thread.currentThread());
emitter.onNext(count++);
Thread.sleep(500);
}
System.out.println("DONE emitting");
Thread.sleep(10000);
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.error;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
/**
* handling error using the reactively;
* Exceptions and the functional programming are mutually exclusive;
* exceptions are imparative programming idea
* so in reactive api it deals it with downstream;
*
*/
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.BUFFER)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.subscribe(System.out::println,
err -> System.out.println("ERROR: " + err),
() -> System.out.println("DONE"));
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
emitter.onNext(count++);
if(count == 7) throw new RuntimeException("something went wrong");
Thread.sleep(500);
}
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.hotpublisher;
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
/**
* hot publisher uses to share the subscription amoung all the subscriber;
* Any subscriber joining late will miss some amount of data;
*/
public class Sample {
public static void main(String[] args) throws InterruptedException {
Flowable<Long> interval =
Flowable.interval(1, TimeUnit.SECONDS)
.share();
interval.subscribe(data -> printMessage("s1:" + data));
Thread.sleep(5000);
interval.subscribe(data -> printMessage("s2:" + data));
Thread.sleep(10000);
}
private static void printMessage(String message) {
System.out.println(message);
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.latest;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.schedulers.Schedulers;
/**
*
* BackpressureStrategy.LATES
* It is one of the backpressure strategy to handle the flow of the data;
* It gives the latest data to the subscriber rather if any subscriber subscribes;
*/
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.LATEST)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.observeOn(Schedulers.io(), false, 2)
.subscribe(Sample::printIt,
err -> System.out.println("ERROR: " + err),
() -> System.out.println("DONE"));
}
private static void printIt(Double value) throws InterruptedException {
System.out.println(value + " -- " + Thread.currentThread());
Thread.sleep(2000);
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
System.out.println("emitting " + count + " --" + Thread.currentThread());
emitter.onNext(count++);
Thread.sleep(500);
}
System.out.println("DONE emitting");
Thread.sleep(10000);
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.rxjavaflowable;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.BUFFER)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.subscribe(System.out::println);
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
emitter.onNext(count++);
Thread.sleep(500);
}
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.slowsubscriber;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.BUFFER)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.subscribe(Sample::printIt,
err -> System.out.println("ERROR: " + err),
() -> System.out.println("DONE"));
}
private static void printIt(Double value) throws InterruptedException {
System.out.println(value + " -- " + Thread.currentThread());
Thread.sleep(1000);
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
System.out.println("emitting " + count + " --" + Thread.currentThread());
emitter.onNext(count++);
Thread.sleep(500);
}
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.submissionpublisher;
import java.util.concurrent.Flow;
/**
* creating the subscriber implementing the Flow.Subscriber given by jdk 9
* here we are maintaining the flow of data as well using request method of Subscription Interface;;
*/
public class PrintResultSubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("subscribed...");
this.subscription = subscription;
subscription.request(5);
}
@Override
public void onNext(String data) {
System.out.println(data);
subscription.request(1);
if(data.equals("CoronaPatient9")) subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.submissionpublisher;
import java.util.concurrent.SubmissionPublisher;
/**
*Use of SubmissionPublisher provided by java 9 Flow API
* it provides submit method to publish the data
* It is implementing the Flow.Publisher
* x
*/
public class SampleUserOfSubmissionPublisher {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>();
PrintResultSubscriber subscriber = new PrintResultSubscriber();
publisher.subscribe(subscriber);
int count = 0;
while(count < 20) {
Thread.sleep(500);
if(!publisher.hasSubscribers()) {
System.out.println("Subscription is removed: as we are getting more than nine corona patients in hyderabd");
break;
}
String res="CoronaPatient"+count++;
publisher.submit(res);
}
Thread.sleep(2000);
}
}
\ No newline at end of file
package com.nisum.java9Features.reactiveapi.synchronous;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
//sync by default
public class Sample {
public static void main(String[] args) {
Flowable.<Integer>create(emitter -> emit(emitter), BackpressureStrategy.BUFFER)
.map(data -> data * 1.0)
.filter(data -> data > 4)
.subscribe(Sample::printIt,
err -> System.out.println("ERROR: " + err),
() -> System.out.println("DONE"));
}
private static void printIt(Double value) {
System.out.println(value + " -- " + Thread.currentThread());
}
private static void emit(FlowableEmitter<Integer> emitter) throws InterruptedException {
int count = 0;
while(count < 20) {
System.out.println("emitting " + count + " --" + Thread.currentThread());
emitter.onNext(count++);
Thread.sleep(500);
}
}
}
\ No newline at end of file
package com.nisum.java9Features.ReactiveStreams; package com.nisum.java9Features.reactivestreams;
import java.util.ArrayList; import java.util.ArrayList;
......
package com.nisum.java9Features.ReactiveStreams; package com.nisum.java9Features.reactivestreams;
public class Employee { public class Employee {
private int id; private int id;
......
package com.nisum.java9Features.ReactiveStreams; package com.nisum.java9Features.reactivestreams;
import java.util.List; import java.util.List;
......
package com.nisum.java9Features.ReactiveStreams; package com.nisum.java9Features.reactivestreams;
import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscriber;
......
package com.nisum.java9Features.StreamApi; package com.nisum.java9Features.streamapi;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -11,7 +11,7 @@ public class TestDropWhile { ...@@ -11,7 +11,7 @@ public class TestDropWhile {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
public static void main(String[] args) { public static void main(String[] args) {
ArrayList<Integer> l1 = new ArrayList<Integer>(); List<Integer> l1 = new ArrayList<Integer>();
l1.add(2); l1.add(2);
l1.add(4); l1.add(4);
l1.add(1); l1.add(1);
......
package com.nisum.java9Features.StreamApi; package com.nisum.java9Features.streamapi;
import java.util.logging.Logger; import java.util.logging.Logger;
...@@ -8,7 +8,6 @@ public class TestIterate { ...@@ -8,7 +8,6 @@ public class TestIterate {
private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); private final static Logger log =Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
public static void main(String[] args) { public static void main(String[] args) {
//Stream.iterate(1, x->x+1).forEach(System.out::println); // return infinite elements
Stream.iterate(1, x->x+1).limit(5).forEach(System.out::println); Stream.iterate(1, x->x+1).limit(5).forEach(System.out::println);
log.info("3argument iterate"); log.info("3argument iterate");
Stream.iterate(1,x->x<10, x->x+1).forEach(System.out::println); Stream.iterate(1,x->x<10, x->x+1).forEach(System.out::println);
......
package com.nisum.java9Features.StreamApi; package com.nisum.java9Features.streamapi;
import java.util.ArrayList; import java.util.ArrayList;
......
package com.nisum.java9Features.StreamApi; package com.nisum.java9Features.streamapi;
import java.util.ArrayList; import java.util.ArrayList;
......
package com.nisum.java9Features.TryWithResourcesEnhancements; package com.nisum.java9Features.trywithresourcesenhancements;
import java.util.logging.Logger; import java.util.logging.Logger;
......
package com.nisum.java9Features.TryWithResourcesEnhancements; package com.nisum.java9Features.trywithresourcesenhancements;
import lombok.extern.flogger.Flogger;
import lombok.extern.slf4j.Slf4j;
import java.util.logging.Logger; import java.util.logging.Logger;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment