Commit 5258eac4 authored by vikram singh's avatar vikram singh

changed the project from maven to gradle and also added logger removed unnecessary comments

parents
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
plugins {
id 'org.springframework.boot' version '2.2.6.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.nisum.webflux'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.apache.kafka:kafka-streams'
implementation 'io.projectreactor.kafka:reactor-kafka'
implementation 'org.springframework.boot:spring-boot-starter-aop'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
useJUnitPlatform()
}
#Fri Apr 03 20:10:06 IST 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-all.zip
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
rootProject.name = 'reactor-api-mongo-kafka'
package com.nisum.webflux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ReactorApiMongoKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(ReactorApiMongoKafkaApplication.class, args);
}
}
package com.nisum.webflux.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Component
@Configuration
public class KafkaConsumerConfig {
@Value("${com.kafka.topic}")
private String topic;
@Autowired
private KafkaProperties kafkaProperties;
@Bean("simpleConsumer")
public KafkaReceiver<String,String> simpleConsumer() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(topic)); //topic name
KafkaReceiver<String,String> receiver = KafkaReceiver.create(receiverOptions);
return receiver;
}
}
package com.nisum.webflux.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;
/**
* Return a kafka sender that represent config for each kafka producer.
* @author Alessandro Pio Ardizio
*/
@Configuration
@Component
public class KafkaProducerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean("simpleProducer")
public KafkaSender<String,String> simpleProducer(){
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String,String> senderOptions = SenderOptions.<String,String>create(configs).maxInFlight(1024);
return KafkaSender.create(senderOptions);
}
}
\ No newline at end of file
package com.nisum.webflux.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaServer {
@Autowired
KafkaTemplate<String, String> template;
@Value("${com.kafka.topic}")
private String topic;
public String send(String message) {
template.send(topic,message);
return message;
}
}
package com.nisum.webflux.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
@Bean
@Qualifier("webclient")
@Scope("prototype")
public WebClient.Builder getWebClient() {
return WebClient.builder();
}
}
package com.nisum.webflux.controller;
  • This controller can be cleaned because no where used in client application. Or is there any reason to use this.

Please register or sign in to reply
import com.nisum.webflux.model.Employee;
import com.nisum.webflux.service.IEmployeeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class EmployeeController {
@Autowired
IEmployeeService employeeService;
@PostMapping("/employee")
public Mono<Employee> save(@RequestBody
Employee employee) {
System.out.println("employee obj ##################################################"+employee);
return employeeService.save(employee);
}
@GetMapping("/employee")
public Flux<Employee> getAll(){
return employeeService.getAllEmployee();
}
@GetMapping("/employee/byName")
public Flux<Employee> findByName(@RequestParam
String name){
return employeeService.findByName(name);
}
@GetMapping("/employee/byCity")
public Flux<Employee> findByCity(@RequestParam
String city){
return employeeService.findByCity(city);
}
@GetMapping("/employee/byHobby")
public Flux<Employee> findByHobbyName(@RequestParam
String hobbyName){
return employeeService.findByHobbyName(hobbyName);
}
@GetMapping("/employee/byNameAndHobby")
public Flux<Employee> findByNameAndHobbyName(@RequestParam
String name, @RequestParam
String hobbyName){
return employeeService.findByNameAndHobbyName(name,hobbyName);
}
@GetMapping("/employee/byNameAndCityAndHobby")
public Flux<Employee> findByNameAndCityAndHobby(@RequestParam
String name, @RequestParam
String cityName, @RequestParam
String hobbyName){
return employeeService.findByNameAndCityAndHobby(name,cityName,hobbyName);
}
}
package com.nisum.webflux.controller;
import com.nisum.webflux.model.Fruit;
import com.nisum.webflux.service.IFruitService;
import com.nisum.webflux.service.impl.OperationService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import java.util.List;
import java.util.function.Function;
@RestController
public class FruitController {
@Value("${com.kafka.topic}")
private String topic;
@Autowired
IFruitService fruitService;
Please register or sign in to reply
@Autowired
OperationService operationService;
Please register or sign in to reply
@Qualifier("simpleProducer")
@Autowired
KafkaSender<String,String> employeeSender;
Please register or sign in to reply
@Qualifier("simpleConsumer")
@Autowired
KafkaReceiver<String, String> receiver;
Please register or sign in to reply
@PostMapping("/fruit")
public Mono<Fruit> save(@RequestBody
Fruit fruit){
return Mono.just(fruit)
.flatMap(operationService::operate)
.map(operationService::doNothing)
.flatMap(fruitService::save)
.flatMapMany(saveToKafka()).collectList().flatMap(getMono(fruit))
.flatMap(operationService::revert)
.onErrorResume(operationService::getErrorResponse);
}
@PostMapping("/saveTokafka")
public Mono<Fruit> normalSave(@RequestBody
Fruit fruit){
return fruitService.save(fruit).flatMapMany(h -> {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, null, h.getId(),
h.toString());
Mono<SenderRecord<String, String, String>> mono = Mono.just(SenderRecord.create(record, null));
return employeeSender.send(mono);
}).collectList().flatMap(m -> Mono.just(fruit));
}
private Function<Fruit, Flux<SenderResult<String>>> saveToKafka() {
return h -> {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, null, ((Fruit) h).getId(),
h.toString());
Mono<SenderRecord<String, String, String>> mono = Mono.just(SenderRecord.create(record, null));
return employeeSender.send(mono);
};
}
private Function< List<SenderResult<String>>, Mono<Fruit>> getMono(Fruit fruit) {
return m -> Mono.just(fruit);
}
@GetMapping("/getAll")
public Flux<Fruit> getAll(){
return fruitService.getAll();
}
@GetMapping("/getAllByWebClient")
public Flux<Fruit> getAllByWebClient(){
return fruitService.getFruitsByWebClient();
}
@GetMapping("/saveFruitByWebClient")
public Mono<Fruit> saveFruitsByWebClient() {
return fruitService.saveFruitsByWebClient();
}
@PostMapping("/save")
public Mono<Fruit> savetoMongo(@RequestBody
Fruit fruit) {
return fruitService.save(fruit);
}
@GetMapping("/getFruitById/{id}")
public Mono<Fruit> getFruitById(@PathVariable
String id) {
return fruitService.getFruitById(id);
}
@GetMapping("/getFruitByPriceAndName/{price}")
public Mono<Fruit> getFruitByPriceAndName(@PathVariable
String price , @RequestParam
String name) {
return fruitService.getFruitByPriceAndName(price,name);
}
}
package com.nisum.webflux.controller;
import com.nisum.webflux.config.KafkaServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class SampleController {
@Autowired
KafkaServer kafkaServer;
Please register or sign in to reply
@GetMapping("/send")
public String producer(@RequestParam
String msg) {
log.info("into Controller");
return kafkaServer.send(msg);
}
}
package com.nisum.webflux.model;
import lombok.*;
import java.util.List;
@Data
@Setter
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Address {
private String addressId;
private String zipCode;
private List<City> city;
}
package com.nisum.webflux.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class City {
private String cityId;
private String cityName;
}
package com.nisum.webflux.model;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.List;
@Setter
@Getter
@ToString
//@ToString(exclude= {"id","dob"})
@NoArgsConstructor
@AllArgsConstructor
@Document
public class Employee {
@Id
private String id;
private String name;
private List<Address> address;
private List<Hobby> hobbies;
}
package com.nisum.webflux.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@Document
@AllArgsConstructor
@NoArgsConstructor
public class Fruit {
@Id
private String id;
private String name;
private String price;
}
package com.nisum.webflux.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Hobby {
@Id
private String hobbyId;
private String hobbyName;
}
package com.nisum.webflux.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
@Data
@NoArgsConstructor
public class Product {
private String id;
private String name;
private String price;
}
package com.nisum.webflux.repository;
import com.nisum.webflux.model.Employee;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
@Repository
public interface EmployeeRepo extends ReactiveMongoRepository<Employee, String> {
Flux<Employee> findByName(String name);
@Query("{'address.city.cityName': ?0}")
Flux<Employee> findByCity(String city);
@Query("{'hobbies.hobbyName': ?0}")
Flux<Employee> findbyHobbyName(String hobbyName);
@Query("{'name': ?0 ,'hobbys.hobbyName': ?1}")
Flux<Employee> findByNameAndHobbyName(String name, String hobbyName);
@Query("{'name': ?0 , 'address.city.cityName': ?1 , 'hobbies.hobbyName': ?2}")
Flux<Employee> findByNameAndCityAndHobby(String name, String cityName, String hobbyName);
}
package com.nisum.webflux.repository;
import com.nisum.webflux.model.Fruit;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
@Repository
public interface FruitRepository extends ReactiveMongoRepository<Fruit, String> {
Mono<Fruit> findByNameAndPrice(String e, String price);
}
package com.nisum.webflux.repository;
import com.nisum.webflux.model.Hobby;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface HobbyRepo extends ReactiveMongoRepository<Hobby, String> {
}
package com.nisum.webflux.service;
import com.nisum.webflux.model.Employee;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface IEmployeeService {
Mono<Employee> save(Employee employee);
Flux<Employee> getAllEmployee();
Flux<Employee> findByName(String name);
Flux<Employee> findByCity(String city);
Flux<Employee> findByHobbyName(String hobbyName);
Flux<Employee> findByNameAndHobbyName(String hobbyName, String hobbyName2);
Flux<Employee> findByNameAndCityAndHobby(String name, String cityName, String hobbyName);
}
package com.nisum.webflux.service;
import com.nisum.webflux.model.Fruit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface IFruitService {
Mono<Fruit> save(Fruit fruit);
Flux<Fruit> getAll();
Flux<Fruit> getFruitsByWebClient();
Mono<Fruit> saveFruitsByWebClient();
Mono<Fruit> getFruitById(String id);
Mono<Fruit> getFruitByPriceAndName(String price, String name);
}
package com.nisum.webflux.service.impl;
import com.nisum.webflux.model.Address;
import com.nisum.webflux.model.City;
import com.nisum.webflux.model.Employee;
import com.nisum.webflux.model.Hobby;
import com.nisum.webflux.repository.EmployeeRepo;
import com.nisum.webflux.service.IEmployeeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Service
@Slf4j
public class EmployeeService implements IEmployeeService{
@Autowired
EmployeeRepo empRepo;
@Override
public Mono<Employee> save(Employee employee) {
log.info("Into save method");
return empRepo.save(employee).onErrorResume(e->{
log.info("into Error "+e);
return getMockEmployee();
});
}
private Mono<Employee> getMockEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
hobbyObj.add(new Hobby("1","cricket"));
List<City> cities=Arrays.asList(new City("1","patna"));
List<Address> address =Arrays.asList(new Address("1","bhr",cities));
return Mono.just(new Employee("2","servicename",address,hobbyObj));
}
@Override
public Flux<Employee> getAllEmployee() {
return empRepo.findAll().onErrorResume(e->{
log.info("into error for getAllEmployee");
return Flux.just(getMockEmployee().block());});
}
@Override
public Flux<Employee> findByName(String name) {
return empRepo.findByName(name);
}
@Override
public Flux<Employee> findByCity(String city) {
return empRepo.findByCity(city);
}
@Override
public Flux<Employee> findByHobbyName(String hobbyName) {
return empRepo.findbyHobbyName(hobbyName);
}
@Override
public Flux<Employee> findByNameAndHobbyName(String name, String hobbyName) {
return empRepo.findByNameAndHobbyName(name, hobbyName);
}
@Override
public Flux<Employee> findByNameAndCityAndHobby(String name, String cityName, String hobbyName) {
return empRepo.findByNameAndCityAndHobby(name,cityName,hobbyName);
}
}
package com.nisum.webflux.service.impl;
import com.nisum.webflux.model.Fruit;
import com.nisum.webflux.repository.FruitRepository;
import com.nisum.webflux.service.IFruitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@Service
@Slf4j
public class FruitService implements IFruitService {
private final WebClient webClient;
@Autowired
public FruitService( @Qualifier("webclient") final WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.build();
}
@Autowired
FruitRepository fruitRepository;
Please register or sign in to reply
@Override
public Mono<Fruit> save(Fruit fruit) {
return fruitRepository.save(fruit);
}
public Mono<Fruit> get(Fruit fruit) {
return fruitRepository.findAll().collect(Collectors.maxBy(Comparator.comparing(e->Integer.parseInt(e.getPrice())))).map(Optional::get);
}
@Override
public Flux<Fruit> getFruitsByWebClient() {
return this.webClient.get().uri("http://localhost:8080/getAll").retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse-> clientResponse.bodyToMono(Map.class).flatMap(e->{
log.error(" 4XX error is occurred ");
return Mono.error(new Exception());
}))
.onStatus(HttpStatus::is5xxServerError, serverResponse->serverResponse.bodyToMono(Map.class).flatMap(e->{
log.error("there is 5XX error ");
return Mono.error(new Exception());
}))
.bodyToFlux(Fruit.class);
}
@Override
public Mono<Fruit> saveFruitsByWebClient() {
Fruit fruit=new Fruit();
fruit.setId("45");
fruit.setName("PineApple");
fruit.setPrice("70");
return this.webClient.post().uri("http://localhost:8080/save")
.body(BodyInserters.fromObject(fruit))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse-> clientResponse.bodyToMono(Map.class).flatMap(e->{
log.error(" 4XX error is occurred ");
return Mono.error(new Exception());
}))
.onStatus(HttpStatus::is5xxServerError, serverResponse->serverResponse.bodyToMono(Map.class).flatMap(e->{
log.error("there is 5XX error ");
return Mono.error(new Exception());
}))
.bodyToMono(Fruit.class);
}
@Override
public Flux<Fruit> getAll() {
return fruitRepository.findAll();
}
@Override
public Mono<Fruit> getFruitById(String id) {
return Optional.ofNullable(id).map(fruitRepository::findById).get();
}
@Override
public Mono<Fruit> getFruitByPriceAndName(String price, String name) {
return Optional.ofNullable(name).map(e->fruitRepository.findByNameAndPrice(e,price)).get();
}
}
package com.nisum.webflux.service.impl;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
@Aspect
public class Logging {
@Pointcut("execution(* com.nisum.*.*(..))")
private void selectAll(){}
@Before("selectAll()")
public void beforeAdvice(){
System.out.println("Going to Call desired Method");
}
}
package com.nisum.webflux.service.impl;
import com.nisum.webflux.model.Fruit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@Slf4j
public class NameService {
private String preName;
public Mono<Fruit> name(Fruit fruit){
return Mono.just(fruit).map(e->{
this.preName=e.getName(); e.setName(preName+"+pro");
return e;
}).onErrorResume(e->{
log.error("Error occurred during changing name"+e);
return Mono.just(new Fruit("0","",""));
});
}
public Mono<Fruit> revertName(Fruit fruit){
return Mono.just(fruit).map(e->{
e.setName(preName);
return e;
}).onErrorResume(e->{
log.error("Error occurred during changing name"+e);
return Mono.just(new Fruit("0","",""));
});
}
}
package com.nisum.webflux.service.impl;
import com.nisum.webflux.model.Fruit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@Slf4j
public class OperationService {
@Autowired
NameService nameService;
public Mono<Fruit> operate(Fruit fruit) {
log.info("Into operate method of OperationService" + fruit);
return Mono.just(fruit).flatMap(nameService::name).onErrorResume(e -> {
log.error("Error occurred during changing name");
return Mono.just(new Fruit("0", "", ""));
}).map(e -> {
e.setPrice(Integer.parseInt(e.getPrice()) + 1000 + "");
return e;
});
}
public Mono<Fruit> revert(Fruit fruit) {
log.info("into revertMethod of OperationService" + fruit);
return Mono.just(fruit).flatMap(nameService::revertName).onErrorResume(e -> {
log.error("Error occurred during changing name");
return Mono.just(new Fruit("0", "", ""));
}).map(e -> {
e.setPrice(Integer.parseInt(e.getPrice()) - 1000 + "");
return e;
});
}
public Mono<Fruit> getErrorResponse(Throwable th) {
log.error("SomeThing Error has happened ::" + th);
return Mono.just(new Fruit("1", "ErrorFruit", "3"));
}
public Fruit doNothing(Fruit fruit) {
log.info("into doNothing method of OperationService " + fruit);
return fruit;
}
}
com:
kafka:
topic: test
spring:
data:
mongodb:
database: EmployeeDB
port: 27017
host: localhost
kafka:
bootstrap:
servers: localhost:9092
log4j.rootLogger=INFO, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
# Print the date in ISO 8601 format
#log4j.appender.A1.layout.ConversionPattern=%d %-5p - %m%n
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
log4j.logger.net.tekstenenuitleg=DEBUG
\ No newline at end of file
package com.nisum.webflux.basicOperators;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
//link:http://www.vinsguru.com/reactive-programming-schedulers/
/**
* Schedulers provides various Scheduler flavors usable by publishOn or subscribeOn :
* • parallel(): Optimized for fast Runnable non-blocking executions
* • single(): Optimized for low-latency Runnable one-off executions
* • elastic(): Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) can grow indefinitely
* • boundedElastic(): Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped
* • immediate(): to immediately run submitted Runnable instead of scheduling them (somewhat of a no-op or "null object" Scheduler)
*/
public class AdvancedConcept_Schedular_test {
// SimplePublisher
public static void main(String args[]) {
simplePublisher();
/**
* If you notice the above output of simplePublisher carefully, for each subscription (cold subscription), our Publisher publishes values. The map and consumption operations are getting executed in the respective threads where subscription happens. This is the default behavior.
*
* However Reactor provides an easy way to switch the task execution in the reactive chain using below methods.
*
* publishOn
* subscribeOn
*
* PublishOn:
* PublishOn accepts a Scheduler which changes the task execution context for the operations in the downstream. (for all the operations or until another PublishOn switches the context in the chain). Lets take a look at the below examples.
*/
//Scheulers.immediate():To keep the execution in the current thread.
publisherOn_with_immidiateScheduler();
//Schedulers.single():
//A single reusable thread. When we use this thread, all the operations of the reactive chain are executed using this thread by all the callers.
publisherOn_with_singleScheduler();
/**
* Schedulers.newSingle():
* Same as above. But a dedicated single thread just for the caller.
*/
publisherOn_with_newSingleScheduler();
/**
* Schedulers.elastic():
* This is a thread pool with unlimited threads which is no longer preferred. So DO NOT USE this option.
*/
/**
* Schedulers.boundedElastic():
* This is a preferred one instead of above elastic. This thread pool contains 10 * number of CPU cores you have. Good choice for IO operations.
*/
publisherOn_with_boundedElastic_Scheduler();
/**
* Schedulers.parallel():
* A fixed pool of workers that is tuned for parallel work. It creates as many workers as you have CPU cores.
*/
publisherOn_with_parallel_Scheduler();
/**
* Multiple PublishOn Methods:
* parallel/boundedElastic_Schedular
*/
multiplePublishOnMethods();
/**
* SubscribeOn:
* SubscribeOn method affects the context of the source emission. That is, as we had said earlier, nothing happens in the reactive chain until we subscribe! Once subscribed, the pipeline is getting executed by default on the thread which subscribed.
* When the publishOn method is encountered, it switches the context for the downstream operations. But the source which is the Flux / Mono / or any publisher, is always executed on the current thread which subscribed.
* This SubscribeOn method will change the behavior.
*
*/
multiplePublishOnMethodsUsingSubscribeOn();
}
private static void simplePublisher() {
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
//create a runnable with flux subscription
Runnable r = getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void publisherOn_with_immidiateScheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.immediate())
.map(i -> {
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
//create a runnable with flux subscription
Runnable r = getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void publisherOn_with_singleScheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.single())
.map(i -> {
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
//create a runnable with flux subscription
Runnable r = getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void publisherOn_with_newSingleScheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.newSingle("newScheduler"));
//create a runnable with flux subscription
Runnable r = getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void publisherOn_with_boundedElastic_Scheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.boundedElastic());
//create a runnable with flux subscription
Runnable r = getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void publisherOn_with_parallel_Scheduler() {
Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.parallel());
//create a runnable with flux subscription
Runnable r = getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void multiplePublishOnMethods(){
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
System.out.println("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
Runnable r=getRunnable(flux);
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static void multiplePublishOnMethodsUsingSubscribeOn(){
Flux<Integer> flux = Flux.range(0, 2)
.map(i -> {
System.out.println("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
return i;
});
//creating runnable subscriber;
Runnable r = () -> flux
.subscribeOn(Schedulers.single())
.subscribe(s -> {
System.out.println("Received " + s + " via " + Thread.currentThread().getName());
});
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
//lets start the threads. (this is when we are subscribing to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
}
private static Runnable getRunnable(Flux<Integer> flux) {
return () -> flux.subscribe(s -> {
System.out.println("Received " + s + " via " + Thread.currentThread().getName());
});
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
//check java controllers also to understand it more clearly;
public class Basic_Flux_Mono_Test {
/**
* flux with error
*
* public final Disposable subscribe(
* @Nullable Consumer<? super T> consumer,
* @Nullable Consumer<? super Throwable> errorConsumer,
* @Nullable Runnable completeConsumer)
*
* Subscribe method is having cunsumer ,erroCunsumer, and completeConsumer, and one more field subscription
* is there;
* It returns Disposable:It means the spacified flux can be cancelled or disposed
*/
@Test
public void fluxTest() {
Flux<String> stringFlux = Flux.just("V", "Spring Boot", "Reactive Spring")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("After Error"))
.log();
stringFlux
.subscribe(System.out::println,
(e) -> System.err.println("Exception is -> " + e)
, () -> System.out.println("Completed"));
}
@Test
public void fluxTestElements_WithoutError() {
Flux<String> stringFlux = Flux.just("Spring", "Spring Boot", "Reactive Spring")
.log();
StepVerifier.create(stringFlux)
.expectNext("Spring")
.expectNext("Spring Boot")
.expectNext("Reactive Spring")
.verifyComplete();
}
@Test
public void fluxTestElements_WithError() {
Flux<String> stringFlux = Flux.just("Spring", "Spring Boot", "Reactive Spring")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.log();
StepVerifier.create(stringFlux)
.expectNext("Spring")
.expectNext("Spring Boot")
.expectNext("Reactive Spring")
//.expectError(RuntimeException.class)
.expectErrorMessage("Exception Occurred")
.verify();
}
/**
*
*/
@Test
public void fluxTestElements_WithError1() {
Flux<String> stringFlux = Flux.just("A", "B", "C")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.log();
StepVerifier.create(stringFlux)
.expectNext("A","B","C")
.expectErrorMessage("Exception Occurred")
.verify();
}
/**
* use of stepverifier
*/
@Test
public void fluxTestElementsCount_WithError() {
Flux<String> stringFlux = Flux.just("Spring", "Spring Boot", "Reactive Spring")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.log();
StepVerifier.create(stringFlux)
.expectNextCount(3)
.expectErrorMessage("Exception Occurred")
.verify();
}
@Test
public void monoTest(){
Mono<String> stringMono = Mono.just("Spring");
StepVerifier.create(stringMono.log())
.expectNext("Spring")
.verifyComplete();
}
/**
* stepverifier for mono having error
*/
@Test
public void monoTest_Error(){
StepVerifier.create(Mono.error(new RuntimeException("Exception Occurred")).log())
.expectError(RuntimeException.class)
.verify();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* Examples on Cold and hot publisher
* reference:::: http://www.vinsguru.com/reactive-programming-publisher-types-cold-vs-hot/
*/
public class ColdAndHotPublisherTest {
/**
* cold publishers generates new data every time a new subscriber subscribes to them.
* Publishers by default do not produce any value unless at least 1 observer subscribes to it.
* Publishers create new data producers for each new subscription.
*
* creating the cold publisher
* @throws InterruptedException
*/
@Test
public void coldPublisherTest() throws InterruptedException {
Flux<String> stringFlux = Flux.just("A","B","C","D","E","F")
.delayElements(Duration.ofSeconds(1));
stringFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s)); //emits the value from beginning
Thread.sleep(2000);
stringFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s));//emits the value from beginning
Thread.sleep(4000);
}
/**
* creating hotpubisher
* Hot Publishers do not create new data producer for each new subscription (as the Cold Publisher does).
* Instead there will be only one data producer and all the observers listen to the data produced by the single data producer.
* So all the observers get the same data.
* subscriber 2 is missing some of the content as hot publisher is not waiting for the subscriber
* @throws InterruptedException
*/
@Test
public void hotPublisherTest() throws InterruptedException {
Flux<String> stringFlux = Flux.just("A","B","C","D","E","F")
.delayElements(Duration.ofSeconds(1));
ConnectableFlux<String> connectableFlux = stringFlux.publish();
connectableFlux.connect();
connectableFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s));
Thread.sleep(3000);
connectableFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s)); // does not emit the values from beginning
Thread.sleep(4000);
}
}
package com.nisum.webflux.basicOperators;
/**
* Custom class to handle the exception exteding throwable
*/
public class CustomException extends Throwable {
private String message;
@Override
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public CustomException(Throwable e) {
this.message = e.getMessage();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
/**
* link : https://www.reactiveprogramming.be/project-reactor-backpressure/
* Understanding BackPressure concept
* subscriber can tell the publisher that what amount of data it needs.
* also subscriber can cancel request;
*
* 1.request
* 2.cancel
*/
public class FluxAndMonoBackPressureTest {
/**
* verifies the backPressure
*/
@Test
public void backPressureTest() {
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();
StepVerifier.create(finiteFlux)
.expectSubscription()
.thenRequest(1)
.expectNext(1)
.thenRequest(1)
.expectNext(2)
.thenCancel()
.verify();
}
/**
* handling backbressure at the subscribe method itself as a forth argument
*
*/
@Test
public void backPressure() {
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();
finiteFlux.subscribe((element) -> System.out.println("Element is : " + element)
, (e) -> System.err.println("Exception is : " + e)
, () -> System.out.println("Done")
, (subscription -> subscription.request(2)));
}
/**
* cancel method used by subscriber to cancel the subscription;
*/
@Test
public void backPressure_cancel() {
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();
finiteFlux.subscribe((element) -> System.out.println("Element is : " + element)
, (e) -> System.err.println("Exception is : " + e)
, () -> System.out.println("Done")
, (subscription -> subscription.cancel()));
}
/**
* implementation of customized backPressure overriding hookOnNext
* method of BaseSubscriber abstract class
*/
@Test
public void customized_backPressure() {
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();
finiteFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(1);
System.out.println("Value received is : " + value);
if(value == 4){
cancel();
}
}
});
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.Duration;
/**
* link: https://www.baeldung.com/reactor-combine-streams
* combining two or more flux/mono;
*/
public class FluxAndMonoCombineTest {
/**
* Combine two or more flux using merge method;
* showing result as a,b,c,d,e,f
*/
@Test
public void combineUsingMerge(){
Flux<String> flux1 = Flux.just("A","B","C");
Flux<String> flux2 = Flux.just("D","E","F");
Flux<String> mergedFlux = Flux.merge(flux1,flux2);
StepVerifier.create(mergedFlux.log())
.expectSubscription()
.expectNext("A","B","C","D","E","F")
.verifyComplete();
}
/**
* adding delay to she the exact behaviour of the merge method
* showing result as a,d,b,e,f,c
*/
@Test
public void combineUsingMerge_withDelay(){
Flux<String> flux1 = Flux.just("A","B","C").delayElements(Duration.ofSeconds(1));
Flux<String> flux2 = Flux.just("D","E","F").delayElements(Duration.ofSeconds(1));;
Flux<String> mergedFlux = Flux.merge(flux1,flux2);
StepVerifier.create(mergedFlux.log())
.expectSubscription()
.expectNextCount(6)
//.expectNext("A","B","C","D","E","F")
.verifyComplete();
}
/**
* combine using concat method of the flux
*
*/
@Test
public void combineUsingConcat(){
Flux<String> flux1 = Flux.just("A","B","C");
Flux<String> flux2 = Flux.just("D","E","F");
Flux<String> mergedFlux = Flux.concat(flux1,flux2);
StepVerifier.create(mergedFlux.log())
.expectSubscription()
.expectNext("A","B","C","D","E","F")
.verifyComplete();
}
/**
* combine using concat method of the flux
* after adding delay also it will concat first and the second flux;
* will show the result as A,B,C,D,E,F
*/
@Test
public void combineUsingConcat_withDelay(){
Flux<String> flux1 = Flux.just("A","B","C").delayElements(Duration.ofSeconds(1));
Flux<String> flux2 = Flux.just("D","E","F").delayElements(Duration.ofSeconds(1));
Flux<String> mergedFlux = Flux.concat(flux1,flux2);
StepVerifier.create(mergedFlux.log())
.expectSubscription()
.expectNextCount(6)
.verifyComplete();
}
/**
* combine using Zip method
* we can add custom function in zip method agrument
* here we have more control on the flux;
*/
@Test
public void combineUsingZip(){
Flux<String> flux1 = Flux.just("A","B","C");
Flux<String> flux2 = Flux.just("D","E","F");
Flux<String> mergedFlux = Flux.zip(flux1,flux2, (t1, t2) -> {
return t1.concat(t2); // AD, BE, CF
}); //A,D : B,E : C:F
StepVerifier.create(mergedFlux.log())
.expectSubscription()
.expectNext("AD", "BE", "CF")
.verifyComplete();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.Duration;
/**
*
* check java controllers also to understand it more clearly;
* handling error reactor api
* understanding error operators
*/
public class FluxAndMonoErrorTest {
/**
*
* onErrorResume method
* this block will execute if error event is occured .
* it return the default/flux or mono function added by programmer;
*/
@Test
public void fluxErrorHandling() {
Flux<String> stringFlux = Flux.just("A", "B", "C")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorResume((e) -> { // this block gets executed
System.out.println("Exception is : " + e);
return Flux.just("default", "default1");
});
StepVerifier.create(stringFlux.log())
.expectSubscription()
.expectNext("A", "B", "C")
//.expectError(RuntimeException.class)
//.verify();
.expectNext("default", "default1")
.verifyComplete();
}
/**
* here it take value as input and returns a flux;
* as it internally uses the onErrorResume method
*/
@Test
public void fluxErrorHandling_OnErrorReturn() {
Flux<String> stringFlux = Flux.just("A", "B", "C")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorReturn("default");
StepVerifier.create(stringFlux.log())
.expectSubscription()
.expectNext("A", "B", "C")
.expectNext("default")
.verifyComplete();
}
/**
* we can call the onErrorMap
* if we are getting exception and
* there we want to call any custom exception class
*/
@Test
public void fluxErrorHandling_OnErrorMap() {
Flux<String> stringFlux = Flux.just("A", "B", "C")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorMap((e) -> new CustomException(e));
StepVerifier.create(stringFlux.log())
.expectSubscription()
.expectNext("A", "B", "C")
.expectError(CustomException.class)
.verify();
}
/**
* retry() method
* giving the user a chance to try 2 more time
*/
@Test
public void fluxErrorHandling_OnErrorMap_withRetry() {
Flux<String> stringFlux = Flux.just("A", "B", "C")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorMap((e) -> new CustomException(e))
.retry(2);
StepVerifier.create(stringFlux.log())
.expectSubscription()
.expectNext("A", "B", "C")
.expectNext("A", "B", "C")
.expectNext("A", "B", "C")
.expectError(CustomException.class)
.verify();
}
/**
* use of retryBackoff
* adding timestamp in retry
*/
@Test
public void fluxErrorHandling_OnErrorMap_withRetryBackoff() {
Flux<String> stringFlux = Flux.just("A", "B", "C")
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorMap((e) -> new CustomException(e))
.retryBackoff(2, Duration.ofSeconds(5));
StepVerifier.create(stringFlux.log())
.expectSubscription()
.expectNext("A", "B", "C")
.expectNext("A", "B", "C")
.expectNext("A", "B", "C")
.expectError(IllegalStateException.class)
.verify();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
/**
* check java controllers/services also to understand it more clearly;
* some basic operators of Flux and mono
*/
public class FluxAndMonoFactoryTest {
List<String> names = Arrays.asList("adam","anna","jack","jenny");
/**
* fromIterable to get flux of collections
*/
@Test
public void fluxUsingIterable(){
Flux<String> namesFlux = Flux.fromIterable(names)
.log();
StepVerifier.create(namesFlux)
.expectNext("adam","anna","jack","jenny")
.verifyComplete();
}
/**
* fromArray
*/
@Test
public void fluxUsingArray(){
String[] names = new String[]{"adam","anna","jack","jenny"};
Flux<String> namesFlux = Flux.fromArray(names);
StepVerifier.create(namesFlux)
.expectNext("adam","anna","jack","jenny")
.verifyComplete();
}
/**
* fromStream for converting into flux from any stream;
*/
@Test
public void fluxUsingStream(){
Flux<String> namesFlux = Flux.fromStream(names.stream());
StepVerifier.create(namesFlux)
.expectNext("adam","anna","jack","jenny")
.verifyComplete();
}
/**
* justOrEmpty
*/
@Test
public void monoUsingJustOrEmpty(){
Mono<String> mono = Mono.justOrEmpty(null); //Mono.Empty();
StepVerifier.create(mono.log())
.verifyComplete();
}
/**
* fromSupplier of mono
* it takes the input as supplier
*/
@Test
public void monoUsingSupplier(){
Supplier<String> stringSupplier = () -> "adam";
Mono<String> stringMono = Mono.fromSupplier(stringSupplier);
System.out.println(stringSupplier.get());
StepVerifier.create(stringMono.log())
.expectNext("adam")
.verifyComplete();
}
/**
* Creating flux using range
* it takes the initial value and count of the input
*/
@Test
public void fluxUsingRange(){
Flux<Integer> integerFlux = Flux.range(1,5).log();
StepVerifier.create(integerFlux)
.expectNext(1,2,3,4,5)
.verifyComplete();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
/**
* check java controllers also to understand it more clearly;
* Filter
* same as java 8 filter
* we can filter flux or mono
*/
public class FluxAndMonoFilterTest {
List<String> names = Arrays.asList("adam","anna","jack","jenny");
@Test
public void filterTest(){
Flux<String> namesFlux = Flux.fromIterable(names) // adam, anna, jack,jenny
.filter(s->s.startsWith("a"))
.log(); //adam,anna
StepVerifier.create(namesFlux)
.expectNext("adam","anna")
.verifyComplete();
}
@Test
public void filterTestLength(){
Flux<String> namesFlux = Flux.fromIterable(names) // adam, anna, jack,jenny
.filter(s->s.length() >4)
.log(); //jenny
StepVerifier.create(namesFlux)
.expectNext("jenny")
.verifyComplete();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import static reactor.core.scheduler.Schedulers.parallel;
/**
* check java controllers also to understand it more clearly;
* transformation operator
* same as java 8
* map,flatmap,filter,flatmapMany
*/
public class FluxAndMonoTransformTest {
List<String> names = Arrays.asList("adam", "anna", "jack", "jenny");
/**
* map uses
*/
@Test
public void transformUsingMap() {
Flux<String> namesFlux = Flux.fromIterable(names)
.map(s -> s.toUpperCase()) //ADAM, ANNA, JACK, JENNY
.log();
StepVerifier.create(namesFlux)
.expectNext("ADAM", "ANNA", "JACK", "JENNY")
.verifyComplete();
}
@Test
public void transformUsingMap_Length() {
Flux<Integer> namesFlux = Flux.fromIterable(names)
.map(s -> s.length()) //ADAM, ANNA, JACK, JENNY
.log();
StepVerifier.create(namesFlux)
.expectNext(4,4,4,5)
.verifyComplete();
}
/**
* map with repeat operator
*/
@Test
public void transformUsingMap_Length_repeat() {
Flux<Integer> namesFlux = Flux.fromIterable(names)
.map(s -> s.length()) //ADAM, ANNA, JACK, JENNY
.repeat(1)
.log();
StepVerifier.create(namesFlux)
.expectNext(4,4,4,5,4,4,4,5)
.verifyComplete();
}
/**
* filter and map
*/
@Test
public void transformUsingMap_Filter() {
Flux<String> namesFlux = Flux.fromIterable(names)
.filter(s -> s.length()>4)
.map(s -> s.toUpperCase()) // JENNY
.log();
StepVerifier.create(namesFlux)
.expectNext("JENNY")
.verifyComplete();
}
/**
* flatmap impl
* converting flux of list of data to
* flux of data using flatmap
* it flatten the data and then map;
*/
@Test
public void tranformUsingFlatMap(){
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("A","B","C","D","E","F")) // A, B, C, D, E, F
.flatMap(s -> {
return Flux.fromIterable(convertToList(s)); // A -> List[A, newValue] , B -> List[B, newValue]
})//db or external service call that returns a flux -> s -> Flux<String>
.log();
StepVerifier.create(stringFlux)
.expectNextCount(12)
.verifyComplete();
}
private List<String> convertToList(String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(s, "newValue");
}
/**
* getting parallelism on flatmap;
*/
@Test
public void tranformUsingFlatMap_usingparallel(){
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("A","B","C","D","E","F")) // Flux<String>
.window(2) //Flux<Flux<String> -> (A,B), (C,D), (E,F)
.flatMap((s) ->
s.map(this::convertToList).subscribeOn(parallel())) // Flux<List<String>
.flatMap(s -> Flux.fromIterable(s)) //Flux<String>
.log();
StepVerifier.create(stringFlux)
.expectNextCount(12)
.verifyComplete();
}
/**
* flatMapSequential
* for using parallalism as well as maintaining the order of the data
*/
@Test
public void tranformUsingFlatMap_parallel_maintain_order(){
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("A","B","C","D","E","F")) // Flux<String>
.window(2) //Flux<Flux<String> -> (A,B), (C,D), (E,F)
/* .concatMap((s) ->
s.map(this::convertToList).`(parallel())) */// Flux<List<String>
.flatMapSequential((s) ->
s.map(this::convertToList).subscribeOn(parallel()))
.flatMap(s -> Flux.fromIterable(s)) //Flux<String>
.log();
StepVerifier.create(stringFlux)
.expectNextCount(12)
.verifyComplete();
}
}
package com.nisum.webflux.basicOperators;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.Duration;
/**
* Playing with flux with timestamp and delays;
*/
public class FluxAndMonoWithTimeTest {
/**
* uses
* to create infinite sequence of the flux;
* @throws InterruptedException
*/
@Test
public void infiniteSequence() throws InterruptedException {
Flux<Long> infiniteFlux = Flux.interval(Duration.ofMillis(100))
.log(); // starts from 0 --> ......
infiniteFlux
.subscribe((element) -> System.out.println("Value is : " + element));
Thread.sleep(3000);
}
/**
* use of flux with take method
* its like limit of the flux
* @throws InterruptedException
*/
@Test
public void infiniteSequenceTest() throws InterruptedException {
Flux<Long> finiteFlux = Flux.interval(Duration.ofMillis(100))
.take(3)
.log();
StepVerifier.create(finiteFlux)
.expectSubscription()
.expectNext(0L, 1L,2L)
.verifyComplete();
}
/**
* creating infinite sequence of flux and use of map and take operator
* @throws InterruptedException
*/
@Test
public void infiniteSequenceMap() throws InterruptedException {
Flux<Integer> finiteFlux = Flux.interval(Duration.ofMillis(100))
.map(l -> new Integer(l.intValue()))
.take(3)
.log();
StepVerifier.create(finiteFlux)
.expectSubscription()
.expectNext(0, 1,2)
.verifyComplete();
}
/**
* creating the flux with delay
*
* @throws InterruptedException
*/
@Test
public void infiniteSequenceMap_withDelay() throws InterruptedException {
Flux<Integer> finiteFlux = Flux.interval(Duration.ofMillis(100))
.delayElements(Duration.ofSeconds(1))
.map(l -> new Integer(l.intValue()))
.take(3)
.log();
StepVerifier.create(finiteFlux)
.expectSubscription()
.expectNext(0, 1,2)
.verifyComplete();
}
}
package com.nisum.webflux.controller;
import com.nisum.webflux.model.Address;
import com.nisum.webflux.model.City;
import com.nisum.webflux.model.Employee;
import com.nisum.webflux.model.Hobby;
import com.nisum.webflux.repository.EmployeeRepo;
import com.nisum.webflux.service.impl.TestConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Mockito.when;
@SpringBootTest
@RunWith(SpringRunner.class)
@AutoConfigureWebTestClient
@ContextConfiguration(classes = TestConfig.class)
/*
* Use of Mockito for component tests using webtestClient;
*/
public class EmployeeControllerComponentTest {
@Autowired
WebTestClient webTestClient;
@Autowired
EmployeeRepo empRepo;
@Before
public void setup() {
}
@Test
public void saveUsingWebTestClientAndMockito() {
ArgumentCaptor<Employee> requestCaptor = ArgumentCaptor.forClass(
Employee.class);
when(empRepo.save(requestCaptor.capture())).thenReturn(Mono.error(new RuntimeException()));
webTestClient.post().uri("/employee").contentType(MediaType.APPLICATION_JSON_UTF8).body(getMonEmployee(), Employee.class)
.exchange()
.expectStatus().isOk().expectBody().jsonPath("$.id").isEqualTo("2");
}
@Test
public void saveUsingWebTestClientAndMockitoAndStepVerifier() {
ArgumentCaptor<Employee> requestCaptor = ArgumentCaptor.forClass(
Employee.class);
when(empRepo.save(requestCaptor.capture())).thenReturn(Mono.error(new RuntimeException()));
Employee employees=webTestClient.post().uri("/employee").contentType(MediaType.APPLICATION_JSON_UTF8).body(getMonEmployee(), Employee.class)
.exchange()
.expectStatus().isOk().expectBody(Employee.class).returnResult().getResponseBody();
StepVerifier.create(Flux.just(employees)).expectSubscription()
.expectNextMatches(e-> e.getName().equals("servicename")
).verifyComplete();
}
@Test
public void getAll_UsingWebClientComponentTestCaseMockito() {
when(empRepo.findAll()).thenReturn(Flux.error(new RuntimeException()));
webTestClient.get().uri("/employee").exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBodyList(Employee.class)
.hasSize(1);
//hasSize(7);
}
@Test
public void getAll_UsingWebClientUsingStepVerifier() {
when(empRepo.findAll()).thenReturn(Flux.error(new RuntimeException()));
Flux<Employee> employeeFlux=webTestClient.get().uri("/employee").exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.returnResult(Employee.class)
.getResponseBody();
StepVerifier.create(employeeFlux.log()).expectSubscription()
.expectNextMatches(e->{
return e.getName().equals("servicename");
}).verifyComplete();
}
private Flux<Employee> getMockEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
hobbyObj.add(new Hobby("1","cricket"));
List<City> cities=Arrays.asList(new City("1","patna"));
List<Address> address =Arrays.asList(new Address("1","bhr",cities));
return Flux.just(new Employee("1","abc",address,hobbyObj));
}
private Mono<Employee> getMonEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
hobbyObj.add(new Hobby("1","cricket"));
List<City> cities=Arrays.asList(new City("1","patna"));
List<Address> address =Arrays.asList(new Address("1","bhr",cities));
return Mono.just(new Employee("1","testName",address,hobbyObj));
}
}
package com.nisum.webflux.controller;
import com.nisum.webflux.model.Address;
import com.nisum.webflux.model.City;
import com.nisum.webflux.model.Employee;
import com.nisum.webflux.model.Hobby;
import com.nisum.webflux.service.IEmployeeService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@SpringBootTest
@RunWith(SpringRunner.class)
@AutoConfigureWebTestClient
public class EmployeeControllerTest {
@Autowired
WebTestClient webTestClient;
Please register or sign in to reply
//// @MockBean
// @Autowired
// EmployeeRepo empRepo;
@InjectMocks
EmployeeController employeeController;
Please register or sign in to reply
@Mock
private IEmployeeService employeeService;
@Before
public void setup() {
}
@Test
public void save_UsingWebTestClient() {
webTestClient.post().uri("/employee").contentType(MediaType.APPLICATION_JSON).body(Mono.just(getMockEmployee().blockFirst()), Employee.class)
.exchange()
.expectStatus().isOk().expectBody().jsonPath("$.id").isEqualTo("1");
// .jsonPath("$.description").isEqualTo("xx").jsonPath("$.price").isEqualTo(34.4);
}
@Test
public void getAll_UsingWebClientComponentTestCase() {
webTestClient.get().uri("/employee").exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBodyList(Employee.class)
.hasSize(1);
}
@Test
public void getAll_UsingWebClientUsingStepVerifier() {
Flux<Employee> employeeFlux=webTestClient.get().uri("/employee").exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.returnResult(Employee.class)
.getResponseBody();
StepVerifier.create(employeeFlux.log()).expectSubscription()
.expectNextCount(1).verifyComplete();
}
@Test
public void getAll_UsingMockito(){
Flux<Employee> employeeFlux=getMockEmployee();
when(employeeService.getAllEmployee()).thenReturn(employeeFlux);
assertEquals(employeeController.getAll(),employeeFlux);
}
private Flux<Employee> getMockEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
hobbyObj.add(new Hobby("1","cricket"));
List<City> cities=Arrays.asList(new City("1","patna"));
List<Address> address =Arrays.asList(new Address("1","bhr",cities));
return Flux.just(new Employee("1","abc",address,hobbyObj));
}
}
package com.nisum.webflux.controller;
import com.nisum.webflux.model.Fruit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.BodyInserters;
@SpringBootTest
@RunWith(SpringRunner.class)
@AutoConfigureWebTestClient
//@ContextConfiguration(classes = TestConfig.class)
public class FruitControllerComponentTest {
@Autowired
WebTestClient webTestClient;
Please register or sign in to reply
@Test
public void save_UsingWebTestClient() {
Fruit fruit=new Fruit("1","apple","300");
webTestClient.post().uri("/fruit").contentType(MediaType.APPLICATION_JSON_UTF8).body(BodyInserters.fromObject(fruit))
.exchange()
.expectStatus().isOk().expectBody().jsonPath("$.id").isEqualTo("1");
}
}
package com.nisum.webflux.service.impl;
import com.nisum.webflux.model.Address;
import com.nisum.webflux.model.City;
import com.nisum.webflux.model.Employee;
import com.nisum.webflux.model.Hobby;
import com.nisum.webflux.repository.EmployeeRepo;
import com.nisum.webflux.service.impl.EmployeeService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Mockito.when;
@SpringBootTest
@RunWith(SpringRunner.class)
public class EmployeeServiceTest {
@Mock
EmployeeRepo empRepo;
@InjectMocks
EmployeeService employeeService;
@Test
public void getAllEmployeeTestUsingStepVerifier() {
Flux<Employee> empFlux=getFluxEmployee();
when(empRepo.findAll()).thenReturn(empFlux);
StepVerifier.create(employeeService.getAllEmployee()).expectSubscription()
.expectNextMatches(e->e.equals(empFlux.blockFirst()))
.verifyComplete();
}
@Test
public void saveUsingStepVerifier() {
Mono<Employee> empMono=getMonoEmployee();
Employee e=new Employee();
when(empRepo.save(e)).thenReturn(empMono);
StepVerifier.create(employeeService.save(e))
.expectNextCount(1)
.verifyComplete();
}
@Test
public void saveSVConsumeNextWith() {
Mono<Employee> empMono=getMonoEmployee();
Employee e=new Employee();
when(empRepo.save(e)).thenReturn(empMono);
StepVerifier.create(employeeService.save(e))
.consumeNextWith(res->res.equals(empMono.blockOptional().get())).verifyComplete();
}
private Mono<Employee> getMonoEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
hobbyObj.add(new Hobby("1","cricket"));
List<City> cities=Arrays.asList(new City("1","patna"));
List<Address> address =Arrays.asList(new Address("1","bhr",cities));
return Mono.just(new Employee("1","abc",address,hobbyObj));
}
private Flux<Employee> getFluxEmployee() {
List<Hobby> hobbyObj=new ArrayList<>();
hobbyObj.add(new Hobby("1","cricket"));
List<City> cities=Arrays.asList(new City("1","patna"));
List<Address> address =Arrays.asList(new Address("1","bhr",cities));
return Flux.just(new Employee("1","abc",address,hobbyObj));
}
}
package com.nisum.webflux.service.impl;
import com.nisum.webflux.repository.EmployeeRepo;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
@TestConfiguration
public class TestConfig {
@MockBean
EmployeeRepo empRepo;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment