Commit 3555acfa authored by Giridhari Sahoo's avatar Giridhari Sahoo

initial commit

parent 3617ecd6
/mvnw text eol=lf
*.cmd text eol=crlf
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.zip
This diff is collapsed.
<# : batch portion
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Apache Maven Wrapper startup batch script, version 3.3.2
@REM
@REM Optional ENV vars
@REM MVNW_REPOURL - repo url base for downloading maven distribution
@REM MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven
@REM MVNW_VERBOSE - true: enable verbose log; others: silence the output
@REM ----------------------------------------------------------------------------
@IF "%__MVNW_ARG0_NAME__%"=="" (SET __MVNW_ARG0_NAME__=%~nx0)
@SET __MVNW_CMD__=
@SET __MVNW_ERROR__=
@SET __MVNW_PSMODULEP_SAVE=%PSModulePath%
@SET PSModulePath=
@FOR /F "usebackq tokens=1* delims==" %%A IN (`powershell -noprofile "& {$scriptDir='%~dp0'; $script='%__MVNW_ARG0_NAME__%'; icm -ScriptBlock ([Scriptblock]::Create((Get-Content -Raw '%~f0'))) -NoNewScope}"`) DO @(
IF "%%A"=="MVN_CMD" (set __MVNW_CMD__=%%B) ELSE IF "%%B"=="" (echo %%A) ELSE (echo %%A=%%B)
)
@SET PSModulePath=%__MVNW_PSMODULEP_SAVE%
@SET __MVNW_PSMODULEP_SAVE=
@SET __MVNW_ARG0_NAME__=
@SET MVNW_USERNAME=
@SET MVNW_PASSWORD=
@IF NOT "%__MVNW_CMD__%"=="" (%__MVNW_CMD__% %*)
@echo Cannot start maven from wrapper >&2 && exit /b 1
@GOTO :EOF
: end batch / begin powershell #>
$ErrorActionPreference = "Stop"
if ($env:MVNW_VERBOSE -eq "true") {
$VerbosePreference = "Continue"
}
# calculate distributionUrl, requires .mvn/wrapper/maven-wrapper.properties
$distributionUrl = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionUrl
if (!$distributionUrl) {
Write-Error "cannot read distributionUrl property in $scriptDir/.mvn/wrapper/maven-wrapper.properties"
}
switch -wildcard -casesensitive ( $($distributionUrl -replace '^.*/','') ) {
"maven-mvnd-*" {
$USE_MVND = $true
$distributionUrl = $distributionUrl -replace '-bin\.[^.]*$',"-windows-amd64.zip"
$MVN_CMD = "mvnd.cmd"
break
}
default {
$USE_MVND = $false
$MVN_CMD = $script -replace '^mvnw','mvn'
break
}
}
# apply MVNW_REPOURL and calculate MAVEN_HOME
# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-<version>,maven-mvnd-<version>-<platform>}/<hash>
if ($env:MVNW_REPOURL) {
$MVNW_REPO_PATTERN = if ($USE_MVND) { "/org/apache/maven/" } else { "/maven/mvnd/" }
$distributionUrl = "$env:MVNW_REPOURL$MVNW_REPO_PATTERN$($distributionUrl -replace '^.*'+$MVNW_REPO_PATTERN,'')"
}
$distributionUrlName = $distributionUrl -replace '^.*/',''
$distributionUrlNameMain = $distributionUrlName -replace '\.[^.]*$','' -replace '-bin$',''
$MAVEN_HOME_PARENT = "$HOME/.m2/wrapper/dists/$distributionUrlNameMain"
if ($env:MAVEN_USER_HOME) {
$MAVEN_HOME_PARENT = "$env:MAVEN_USER_HOME/wrapper/dists/$distributionUrlNameMain"
}
$MAVEN_HOME_NAME = ([System.Security.Cryptography.MD5]::Create().ComputeHash([byte[]][char[]]$distributionUrl) | ForEach-Object {$_.ToString("x2")}) -join ''
$MAVEN_HOME = "$MAVEN_HOME_PARENT/$MAVEN_HOME_NAME"
if (Test-Path -Path "$MAVEN_HOME" -PathType Container) {
Write-Verbose "found existing MAVEN_HOME at $MAVEN_HOME"
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
exit $?
}
if (! $distributionUrlNameMain -or ($distributionUrlName -eq $distributionUrlNameMain)) {
Write-Error "distributionUrl is not valid, must end with *-bin.zip, but found $distributionUrl"
}
# prepare tmp dir
$TMP_DOWNLOAD_DIR_HOLDER = New-TemporaryFile
$TMP_DOWNLOAD_DIR = New-Item -Itemtype Directory -Path "$TMP_DOWNLOAD_DIR_HOLDER.dir"
$TMP_DOWNLOAD_DIR_HOLDER.Delete() | Out-Null
trap {
if ($TMP_DOWNLOAD_DIR.Exists) {
try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
}
New-Item -Itemtype Directory -Path "$MAVEN_HOME_PARENT" -Force | Out-Null
# Download and Install Apache Maven
Write-Verbose "Couldn't find MAVEN_HOME, downloading and installing it ..."
Write-Verbose "Downloading from: $distributionUrl"
Write-Verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName"
$webclient = New-Object System.Net.WebClient
if ($env:MVNW_USERNAME -and $env:MVNW_PASSWORD) {
$webclient.Credentials = New-Object System.Net.NetworkCredential($env:MVNW_USERNAME, $env:MVNW_PASSWORD)
}
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
$webclient.DownloadFile($distributionUrl, "$TMP_DOWNLOAD_DIR/$distributionUrlName") | Out-Null
# If specified, validate the SHA-256 sum of the Maven distribution zip file
$distributionSha256Sum = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionSha256Sum
if ($distributionSha256Sum) {
if ($USE_MVND) {
Write-Error "Checksum validation is not supported for maven-mvnd. `nPlease disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties."
}
Import-Module $PSHOME\Modules\Microsoft.PowerShell.Utility -Function Get-FileHash
if ((Get-FileHash "$TMP_DOWNLOAD_DIR/$distributionUrlName" -Algorithm SHA256).Hash.ToLower() -ne $distributionSha256Sum) {
Write-Error "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised. If you updated your Maven version, you need to update the specified distributionSha256Sum property."
}
}
# unzip and move
Expand-Archive "$TMP_DOWNLOAD_DIR/$distributionUrlName" -DestinationPath "$TMP_DOWNLOAD_DIR" | Out-Null
Rename-Item -Path "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" -NewName $MAVEN_HOME_NAME | Out-Null
try {
Move-Item -Path "$TMP_DOWNLOAD_DIR/$MAVEN_HOME_NAME" -Destination $MAVEN_HOME_PARENT | Out-Null
} catch {
if (! (Test-Path -Path "$MAVEN_HOME" -PathType Container)) {
Write-Error "fail to move MAVEN_HOME"
}
} finally {
try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.nisum</groupId>
<artifactId>product-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>product-consumer</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
<spring-cloud.version>2024.0.1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!--<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
</dependency>-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.nisum.product_consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication(scanBasePackages = "com.nisum.product_consumer")
@EnableDiscoveryClient
public class ProductConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ProductConsumerApplication.class, args);
}
}
package com.nisum.product_consumer.advice;
import com.nisum.product_consumer.exception.InvalidMessageException;
import com.nisum.product_consumer.exception.ProductProcessingException;
import com.nisum.product_consumer.service.KafkaConsumerServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import reactor.core.publisher.Mono;
import java.util.Map;
@RestControllerAdvice
//@Slf4j
public class GlobalExceptionHandler {
private Logger log = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
@ExceptionHandler(InvalidMessageException.class)
public Mono<Map<String, String>> handleInvalidMessageException(InvalidMessageException ex) {
log.warn("Invalid Kafka message received: {}", ex.getMessage());
return Mono.just(Map.of("error", ex.getMessage(), "status", "INVALID_MESSAGE"));
}
@ExceptionHandler(ProductProcessingException.class)
public Mono<Map<String, String>> handleProductProcessingException(ProductProcessingException ex) {
log.error("Product processing failed: {}", ex.getMessage());
return Mono.just(Map.of("error", ex.getMessage(), "status", "PROCESSING_ERROR"));
}
@ExceptionHandler(Exception.class)
public Mono<Map<String, String>> handleGenericException(Exception ex) {
log.error("Unexpected error occurred: {}", ex.getMessage(), ex);
return Mono.just(Map.of("error", "Internal Server Error", "status", "FAILURE"));
}
}
package com.nisum.product_consumer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.topic}")
private String topic;
@Value("${spring.kafka.group-id}")
private String groupID;
@Bean
public ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton(topic));
}
@Bean
public KafkaReceiver<String, String> kafkaReceiver(ReceiverOptions<String, String> receiverOptions) {
return KafkaReceiver.create(receiverOptions);
}
}
package com.nisum.product_consumer.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
@Value("${producer.base-url}")
private String baseUrl;
@Bean
public WebClient getWebClient(){
return WebClient
.builder()
.baseUrl(baseUrl)
.build();
}
}
package com.nisum.product_consumer.controller;
import com.nisum.product_consumer.dto.ProductDto;
import com.nisum.product_consumer.service.ProductConsumerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/ProductConsumer")
public class ProductConsumerController {
@Autowired
private ProductConsumerService productConsumerService;
@GetMapping("/fetchAllProducts")
public ResponseEntity<Flux<ProductDto>> fetchAllProducts(){
return ResponseEntity.ok(productConsumerService.getAllProductsFromProducer());
}
}
package com.nisum.product_consumer.dto;
public class ProductDto {
private String id;
private String name;
private Double price;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return "Product{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", price=" + price +
'}';
}
}
package com.nisum.product_consumer.entity;
import lombok.*;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Document(collection = "product_consumer")
public class Product {
private String id;
private String name;
private Double price;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
package com.nisum.product_consumer.exception;
public class InvalidMessageException extends RuntimeException{
public InvalidMessageException(String message) {
super(message);
}
}
package com.nisum.product_consumer.exception;
public class ProductProcessingException extends RuntimeException{
public ProductProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
package com.nisum.product_consumer.repository;
import com.nisum.product_consumer.entity.Product;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface ProductConsumerRepository extends ReactiveMongoRepository<Product,String> {
}
package com.nisum.product_consumer.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.product_consumer.entity.Product;
import com.nisum.product_consumer.exception.InvalidMessageException;
import com.nisum.product_consumer.exception.ProductProcessingException;
import com.nisum.product_consumer.repository.ProductConsumerRepository;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;
import java.time.Duration;
@Service
//@RequiredArgsConstructor
//@Slf4j
public class KafkaConsumerServiceImpl {
private Logger log = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
private final KafkaReceiver<String, String> kafkaReceiver;
private final ProductConsumerRepository productRepository;
private final ObjectMapper objectMapper;
public KafkaConsumerServiceImpl(KafkaReceiver<String, String> kafkaReceiver, ProductConsumerRepository productRepository, ObjectMapper objectMapper) {
this.kafkaReceiver = kafkaReceiver;
this.productRepository = productRepository;
this.objectMapper = objectMapper;
}
@PostConstruct
public void startConsuming() {
consumeMessages();
}
public void consumeMessages() {
kafkaReceiver.receive()
.flatMap(this::processMessage)
.onErrorContinue((error, obj) -> log.error("Error processing message: {}", error.getMessage(), error))
.subscribe();
}
private Mono<Void> processMessage(ReceiverRecord<String, String> record) {
return Mono.just(record.value())
.filter(StringUtils::isNotBlank)
.switchIfEmpty(Mono.error(new InvalidMessageException("Received blank message from Kafka.")))
.flatMap(this::parseProduct)
.flatMap(this::validateProduct)
.flatMap(productRepository::save)
.doOnSuccess(product -> log.info("Successfully saved product: {}", product))
.doOnError(error -> log.error("Error processing product: {}", error.getMessage(), error))
.onErrorResume(InvalidMessageException.class, error -> {
log.warn("Skipping invalid message: {}", error.getMessage());
return Mono.empty();
})
.onErrorResume(ProductProcessingException.class, error -> {
log.error("Product processing failed: {}", error.getMessage());
return Mono.empty();
})
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)) // Retry 3 times on failure
.filter(error -> error instanceof ProductProcessingException) // Only retry on processing failure
.onRetryExhaustedThrow((retryBackoffSpec, signal) -> signal.failure()))
.then(Mono.fromRunnable(() -> record.receiverOffset().acknowledge())); // Commit offset if successful
}
private Mono<Product> parseProduct(String message) {
try {
Product product = new ObjectMapper().readValue(message, Product.class);
return Mono.just(product);
} catch (Exception e) {
log.error("Error parsing JSON message: {}", message, e);
return Mono.empty();
}
}
private Mono<Product> validateProduct(Product product) {
if (ObjectUtils.isEmpty(product)) {
log.warn("Product object is null or empty. Skipping processing.");
return Mono.empty();
}
if (StringUtils.isBlank(product.getName()) || ObjectUtils.isEmpty(product.getPrice()) || product.getPrice() <= 0) {
log.warn("Invalid product data: {}", product);
return Mono.empty();
}
return Mono.just(product);
}
}
package com.nisum.product_consumer.service;
import com.nisum.product_consumer.dto.ProductDto;
import reactor.core.publisher.Flux;
public interface ProductConsumerService{
public Flux<ProductDto> getAllProductsFromProducer();
}
package com.nisum.product_consumer.service;
import com.nisum.product_consumer.dto.ProductDto;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
@Service
public class ProductConsumerServiceImpl implements ProductConsumerService{
private static final Logger logger = LoggerFactory.getLogger(ProductConsumerServiceImpl.class);
private static final String CIRCUIT_BREAKER_INSTANCE = "productConsumer";
@Autowired
private WebClient webClient;
@CircuitBreaker(name = CIRCUIT_BREAKER_INSTANCE, fallbackMethod = "fallbackGetAllProducts")
@Override
public Flux<ProductDto> getAllProductsFromProducer() {
return webClient
.get()
.uri("/getAllProduct")
.retrieve()
.bodyToFlux(ProductDto.class)
.doOnNext(product -> logger.info("Received product: {}", product));
}
public Flux<ProductDto> fallbackGetAllProducts(Throwable ex) {
logger.error("Fallback triggered for getAllProductsFromProducer due to: {}", ex.getMessage());
return Flux.empty();
}
}
spring.application.name=product-consumer
server :
port : 8082
spring:
application:
name: PRODUCT-CONSUMER
data:
mongodb:
database: productdb
host: localhost
port: 27017
# auto-index-creation: true
kafka:
bootstrap-servers: localhost:9092 # Kafka broker
topic: product-topic # Kafka topic name
group-id: product-consumer-group # Consumer group for Kafka
consumer:
group-id: product-consumer-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false # Ensures manual acknowledgment in KafkaReceiver
producer :
base-url : "http://localhost:8081/product"
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/ # Eureka Server URL
register-with-eureka: true # Enables registration
fetch-registry: true # Enables service discovery
instance:
hostname: localhost
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
circuitbreakers.enabled: true
tracing:
sampling:
probability: 1.0
#
#resilience4j.circuitbreaker:
# configs:
# default:
# registerHealthIndicator: true
# slidingWindowSize: 10
# minimumNumberOfCalls: 5
# permittedNumberOfCallsInHalfOpenState: 3
# automaticTransitionFromOpenToHalfOpenEnabled: true
# waitDurationInOpenState: 5s
# failureRateThreshold: 50
# eventConsumerBufferSize: 10
#management.endpoints.web.exposure.include: '*'
#management.endpoint.health.show-details: always
resilience4j.circuitbreaker:
instances:
productConsumer:
registerHealthIndicator: true
slidingWindowSize: 10
minimumNumberOfCalls: 5
permittedNumberOfCallsInHalfOpenState: 3
automaticTransitionFromOpenToHalfOpenEnabled: true
waitDurationInOpenState: 5s
failureRateThreshold: 50
ventConsumerBufferSize: 10
package com.nisum.product_consumer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ProductConsumerApplicationTests {
@Test
void contextLoads() {
}
}
package com.nisum.product_consumer.controller;
import com.nisum.product_consumer.dto.ProductDto;
import com.nisum.product_consumer.service.ProductConsumerService;
import com.nisum.product_consumer.service.ProductConsumerServiceImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.*;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
/*@WebFluxTest(ProductConsumerController.class)
@ExtendWith(MockitoExtension.class)*/
@ExtendWith(MockitoExtension.class)
public class ProductConsumerControllerTest {
@InjectMocks
private ProductConsumerServiceImpl productConsumerService;
@Mock
private WebClient.Builder webClientBuilder;
@Mock
private WebClient webClient;
@Mock
private WebClient.RequestHeadersUriSpec<?> requestHeadersUriSpec;
@Mock
private WebClient.RequestHeadersSpec<?> requestHeadersSpec;
@Mock
private WebClient.ResponseSpec responseSpec;
@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
when(webClientBuilder.baseUrl(Mockito.anyString())).thenReturn(webClientBuilder);
when(webClientBuilder.build()).thenReturn(webClient);
}
@Test
void testGetAllProductsFromProducer() {
// Sample mock response
ProductDto productDto1 = new ProductDto();
productDto1.setId("101");
productDto1.setName("iPhone 15");
productDto1.setPrice(999.99);
ProductDto productDto2 = new ProductDto();
productDto2.setId("102");
productDto2.setName("LED TV");
productDto2.setPrice(1299.99);
Flux<ProductDto> productDtoFlux = Flux.just(productDto1, productDto2);
WebClient.RequestHeadersUriSpec requestHeadersUriSpec1 = requestHeadersUriSpec;
when(webClient.get()).thenReturn(requestHeadersUriSpec1);
when(requestHeadersUriSpec1.uri(Mockito.anyString())).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
when(responseSpec.bodyToFlux(ProductDto.class)).thenReturn(productDtoFlux);
// Call the method
Flux<ProductDto> result = productConsumerService.getAllProductsFromProducer();
// Verify the result
List<ProductDto> productList = result.collectList().block();
StepVerifier.create(result)
.expectNext(productList.get(0))
.expectNext(productList.get(1))
.verifyComplete();
}
// @Mock
// private ProductConsumerService productConsumerService;
//
// @InjectMocks
// private ProductConsumerController productConsumerController;
//
// @Mock
// private WebTestClient webTestClient;
//
// @BeforeEach
// void setUp() {
// MockitoAnnotations.openMocks(this);
// webTestClient = WebTestClient.bindToController(productConsumerController).build();
// }
// @Test
// public void fetchAllProductTest() {
// ProductDto productDto1 = new ProductDto();
// productDto1.setId("101");
// productDto1.setName("iPhone 15");
// productDto1.setPrice(999.99);
//
// ProductDto productDto2 = new ProductDto();
// productDto2.setId("102");
// productDto2.setName("LED TV");
// productDto2.setPrice(1299.99);
//
// Flux<ProductDto> productDtoFlux = Flux.just(productDto1, productDto2);
//
// when(productConsumerService.getAllProductsFromProducer()).thenReturn(productDtoFlux);
// webTestClient.get()
// .uri("/ProductConsumer/fetchAllProducts")
// .accept(MediaType.APPLICATION_JSON)
// .exchange()
// .expectStatus().isOk()
// .expectBodyList(ProductDto.class)
// .value(products -> {
// assertThat(products).hasSize(2);
// assertThat(products.get(0).getId()).isEqualTo("101");
// assertThat(products.get(0).getName()).isEqualTo("iPhone 14");
// assertThat(products.get(1).getId()).isEqualTo("102");
// assertThat(products.get(1).getName()).isEqualTo("LED TV");
// });
//
//
// //Verify interactions
// verify(productConsumerService, times(1)).getAllProductsFromProducer();
// }
}
package com.nisum.product_consumer.service;
import com.nisum.product_consumer.dto.ProductDto;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.*;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
//@WebFluxTest(ProductConsumerService.class)
public class ProductConsumerServiceTest {
@Mock
private WebClient webClient; // Mock WebClient
@Mock
private WebClient.RequestHeadersUriSpec<?> requestHeadersUriSpec;
@Mock
private WebClient.RequestHeadersSpec<?> requestHeadersSpec;
@Mock
private WebClient.ResponseSpec responseSpec;
@InjectMocks
private ProductConsumerServiceImpl productConsumerService;
@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
// Manually creating mocks to resolve generic issues
requestHeadersUriSpec = mock(WebClient.RequestHeadersUriSpec.class);
requestHeadersSpec = mock(WebClient.RequestHeadersSpec.class);
responseSpec = mock(WebClient.ResponseSpec.class);
}
// Given: Mock product list
@Test
public void getAllProductFromProducer() {
ProductDto product1 = new ProductDto();//"101", "iPhone 14", 999.99
product1.setId("101");
product1.setName("iPhone 14");
product1.setPrice(999.99);
ProductDto product2 = new ProductDto();//"102", "Samsung S22", 899.99
product2.setId("102");
product2.setName("Samsung S22");
product2.setPrice(899.99);
Flux<ProductDto> mockProductFlux = Flux.just(product1, product2);
// when(webClient.get()).thenReturn(requestHeadersUriSpec); // Mock get()
// when(requestHeadersUriSpec.uri(anyString())).thenReturn(requestHeadersSpec); // Mock uri()
// when(requestHeadersSpec.retrieve()).thenReturn(responseSpec); // Mock retrieve()
// when(responseSpec.bodyToFlux(ArgumentMatchers.any(Class.class))).thenReturn(mockProductFlux); // Mock bodyToFlux()
WebClient.RequestHeadersUriSpec requestHeadersUriSpec1 = requestHeadersUriSpec;
when(webClient.get()).thenReturn(requestHeadersUriSpec1);
when(requestHeadersUriSpec1.uri(Mockito.anyString())).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
when(responseSpec.bodyToFlux(ProductDto.class)).thenReturn(mockProductFlux);
Flux<ProductDto> result = productConsumerService.getAllProductsFromProducer();
StepVerifier.create(result)
.expectNext(product1)
.expectNext(product2)
.verifyComplete();
// Verify interactions with WebClient
verify(webClient, times(1)).get();
verify(requestHeadersUriSpec, times(1)).uri("/getAllProduct");
verify(requestHeadersSpec, times(1)).retrieve();
verify(responseSpec, times(1)).bodyToFlux(ProductDto.class);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment