Commit e4d83a56 authored by mrekkala's avatar mrekkala

webflux and kafka producer code changes

parents
/mvnw text eol=lf
*.cmd text eol=crlf
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.zip
This diff is collapsed.
<# : batch portion
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Apache Maven Wrapper startup batch script, version 3.3.2
@REM
@REM Optional ENV vars
@REM MVNW_REPOURL - repo url base for downloading maven distribution
@REM MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven
@REM MVNW_VERBOSE - true: enable verbose log; others: silence the output
@REM ----------------------------------------------------------------------------
@REM This file is a cmd/PowerShell polyglot: the "<# : batch portion" header makes
@REM everything up to "end batch / begin powershell #>" invisible to PowerShell.
@REM Remember this script's file name so the PowerShell half can derive the
@REM launcher name from it (mvnw.cmd -> mvn.cmd).
@IF "%__MVNW_ARG0_NAME__%"=="" (SET __MVNW_ARG0_NAME__=%~nx0)
@SET __MVNW_CMD__=
@SET __MVNW_ERROR__=
@REM Save and clear PSModulePath while the embedded PowerShell runs (restored
@REM below) - presumably to keep user-installed modules from interfering; TODO confirm.
@SET __MVNW_PSMODULEP_SAVE=%PSModulePath%
@SET PSModulePath=
@REM Run the PowerShell half of this very file. On success it prints a line of
@REM the form "MVN_CMD=<path>"; every other output line is echoed through.
@FOR /F "usebackq tokens=1* delims==" %%A IN (`powershell -noprofile "& {$scriptDir='%~dp0'; $script='%__MVNW_ARG0_NAME__%'; icm -ScriptBlock ([Scriptblock]::Create((Get-Content -Raw '%~f0'))) -NoNewScope}"`) DO @(
IF "%%A"=="MVN_CMD" (set __MVNW_CMD__=%%B) ELSE IF "%%B"=="" (echo %%A) ELSE (echo %%A=%%B)
)
@SET PSModulePath=%__MVNW_PSMODULEP_SAVE%
@SET __MVNW_PSMODULEP_SAVE=
@SET __MVNW_ARG0_NAME__=
@REM Scrub download credentials from the environment before launching Maven.
@SET MVNW_USERNAME=
@SET MVNW_PASSWORD=
@REM Launch the resolved Maven command with all original arguments; the error
@REM line below is only reached when no command was resolved.
@IF NOT "%__MVNW_CMD__%"=="" (%__MVNW_CMD__% %*)
@echo Cannot start maven from wrapper >&2 && exit /b 1
@GOTO :EOF
: end batch / begin powershell #>
# --- PowerShell half of the wrapper (invoked by the batch half above) ---
# $scriptDir and $script are injected by the batch invocation.
$ErrorActionPreference = "Stop"
if ($env:MVNW_VERBOSE -eq "true") {
  $VerbosePreference = "Continue"
}
# calculate distributionUrl, requires .mvn/wrapper/maven-wrapper.properties
$distributionUrl = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionUrl
if (!$distributionUrl) {
  Write-Error "cannot read distributionUrl property in $scriptDir/.mvn/wrapper/maven-wrapper.properties"
}
# Decide between plain Maven and the Maven Daemon (mvnd) by looking at the file
# name at the end of distributionUrl.
switch -wildcard -casesensitive ( $($distributionUrl -replace '^.*/','') ) {
  "maven-mvnd-*" {
    # mvnd: swap the generic "-bin.*" suffix for the Windows amd64 archive
    $USE_MVND = $true
    $distributionUrl = $distributionUrl -replace '-bin\.[^.]*$',"-windows-amd64.zip"
    $MVN_CMD = "mvnd.cmd"
    break
  }
  default {
    # plain Maven: derive launcher name from the script name (mvnw* -> mvn*)
    $USE_MVND = $false
    $MVN_CMD = $script -replace '^mvnw','mvn'
    break
  }
}
# apply MVNW_REPOURL and calculate MAVEN_HOME
# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-<version>,maven-mvnd-<version>-<platform>}/<hash>
if ($env:MVNW_REPOURL) {
  # FIX: the two repo path patterns were swapped (mvnd got "/org/apache/maven/"
  # and plain Maven got "/maven/mvnd/"). The pattern must match the path segment
  # actually present in distributionUrl — mirroring the mvnw shell script:
  # maven-mvnd-* distributions live under /maven/mvnd/, apache-maven under
  # /org/apache/maven/. With the inverted pattern the -replace below never
  # matched, producing a malformed mirror URL.
  $MVNW_REPO_PATTERN = if ($USE_MVND) { "/maven/mvnd/" } else { "/org/apache/maven/" }
  $distributionUrl = "$env:MVNW_REPOURL$MVNW_REPO_PATTERN$($distributionUrl -replace '^.*'+$MVNW_REPO_PATTERN,'')"
}
# zip file name and its base name without extension / trailing "-bin"
$distributionUrlName = $distributionUrl -replace '^.*/',''
$distributionUrlNameMain = $distributionUrlName -replace '\.[^.]*$','' -replace '-bin$',''
# install dir: <dists>/<name-main>/<md5-of-full-url>
$MAVEN_HOME_PARENT = "$HOME/.m2/wrapper/dists/$distributionUrlNameMain"
if ($env:MAVEN_USER_HOME) {
  $MAVEN_HOME_PARENT = "$env:MAVEN_USER_HOME/wrapper/dists/$distributionUrlNameMain"
}
$MAVEN_HOME_NAME = ([System.Security.Cryptography.MD5]::Create().ComputeHash([byte[]][char[]]$distributionUrl) | ForEach-Object {$_.ToString("x2")}) -join ''
$MAVEN_HOME = "$MAVEN_HOME_PARENT/$MAVEN_HOME_NAME"
# fast path: the distribution is already installed — hand its launcher back to cmd
if (Test-Path -Path "$MAVEN_HOME" -PathType Container) {
  Write-Verbose "found existing MAVEN_HOME at $MAVEN_HOME"
  Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
  exit $?
}
if (! $distributionUrlNameMain -or ($distributionUrlName -eq $distributionUrlNameMain)) {
  Write-Error "distributionUrl is not valid, must end with *-bin.zip, but found $distributionUrl"
}
# prepare tmp dir
# (a temporary *file* is created first, then a sibling "<file>.dir" directory,
# and the holder file is deleted — gives a unique, race-free directory name)
$TMP_DOWNLOAD_DIR_HOLDER = New-TemporaryFile
$TMP_DOWNLOAD_DIR = New-Item -Itemtype Directory -Path "$TMP_DOWNLOAD_DIR_HOLDER.dir"
$TMP_DOWNLOAD_DIR_HOLDER.Delete() | Out-Null
# best-effort cleanup of the download dir if anything below throws
trap {
  if ($TMP_DOWNLOAD_DIR.Exists) {
    try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
    catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
  }
}
New-Item -Itemtype Directory -Path "$MAVEN_HOME_PARENT" -Force | Out-Null
# Download and Install Apache Maven
Write-Verbose "Couldn't find MAVEN_HOME, downloading and installing it ..."
Write-Verbose "Downloading from: $distributionUrl"
Write-Verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName"
$webclient = New-Object System.Net.WebClient
if ($env:MVNW_USERNAME -and $env:MVNW_PASSWORD) {
  $webclient.Credentials = New-Object System.Net.NetworkCredential($env:MVNW_USERNAME, $env:MVNW_PASSWORD)
}
# force TLS 1.2 — older defaults may fail against repo.maven.apache.org
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
$webclient.DownloadFile($distributionUrl, "$TMP_DOWNLOAD_DIR/$distributionUrlName") | Out-Null
# If specified, validate the SHA-256 sum of the Maven distribution zip file
$distributionSha256Sum = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionSha256Sum
if ($distributionSha256Sum) {
  if ($USE_MVND) {
    Write-Error "Checksum validation is not supported for maven-mvnd. `nPlease disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties."
  }
  # import explicitly — PSModulePath was cleared by the batch half
  Import-Module $PSHOME\Modules\Microsoft.PowerShell.Utility -Function Get-FileHash
  if ((Get-FileHash "$TMP_DOWNLOAD_DIR/$distributionUrlName" -Algorithm SHA256).Hash.ToLower() -ne $distributionSha256Sum) {
    Write-Error "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised. If you updated your Maven version, you need to update the specified distributionSha256Sum property."
  }
}
# unzip and move
# (rename inside tmp first, so the final Move-Item into the shared parent is atomic-ish)
Expand-Archive "$TMP_DOWNLOAD_DIR/$distributionUrlName" -DestinationPath "$TMP_DOWNLOAD_DIR" | Out-Null
Rename-Item -Path "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" -NewName $MAVEN_HOME_NAME | Out-Null
try {
  Move-Item -Path "$TMP_DOWNLOAD_DIR/$MAVEN_HOME_NAME" -Destination $MAVEN_HOME_PARENT | Out-Null
} catch {
  # a concurrent wrapper may have installed it first — only fail if it's truly absent
  if (! (Test-Path -Path "$MAVEN_HOME" -PathType Container)) {
    Write-Error "fail to move MAVEN_HOME"
  }
} finally {
  try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
  catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
# hand the resolved launcher path back to the batch half
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the webflux-poc demo: Spring Boot 3.3.x, WebFlux,
     reactive MongoDB and a Kafka producer, with Lombok for boilerplate. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.3.8</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.nisum</groupId>
	<artifactId>webflux-poc</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>webflux-poc</name>
	<description>Demo project for Webflux</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<!-- NOTE(review): likely redundant — spring-boot-starter-data-mongodb-reactive
		     above already brings in spring-data-mongodb transitively; verify and drop. -->
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-mongodb</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- NOTE(review): spring-boot-starter-logging is already included by every
		     Spring Boot starter; this explicit entry appears redundant — confirm. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<!-- register Lombok as an annotation processor so its
					     generated accessors exist at compile time -->
					<annotationProcessorPaths>
						<path>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</path>
					</annotationProcessorPaths>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<!-- Lombok is compile-time only; keep it out of the fat jar -->
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package com.nisum.webflux_poc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point for the webflux-poc demo application; boots the Spring context
 * (component scan rooted at com.nisum.webflux_poc) and the embedded server.
 */
@SpringBootApplication
public class WebfluxPocApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebfluxPocApplication.class, args);
    }
}
package com.nisum.webflux_poc.config;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Holds a {@link KafkaTemplate} but currently exposes no behavior.
 *
 * NOTE(review): this component duplicates the wiring already done in
 * {@code KafkaProducerService} and defines no methods; the Producer/
 * ProducerConfig/StringSerializer/Bean imports above are unused. Consider
 * removing this class or moving real producer configuration here.
 */
@Component
public class KafkaProducer {

    // injected via constructor; never read by any method in this class
    private final KafkaTemplate<String , String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
package com.nisum.webflux_poc.controller;
import reactor.core.publisher.Flux;
/**
 * Standalone demo of the onBackpressureBuffer operator on a 1000-element range.
 */
public class BackpressureExample {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.range(1, 1000) // A stream of 1000 integers
                .log(); // Logs the processing of each item
        // onBackpressureBuffer caps the number of not-yet-consumed items at 100
        // and runs the overflow callback for items that don't fit. NOTE(review):
        // the subscriber below is synchronous and requests unbounded demand, so
        // the buffer never fills here and the drop callback is not expected to
        // fire — this only demonstrates the operator's wiring, not actual drops.
        flux
                .onBackpressureBuffer(100, // Buffer size
                        (i) -> System.out.println("Dropped item: " + i)) // invoked for items that overflow the buffer
                .subscribe(System.out::println, // Consumer processes the data
                        Throwable::printStackTrace,
                        () -> System.out.println("Completed"));
    }
}
package com.nisum.webflux_poc.controller;
import com.nisum.webflux_poc.dto.EmployeeDto;
import com.nisum.webflux_poc.service.KafkaProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.nisum.webflux_poc.service.EmployeeService;
/**
 * REST endpoints for employee CRUD, an aggregate salary query, and a
 * fire-and-forget Kafka publish. All handlers delegate to
 * {@link EmployeeService} / {@link KafkaProducerService}.
 */
@RestController
@RequestMapping("/employee")
public class EmployeeController {

    private static final Logger log = LoggerFactory.getLogger(EmployeeController.class);

    @Autowired
    EmployeeService employeeService;

    @Autowired
    KafkaProducerService kafkaProducerService;

    /**
     * Persists a new employee.
     *
     * @param employeeDto payload to save
     * @return 200 with the saved dto, 400 when the pipeline completes empty,
     *         500 when it errors
     */
    @PostMapping("/save")
    public Mono<ResponseEntity<EmployeeDto>> saveEmployee(@RequestBody EmployeeDto employeeDto) {
        log.info("Received request to save employee: {}", employeeDto);
        return employeeService.saveEmployeeData(Mono.just(employeeDto))
                .map(savedEmployee -> ResponseEntity.ok(savedEmployee))
                .defaultIfEmpty(ResponseEntity.badRequest().build())
                .onErrorResume(e -> {
                    log.error("Failed to save employee: {}", e.getMessage());
                    return Mono.just(ResponseEntity.status(500).build());
                });
    }

    /** Streams all employees. */
    @GetMapping
    public ResponseEntity<Flux<EmployeeDto>> getEmployee() {
        log.info("getEmployee");
        return ResponseEntity.ok(employeeService.getAllEmployees());
    }

    /** Updates the employee identified by the path id with the request body. */
    @PutMapping("/update/{id}")
    public ResponseEntity<Mono<EmployeeDto>> updateEmployee(@RequestBody EmployeeDto employeeDto, @PathVariable String id) {
        // FIX: the original statement logged a literal "{}, id" — the placeholder
        // had no argument; pass the id so it actually appears in the log.
        log.info("updateEmployee controller is called with id: {}", id);
        return ResponseEntity.ok(employeeService.updateEmployee(Mono.just(employeeDto), id));
    }

    /** Deletes the employee with the given id. */
    @DeleteMapping("/delete/{id}")
    public ResponseEntity<Mono<Void>> deleteEmployee(@PathVariable String id) {
        return ResponseEntity.ok(employeeService.deleteEmployee(id));
    }

    /** Fetches a single employee by id. */
    @GetMapping("/{id}")
    public ResponseEntity<Mono<EmployeeDto>> getEmployee(@PathVariable String id) {
        return ResponseEntity.ok(employeeService.getEmployee(id));
    }

    /** Returns the average salary across all employees. */
    @GetMapping("/average/salary")
    public ResponseEntity<Flux<Double>> getAverageSalary() {
        return ResponseEntity.ok(employeeService.getAverageSalary());
    }

    /**
     * Publishes the id to the Kafka topic and echoes the employee back.
     * FIX: this handler previously returned {@code null}, which makes WebFlux
     * fail the request with an NPE even though the Kafka send succeeded.
     */
    @GetMapping("/kafka/send/{id}")
    public ResponseEntity<Mono<EmployeeDto>> sendDataToKafkaTopic(@PathVariable String id) {
        kafkaProducerService.sendMessage(id);
        return ResponseEntity.ok(employeeService.getEmployee(id));
    }
}
package com.nisum.webflux_poc.controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
/**
 * Demo service exercising core Project Reactor operators (map, flatMap,
 * concatMap, transform, switchIfEmpty, concat/merge/zip). Exercised by
 * FluxAndMonoGeneratorServiceTest.
 */
public class FluxAndMOnoGeneratorService {

    /** Emits "Manasa" then "Nisum". */
    public Flux<String> namesFlux() {
        return Flux.fromIterable(List.of("Manasa", "Nisum"))
                .log();
    }

    /** Emits the single value "Manasa". */
    public Mono<String> nameMono() {
        return Mono.just("Manasa")
                .log();
    }

    /** Upper-cases "name" and "manasa". */
    public Flux<String> namesFluxMap() {
        return Flux.just("name", "manasa")
                .map(String::toUpperCase)
                .log();
    }

    /**
     * Demonstrates publisher immutability: the result of map() is discarded
     * (operators return new publishers), so subscribers of the returned flux
     * still receive the original lower-case values.
     */
    public Flux<String> namesFluxMapImmutable() {
        var namesFlux = Flux.fromIterable(List.of("name", "manasa"));
        namesFlux.map(String::toUpperCase)
                .log();
        return namesFlux;
    }

    /**
     * Upper-cases names longer than {@code length} and flattens each into its
     * individual letters.
     */
    public Flux<String> namesFluxFlatMap(int length) {
        return Flux.fromIterable(List.of("Manasa", "Nisum"))
                .map(String::toUpperCase)
                .filter(s -> s.length() > length)
                .flatMap(s -> splitString(s))
                .log();
    }

    /**
     * Same as {@link #namesFluxFlatMap(int)} but each letter is delayed, so
     * flatMap's eager subscription means ordering is not guaranteed.
     */
    public Flux<String> namesFluxFlatMapAsync(int length) {
        return Flux.fromIterable(List.of("Manasa", "Nisum"))
                .map(String::toUpperCase)
                .filter(s -> s.length() > length)
                .flatMap(s -> splitStringWithDelay(s))
                .log();
    }

    /** Like the async variant, but concatMap preserves upstream order. */
    public Flux<String> namesFluxConcatMap(int length) {
        return Flux.fromIterable(List.of("Manasa", "Nisum"))
                .map(String::toUpperCase)
                .filter(s -> s.length() > length)
                .concatMap(s -> splitStringWithDelay(s))
                .log();
    }

    /** Maps "Manasa" (when longer than {@code length}) to the list of its upper-case letters. */
    public Mono<List<String>> namesMonoFlatMap(int length) {
        return Mono.just("Manasa")
                .map(String::toUpperCase)
                .filter(s -> s.length() > length)
                .flatMap(this::splitStringMono)
                .log();
    }

    /** Applies the upper-case + length filter via transform(), then splits into letters. */
    public Flux<String> namesFluxTransform(int length) {
        Function<Flux<String>, Flux<String>> filtermap = name -> name.map(String::toUpperCase)
                .filter(s -> s.length() > length);
        return Flux.fromIterable(List.of("Manasa", "Nisum"))
                .transform(filtermap)
                .flatMap(s -> splitString(s))
                .log();
    }

    /**
     * Runs an empty-ish source ("") through the filter pipeline and falls back
     * to the letters of "DEFAULT" via switchIfEmpty.
     */
    public Flux<String> namesFluxTransformSwitchIfEmpty(int length) {
        Function<Flux<String>, Flux<String>> filtermap = name ->
                name.map(String::toUpperCase)
                        .filter(s -> s.length() > length)
                        .flatMap(s -> splitString(s));
        var defaultflux = Flux.just("default")
                .transform(filtermap);
        return Flux.fromIterable(List.of(""))
                .transform(filtermap)
                .switchIfEmpty(defaultflux)
                .log();
    }

    /** Static concat: A,B,C then D,E,F. */
    public Flux<String> exploreConcat() {
        var data = Flux.just("A", "B", "C");
        var data1 = Flux.just("D", "E", "F");
        return Flux.concat(data, data1).log();
    }

    /** Instance concatWith on two Monos: A then D. */
    public Flux<String> exploreConcatWithMono() {
        var data = Mono.just("A");
        var data1 = Mono.just("D");
        return data.concatWith(data1).log();
    }

    /** merge() interleaves by arrival time, which the element delays make visible. */
    public Flux<String> exploreMerge() {
        var data = Flux.just("A", "B", "C")
                .delayElements(Duration.ofMillis(100));
        var data1 = Flux.just("D", "E", "F")
                .delayElements(Duration.ofMillis(125));
        return Flux.merge(data, data1).log();
    }

    /** mergeWith between a Mono and a Flux: A, D. */
    public Flux<String> exploreMergeWithMono() {
        var data = Mono.just("A");
        var data1 = Flux.just("D");
        return data.mergeWith(data1).log();
    }

    /** mergeSequential subscribes eagerly but still emits the sources in order. */
    public Flux<String> exploreMergeSequential() {
        var data = Flux.just("A", "B", "C")
                .delayElements(Duration.ofMillis(100));
        var data1 = Flux.just("D", "E", "F")
                .delayElements(Duration.ofMillis(125));
        return Flux.mergeSequential(data, data1).log();
    }

    /** Pairwise zip with a combiner: AD, BE, CF. */
    public Flux<String> exploreZip() {
        var data = Flux.just("A", "B", "C");
        var data1 = Flux.just("D", "E", "F");
        return Flux.zip(data, data1, (a, b) -> a + b)
                .log(); //AD BE CF
    }

    /** Four-way zip combined through the Tuple4 accessors: AD14, BE25, CF36. */
    public Flux<String> exploreZip1() {
        var data = Flux.just("A", "B", "C");
        var data1 = Flux.just("D", "E", "F");
        var data2 = Flux.just("1", "2", "3");
        var data3 = Flux.just("4", "5", "6");
        return Flux.zip(data, data1, data2, data3)
                .map(t4 -> t4.getT1() + t4.getT2() + t4.getT3() + t4.getT4()).log();
    }

    /** Instance zipWith variant of {@link #exploreZip()}. */
    public Flux<String> exploreZipWith() {
        var data = Flux.just("A", "B", "C");
        var data1 = Flux.just("D", "E", "F");
        return data.zipWith(data1, (a, b) -> a + b).log(); //AD BE CF
    }

    /** Mono -> Flux of letters via flatMapMany. */
    public Flux<String> namesMonoFlatMapMany(int length) {
        return Mono.just("Manasa")
                .map(String::toUpperCase)
                .filter(s -> s.length() > length)
                .flatMapMany(this::splitString)
                .log();
    }

    /**
     * Upper-cases "name" and keeps it only when longer than {@code length}.
     * FIX: the filter previously hard-coded {@code > 4}, silently ignoring the
     * {@code length} parameter this method accepts.
     */
    public Mono<String> namesMonoMap(int length) {
        return Mono.just("name")
                .map(String::toUpperCase)
                .filter(s -> s.length() > length)
                .log();
    }

    /** Splits a name into a Mono wrapping the list of its single-character strings. */
    public Mono<List<String>> splitStringMono(String name) {
        var charArray = name.split("");
        var charList = List.of(charArray);
        return Mono.just(charList);
    }

    /** Splits a name into a Flux of single-character strings. */
    public Flux<String> splitString(String name) {
        var charArray = name.split("");
        return Flux.fromArray(charArray);
    }

    /** Splits a name into letters, delaying each emission by one second. */
    public Flux<String> splitStringWithDelay(String name) {
        var charArray = name.split("");
        var delay = 1000; // fixed 1s per element (was once randomized)
        return Flux.fromArray(charArray)
                .delayElements(Duration.ofMillis(delay));
    }

    /** Manual smoke-run of a few publishers, printing each emitted value. */
    public static void main(String[] args) {
        FluxAndMOnoGeneratorService fluxAndMOnoGeneratorService = new FluxAndMOnoGeneratorService();
        fluxAndMOnoGeneratorService.namesFlux()
                .subscribe(name -> System.out.println(name));
        fluxAndMOnoGeneratorService.nameMono()
                .subscribe(name -> System.out.println(name));
        fluxAndMOnoGeneratorService.namesFluxMap()
                .subscribe(name -> System.out.println(name));
        fluxAndMOnoGeneratorService.namesFluxMapImmutable()
                .subscribe(name -> System.out.println(name));
    }
}
package com.nisum.webflux_poc.controller;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
 * Minimal reactive endpoints: a finite flux of three integers and an
 * infinite once-per-second counter rendered as server-sent events.
 */
@RestController
public class FluxAndMonoController {

    /** Emits 1, 2, 3 and completes; log() traces the reactive signals. */
    @GetMapping("/flux")
    public Flux<Integer> getFluxData() {
        Flux<Integer> data = Flux.just(1, 2, 3);
        return data.log();
    }

    /** Infinite SSE stream ticking every second (0, 1, 2, ...). */
    @GetMapping(value = "/stream" ,produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Long> stream() {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        return ticks.log();
    }
}
package com.nisum.webflux_poc.controller;
public class Test {
public static void main(String[] args){
FluxAndMOnoGeneratorService fluxAndMOnoGeneratorService=new FluxAndMOnoGeneratorService();
fluxAndMOnoGeneratorService.namesFlux();
fluxAndMOnoGeneratorService.nameMono();
}
}
package com.nisum.webflux_poc.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;
/**
 * Transport object for employee data exposed over the REST API.
 *
 * FIX: removed the redundant {@code @Getter}/{@code @Setter} annotations
 * (already implied by {@code @Data}) and the hand-written getters/setters,
 * which duplicated the Lombok-generated accessors with identical signatures.
 * The explicit toString is kept so the exact "EmployeeDto{...}" format is
 * preserved (Lombok skips generation when the method already exists).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class EmployeeDto {

    @JsonProperty("id")
    private String id;

    @JsonProperty("name")
    private String name;

    @JsonProperty("salary")
    private double salary;

    @Override
    public String toString() {
        return "EmployeeDto{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
package com.nisum.webflux_poc.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Transport object for product data. Accessors, equals/hashCode and
 * toString are generated by Lombok's @Data; constructors by
 * @AllArgsConstructor/@NoArgsConstructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProductDto {
    // product identifier
    private String id;
    // display name
    private String name;
    // quantity on hand
    private int qty;
    // unit price
    private double price;
}
package com.nisum.webflux_poc.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
/**
 * MongoDB document mapped to the "employee" collection.
 *
 * FIX: removed the hand-written getters/setters — Lombok's {@code @Data}
 * already generates accessors with identical signatures, so the manual
 * copies were pure duplication.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection ="employee")
public class Employee {

    // Mongo document id (also serialized as "id" in JSON)
    @Id
    @JsonProperty("id")
    private String id;

    @JsonProperty("name")
    private String name;

    @JsonProperty("salary")
    private double salary;
}
package com.nisum.webflux_poc.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
/**
 * MongoDB document mapped to the "products" collection. Accessors and
 * constructors are generated by Lombok.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "products")
public class Product {
    // Mongo document id
    @Id
    private String id;
    private String name;
    private int qty;
    private double price;
}
package com.nisum.webflux_poc.model;
/**
 * Simple holder for the "averageSalary" value produced by a salary
 * aggregation query.
 *
 * NOTE(review): EmployeeService currently maps its aggregation results to
 * Map rather than this type — confirm whether this class is still needed.
 */
public class SalaryAggregation {

    // computed average salary; defaults to 0.0 until set
    private double averageSalary;

    /** Stores the computed average salary. */
    public void setAverageSalary(double value) {
        this.averageSalary = value;
    }

    /** Returns the computed average salary (0.0 when never set). */
    public double getAverageSalary() {
        return averageSalary;
    }
}
package com.nisum.webflux_poc.repository;
import com.nisum.webflux_poc.model.Employee;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
/**
 * Reactive CRUD repository for {@link Employee} documents keyed by their
 * String id; all operations are inherited from ReactiveMongoRepository.
 */
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {
}
package com.nisum.webflux_poc.service;
import com.nisum.webflux_poc.model.Employee;
import com.nisum.webflux_poc.model.SalaryAggregation;
import com.nisum.webflux_poc.utills.AppUtils;
import com.nisum.webflux_poc.dto.EmployeeDto;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.ReactiveAggregationOperation;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.nisum.webflux_poc.repository.EmployeeRepository;
import org.springframework.stereotype.Service;
import java.util.Map;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.group;
/**
 * Business logic for employee CRUD plus an average-salary aggregation,
 * backed by a reactive Mongo repository and ReactiveMongoTemplate.
 */
@Service
public class EmployeeService{

    @Autowired
    private EmployeeRepository employeeRepository;

    @Autowired
    ReactiveMongoTemplate mongoTemplate;

    private static final Logger log = LogManager.getLogger(EmployeeService.class);

    /** Inserts a new employee and maps the stored entity back to a dto. */
    public Mono<EmployeeDto> saveEmployeeData(Mono<EmployeeDto> employeeMono) {
        return employeeMono.map(AppUtils::dtoToentity)
                .flatMap(employeeRepository::insert)
                .map(AppUtils::entityToDto)
                .doOnNext(savedData -> log.info("Data saved successfully: {}", savedData))
                .doOnError(error -> log.error("Error occured while saving employee information: {}", error.getMessage()));
    }

    /** Streams every employee in the collection as dtos. */
    public Flux<EmployeeDto> getAllEmployees() {
        return employeeRepository.findAll()
                .map(AppUtils::entityToDto)
                .doOnNext(data -> log.info("data from database: {}", data))
                .doOnError(error -> log.error("Failed to fetch data from database: {}", error.getMessage()));
    }

    /**
     * Replaces the employee identified by {@code id} with the incoming dto.
     * Completes empty when no such employee exists (findById emits nothing).
     */
    public Mono<EmployeeDto> updateEmployee(Mono<EmployeeDto> employeeDtoMono, String id) {
        log.info("service method called ...");
        return employeeRepository.findById(id)
                .flatMap(existing -> employeeDtoMono.map(AppUtils::dtoToentity)
                        // FIX: was e.setName(e.getName()) — a no-op. Carry the path id
                        // onto the incoming entity so save() updates the existing
                        // document instead of inserting a new one when the body omits id.
                        .doOnNext(e -> e.setId(id)))
                .flatMap(employeeRepository::save)
                .map(AppUtils::entityToDto)
                .doOnNext(data -> log.info("updated data in database for id: {} with data {}", id, data))
                .doOnError(error -> log.error("Failed to update data in database: {}", error.getMessage()));
    }

    /** Deletes the employee with the given id (completes even if absent). */
    public Mono<Void> deleteEmployee(String id) {
        log.info("delete data with id: {}", id);
        return employeeRepository.deleteById(id);
    }

    /** Fetches one employee by id; empty Mono when not found. */
    public Mono<EmployeeDto> getEmployee(String id) {
        return employeeRepository.findById(id)
                .map(AppUtils::entityToDto)
                .doOnNext(data -> log.info("Fetch data from database for id: {} with data {}", id, data))
                // FIX: the message had one placeholder but two arguments, so the
                // error detail was silently dropped.
                .doOnError(error -> log.error("Failed to fetch data from database with id: {}: {}", id, error.getMessage()));
    }

    /** Averages the salary field across the whole "employee" collection (0.0 when empty). */
    public Flux<Double> getAverageSalary() {
        Aggregation aggregation = Aggregation.newAggregation(
                group().avg("salary").as("averageSalary")
        );
        return mongoTemplate.aggregate(aggregation, "employee", Map.class)
                .map(result -> (Double) result.get("averageSalary"))
                .defaultIfEmpty(0.0);
    }

    /**
     * Looks up the employee destined for the Kafka topic.
     * FIX: previously returned {@code null}, which breaks every caller of a
     * reactive API; return the lookup Mono instead.
     */
    public Mono<EmployeeDto> sendDataToKafkaTopic(String id) {
        return getEmployee(id);
    }
}
package com.nisum.webflux_poc.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * Publishes messages to the Kafka topic configured under
 * spring.kafka.template.default-topic.
 */
@Service
public class KafkaProducerService {

    // topic name injected from application.yml (default-topic: test)
    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Fire-and-forget publish: the CompletableFuture returned by send() is
    // ignored, so delivery failures are not observed here. NOTE(review): this
    // is not reactive — consider returning/adapting the send result if callers
    // need delivery confirmation.
    public void sendMessage(String message) {
        kafkaTemplate.send(topic, message);
    }
}
package com.nisum.webflux_poc.utills;
import com.nisum.webflux_poc.model.Employee;
import com.nisum.webflux_poc.dto.EmployeeDto;
import org.springframework.beans.BeanUtils;
/**
 * Static mapping helpers between the {@link Employee} Mongo document and its
 * {@link EmployeeDto} API representation. Both directions rely on Spring's
 * BeanUtils to copy the matching bean properties.
 */
public class AppUtils {

    /** Copies all matching properties of an entity into a fresh dto. */
    public static EmployeeDto entityToDto(Employee employee) {
        EmployeeDto dto = new EmployeeDto();
        BeanUtils.copyProperties(employee, dto);
        return dto;
    }

    /** Copies all matching properties of a dto into a fresh entity. */
    public static Employee dtoToentity(EmployeeDto employeeDto) {
        Employee entity = new Employee();
        BeanUtils.copyProperties(employeeDto, entity);
        return entity;
    }
}
# Application configuration: reactive MongoDB connection, Kafka producer
# settings, and the HTTP port.
spring:
  data:
    mongodb:
      database: Test
      host: localhost
      port: 27017
  kafka:
    producer:
      # three-broker local cluster
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      # topic used by KafkaProducerService (injected via @Value)
      default-topic: test
server:
  port: 9292
package com.nisum.webflux_poc;
import com.nisum.webflux_poc.controller.FluxAndMOnoGeneratorService;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;
import java.util.List;
/**
 * StepVerifier-based tests for FluxAndMOnoGeneratorService.
 *
 * FIX: namesMonoFlatMap was missing its {@code @Test} annotation, so JUnit
 * silently never executed it.
 */
public class FluxAndMonoGeneratorServiceTest {

    FluxAndMOnoGeneratorService fluxAndMOnoGeneratorService =
            new FluxAndMOnoGeneratorService();

    @Test
    void namesFlux() {
        var namesFlux = fluxAndMOnoGeneratorService.namesFlux();
        StepVerifier.create(namesFlux)
                .expectNext("Manasa", "Nisum")
                .verifyComplete();
    }

    @Test
    void namesMono() {
        var namesMono = fluxAndMOnoGeneratorService.nameMono();
        StepVerifier.create(namesMono)
                .expectNext("Manasa")
                .verifyComplete();
    }

    @Test
    void namesFluxMap() {
        var upperCase = fluxAndMOnoGeneratorService.namesFluxMap();
        StepVerifier.create(upperCase)
                .expectNext("NAME", "MANASA")
                .verifyComplete();
    }

    // the service discards the mapped flux, so the original values come through
    @Test
    void namesFluxMapImmutable() {
        var upperCase = fluxAndMOnoGeneratorService.namesFluxMapImmutable();
        StepVerifier.create(upperCase)
                .expectNext("name", "manasa")
                .verifyComplete();
    }

    @Test
    void namesFluxFlatMap() {
        int length = 3;
        var namesFlux = fluxAndMOnoGeneratorService.namesFluxFlatMap(length);
        StepVerifier.create(namesFlux)
                .expectNext("M", "A", "N", "A", "S", "A", "N", "I", "S", "U", "M")
                .verifyComplete();
    }

    // async flatMap gives no ordering guarantee, so only the count is asserted
    @Test
    void namesFluxFlatMapAsync() {
        int length = 3;
        var namesFlux = fluxAndMOnoGeneratorService.namesFluxFlatMapAsync(length);
        StepVerifier.create(namesFlux)
                .expectNextCount(11)
                .verifyComplete();
    }

    @Test
    void namesConcatMap() {
        int length = 3;
        var namesFlux = fluxAndMOnoGeneratorService.namesFluxConcatMap(length);
        StepVerifier.create(namesFlux)
                .expectNext("M", "A", "N", "A", "S", "A", "N", "I", "S", "U", "M")
                .verifyComplete();
    }

    // FIX: this test previously lacked @Test and never ran
    @Test
    void namesMonoFlatMap() {
        int length = 4;
        var value = fluxAndMOnoGeneratorService.namesMonoFlatMap(length);
        StepVerifier.create(value)
                .expectNext(List.of("M", "A", "N", "A", "S", "A"))
                .verifyComplete();
    }

    @Test
    void namesMonoFlatMapMany() {
        int length = 4;
        var value = fluxAndMOnoGeneratorService.namesMonoFlatMapMany(length);
        StepVerifier.create(value)
                .expectNext("M", "A", "N", "A", "S", "A")
                .verifyComplete();
    }

    @Test
    void namesFluxTransform() {
        int length = 3;
        var namesFlux = fluxAndMOnoGeneratorService.namesFluxTransform(length);
        StepVerifier.create(namesFlux)
                .expectNext("M", "A", "N", "A", "S", "A", "N", "I", "S", "U", "M")
                .verifyComplete();
    }

    @Test
    void namesFluxTransformSwitchIfEmpty() {
        int length = 3;
        var namesFlux = fluxAndMOnoGeneratorService.namesFluxTransformSwitchIfEmpty(length);
        StepVerifier.create(namesFlux)
                .expectNext("D", "E", "F", "A", "U", "L", "T")
                .verifyComplete();
    }

    @Test
    void exploreConcat() {
        var exploreConcat = fluxAndMOnoGeneratorService.exploreConcat();
        StepVerifier.create(exploreConcat)
                .expectNext("A", "B", "C", "D", "E", "F")
                .verifyComplete();
    }

    @Test
    void exploreConcatWithMono() {
        var exploreConcatWithMono = fluxAndMOnoGeneratorService.exploreConcatWithMono();
        StepVerifier.create(exploreConcatWithMono)
                .expectNext("A", "D")
                .verifyComplete();
    }

    // the 100ms/125ms element delays make merge() interleave deterministically enough
    @Test
    void exploreMerge() {
        var exploreMerge = fluxAndMOnoGeneratorService.exploreMerge();
        StepVerifier.create(exploreMerge)
                .expectNext("A", "D", "B", "E", "C", "F")
                .verifyComplete();
    }

    @Test
    void exploreMergeWithMono() {
        var exploreMergeWithMono = fluxAndMOnoGeneratorService.exploreMergeWithMono();
        StepVerifier.create(exploreMergeWithMono)
                .expectNext("A", "D")
                .verifyComplete();
    }

    @Test
    void exploreMergeSequential() {
        var exploreMergeSequential = fluxAndMOnoGeneratorService.exploreMergeSequential();
        StepVerifier.create(exploreMergeSequential)
                .expectNext("A", "B", "C", "D", "E", "F")
                .verifyComplete();
    }

    @Test
    void exploreZip() {
        var exploreZip = fluxAndMOnoGeneratorService.exploreZip();
        StepVerifier.create(exploreZip)
                .expectNext("AD", "BE", "CF")
                .verifyComplete();
    }

    @Test
    void exploreZip1() {
        var exploreZip1 = fluxAndMOnoGeneratorService.exploreZip1();
        StepVerifier.create(exploreZip1)
                .expectNext("AD14", "BE25", "CF36")
                .verifyComplete();
    }

    @Test
    void exploreZipWith() {
        var exploreZip = fluxAndMOnoGeneratorService.exploreZipWith();
        StepVerifier.create(exploreZip)
                .expectNext("AD", "BE", "CF")
                .verifyComplete();
    }
}
package com.nisum.webflux_poc;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class WebfluxPocApplicationTests {
// Smoke test: passes when the full Spring application context starts without
// throwing, catching broken bean wiring or configuration early.
@Test
void contextLoads() {
}
}
package com.nisum.webflux_poc.controller;
import com.nisum.webflux_poc.dto.EmployeeDto;
import com.nisum.webflux_poc.repository.EmployeeRepository;
import com.nisum.webflux_poc.service.EmployeeService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@WebFluxTest(EmployeeController.class)
public class EmployeeControllerTest {
    // Web-slice test for EmployeeController: only the web layer starts and the
    // service is a Mockito mock, so routing, (de)serialisation and status codes
    // are what is exercised here.
    // NOTE(review): the former @ExtendWith(MockitoExtension.class) was removed —
    // @WebFluxTest already runs the Spring extension and @MockBean supplies the
    // Mockito integration; mixing both is redundant.
    @MockBean
    EmployeeService employeeService;
    // Mocked only so the slice's application context can be built; no test
    // stubs or verifies the repository directly.
    @MockBean
    EmployeeRepository employeeRepository;
    @Autowired
    private WebTestClient webTestClient;
    private EmployeeDto employee;
    private EmployeeDto employee1;
    @BeforeEach
    void setUp(){
        employee = new EmployeeDto();
        employee.setId("1");
        employee.setName("manasa");
        employee.setSalary(1234);
        employee1 = new EmployeeDto();
        employee1.setId("2");
        employee1.setName("asd");
        employee1.setSalary(23456);
        // Removed: ReflectionTestUtils.setField(employeeService, "employeeRepository", employeeRepository);
        // Injecting a field into a Mockito mock is a no-op — the mock never
        // executes real service code, so the repository could never be reached.
    }
    // Name is historical: this actually exercises GET /employee (list all).
    @Test
    void saveEmployeeDataTest(){
        when(employeeService.getAllEmployees()).thenReturn(Flux.just(employee, employee1));
        webTestClient.get()
                .uri("/employee")
                .exchange()
                .expectStatus()
                .is2xxSuccessful()
                .expectBodyList(EmployeeDto.class)
                .hasSize(2); // previously nothing about the body was asserted
    }
    // POST /employee/save with a JSON body; the service stub echoes the fixture.
    @Test
    void saveEmployeeTest(){
        // (removed a commented-out duplicate of the fixture already built in setUp())
        when(employeeService.saveEmployeeData(any(Mono.class))).thenReturn(Mono.just(employee));
        webTestClient.post()
                .uri("/employee/save")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(employee)
                .exchange()
                .expectStatus()
                .is2xxSuccessful()
                .expectBodyList(EmployeeDto.class);
    }
    @Test
    void updateEmployeeTest(){
        String employeeId = "1";
        // NOTE(review): only getEmployee is stubbed — presumably the update
        // handler looks the record up first; confirm against the controller and
        // stub the save path as well if the handler persists the change.
        Mockito.when(employeeService.getEmployee(any(String.class))).thenReturn(Mono.just(employee));
        webTestClient.put()
                .uri("/employee/update/{id}", employeeId)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(employee)
                .exchange()
                .expectStatus()
                .is2xxSuccessful()
                .expectBodyList(EmployeeDto.class);
    }
    @Test
    void getEmployeeTest(){
        String employeeId = "1";
        when(employeeService.getEmployee(employeeId)).thenReturn(Mono.just(employee));
        webTestClient.get()
                .uri("/employee/{id}", employeeId)
                .exchange()
                .expectStatus()
                .is2xxSuccessful()
                .expectBodyList(EmployeeDto.class);
    }
    @Test
    void deleteEmployeeTest(){
        String employeeId = "1";
        when(employeeService.deleteEmployee(any(String.class))).thenReturn(Mono.empty());
        webTestClient.delete()
                .uri("/employee/delete/{id}", employeeId)
                .exchange()
                .expectStatus()
                .is2xxSuccessful()
                .expectBodyList(EmployeeDto.class);
    }
    @Test
    void getAverageSalaryTest(){
        Double averageSalary = 90.9;
        Mockito.when(employeeService.getAverageSalary()).thenReturn(Flux.just(averageSalary));
        webTestClient.get()
                .uri("/employee/average/salary")
                .exchange()
                .expectStatus()
                .is2xxSuccessful()
                .expectBodyList(Double.class);
    }
}
package com.nisum.webflux_poc.controller;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
// Empty placeholder — despite the service-sounding name this class sits in the
// controller package and contains no tests yet.
// TODO(review): add coverage here or delete the class and its unused imports.
class FluxAndMOnoGeneratorServiceTest {
}
\ No newline at end of file
package com.nisum.webflux_poc.controller;
import com.nisum.webflux_poc.dto.EmployeeDto;
import com.nisum.webflux_poc.service.EmployeeService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Objects;
import static org.mockito.Mockito.when;
@WebFluxTest(controllers = FluxAndMonoController.class)
@AutoConfigureWebTestClient
class FluxAndMonoControllerTest {
@Autowired
WebTestClient webTestClient;
@Test
void getFluxData() {
webTestClient.get()
.uri("/flux")
.exchange()
.expectStatus()
.is2xxSuccessful()
.expectBodyList(Integer.class)
.hasSize(3);
}
@Test
void getFluxDataApproach2() {
var flux=webTestClient.get()
.uri("/flux")
.exchange()
.expectStatus()
.is2xxSuccessful()
.returnResult(Integer.class)
.getResponseBody();
StepVerifier.create(flux)
.expectNext(1,2,3)
.verifyComplete();
}
@Test
void getFluxDataApproach3() {
webTestClient.get()
.uri("/flux")
.exchange()
.expectStatus()
.is2xxSuccessful()
.expectBodyList(Integer.class)
.consumeWith(a ->{
var responseBody= a.getResponseBody();
assert (Objects.requireNonNull(responseBody).size()==3);
});
}
@Test
void getstream() {
var flux=webTestClient.get()
.uri("/stream")
.exchange()
.expectStatus()
.is2xxSuccessful()
.returnResult(Long.class)
.getResponseBody();
StepVerifier.create(flux)
.expectNext(0L,1L,2L,3L)
.thenCancel()
.verify();
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment