Commit 40f409d9 authored by Giridhari Sahoo's avatar Giridhari Sahoo

src added

parent a79c1988
This diff is collapsed.
<# : batch portion
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Apache Maven Wrapper startup batch script, version 3.3.2
@REM
@REM Optional ENV vars
@REM MVNW_REPOURL - repo url base for downloading maven distribution
@REM MVNW_USERNAME/MVNW_PASSWORD - user and password for downloading maven
@REM MVNW_VERBOSE - true: enable verbose log; others: silence the output
@REM ----------------------------------------------------------------------------
@IF "%__MVNW_ARG0_NAME__%"=="" (SET __MVNW_ARG0_NAME__=%~nx0)
@SET __MVNW_CMD__=
@SET __MVNW_ERROR__=
@SET __MVNW_PSMODULEP_SAVE=%PSModulePath%
@SET PSModulePath=
@FOR /F "usebackq tokens=1* delims==" %%A IN (`powershell -noprofile "& {$scriptDir='%~dp0'; $script='%__MVNW_ARG0_NAME__%'; icm -ScriptBlock ([Scriptblock]::Create((Get-Content -Raw '%~f0'))) -NoNewScope}"`) DO @(
IF "%%A"=="MVN_CMD" (set __MVNW_CMD__=%%B) ELSE IF "%%B"=="" (echo %%A) ELSE (echo %%A=%%B)
)
@SET PSModulePath=%__MVNW_PSMODULEP_SAVE%
@SET __MVNW_PSMODULEP_SAVE=
@SET __MVNW_ARG0_NAME__=
@SET MVNW_USERNAME=
@SET MVNW_PASSWORD=
@IF NOT "%__MVNW_CMD__%"=="" (%__MVNW_CMD__% %*)
@echo Cannot start maven from wrapper >&2 && exit /b 1
@GOTO :EOF
: end batch / begin powershell #>
$ErrorActionPreference = "Stop"
if ($env:MVNW_VERBOSE -eq "true") {
$VerbosePreference = "Continue"
}
# calculate distributionUrl, requires .mvn/wrapper/maven-wrapper.properties
$distributionUrl = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionUrl
if (!$distributionUrl) {
Write-Error "cannot read distributionUrl property in $scriptDir/.mvn/wrapper/maven-wrapper.properties"
}
switch -wildcard -casesensitive ( $($distributionUrl -replace '^.*/','') ) {
"maven-mvnd-*" {
$USE_MVND = $true
$distributionUrl = $distributionUrl -replace '-bin\.[^.]*$',"-windows-amd64.zip"
$MVN_CMD = "mvnd.cmd"
break
}
default {
$USE_MVND = $false
$MVN_CMD = $script -replace '^mvnw','mvn'
break
}
}
# apply MVNW_REPOURL and calculate MAVEN_HOME
# maven home pattern: ~/.m2/wrapper/dists/{apache-maven-<version>,maven-mvnd-<version>-<platform>}/<hash>
if ($env:MVNW_REPOURL) {
$MVNW_REPO_PATTERN = if ($USE_MVND) { "/org/apache/maven/" } else { "/maven/mvnd/" }
$distributionUrl = "$env:MVNW_REPOURL$MVNW_REPO_PATTERN$($distributionUrl -replace '^.*'+$MVNW_REPO_PATTERN,'')"
}
$distributionUrlName = $distributionUrl -replace '^.*/',''
$distributionUrlNameMain = $distributionUrlName -replace '\.[^.]*$','' -replace '-bin$',''
$MAVEN_HOME_PARENT = "$HOME/.m2/wrapper/dists/$distributionUrlNameMain"
if ($env:MAVEN_USER_HOME) {
$MAVEN_HOME_PARENT = "$env:MAVEN_USER_HOME/wrapper/dists/$distributionUrlNameMain"
}
$MAVEN_HOME_NAME = ([System.Security.Cryptography.MD5]::Create().ComputeHash([byte[]][char[]]$distributionUrl) | ForEach-Object {$_.ToString("x2")}) -join ''
$MAVEN_HOME = "$MAVEN_HOME_PARENT/$MAVEN_HOME_NAME"
if (Test-Path -Path "$MAVEN_HOME" -PathType Container) {
Write-Verbose "found existing MAVEN_HOME at $MAVEN_HOME"
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
exit $?
}
if (! $distributionUrlNameMain -or ($distributionUrlName -eq $distributionUrlNameMain)) {
Write-Error "distributionUrl is not valid, must end with *-bin.zip, but found $distributionUrl"
}
# prepare tmp dir
$TMP_DOWNLOAD_DIR_HOLDER = New-TemporaryFile
$TMP_DOWNLOAD_DIR = New-Item -Itemtype Directory -Path "$TMP_DOWNLOAD_DIR_HOLDER.dir"
$TMP_DOWNLOAD_DIR_HOLDER.Delete() | Out-Null
trap {
if ($TMP_DOWNLOAD_DIR.Exists) {
try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
}
New-Item -Itemtype Directory -Path "$MAVEN_HOME_PARENT" -Force | Out-Null
# Download and Install Apache Maven
Write-Verbose "Couldn't find MAVEN_HOME, downloading and installing it ..."
Write-Verbose "Downloading from: $distributionUrl"
Write-Verbose "Downloading to: $TMP_DOWNLOAD_DIR/$distributionUrlName"
$webclient = New-Object System.Net.WebClient
if ($env:MVNW_USERNAME -and $env:MVNW_PASSWORD) {
$webclient.Credentials = New-Object System.Net.NetworkCredential($env:MVNW_USERNAME, $env:MVNW_PASSWORD)
}
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
$webclient.DownloadFile($distributionUrl, "$TMP_DOWNLOAD_DIR/$distributionUrlName") | Out-Null
# If specified, validate the SHA-256 sum of the Maven distribution zip file
$distributionSha256Sum = (Get-Content -Raw "$scriptDir/.mvn/wrapper/maven-wrapper.properties" | ConvertFrom-StringData).distributionSha256Sum
if ($distributionSha256Sum) {
if ($USE_MVND) {
Write-Error "Checksum validation is not supported for maven-mvnd. `nPlease disable validation by removing 'distributionSha256Sum' from your maven-wrapper.properties."
}
Import-Module $PSHOME\Modules\Microsoft.PowerShell.Utility -Function Get-FileHash
if ((Get-FileHash "$TMP_DOWNLOAD_DIR/$distributionUrlName" -Algorithm SHA256).Hash.ToLower() -ne $distributionSha256Sum) {
Write-Error "Error: Failed to validate Maven distribution SHA-256, your Maven distribution might be compromised. If you updated your Maven version, you need to update the specified distributionSha256Sum property."
}
}
# unzip and move
Expand-Archive "$TMP_DOWNLOAD_DIR/$distributionUrlName" -DestinationPath "$TMP_DOWNLOAD_DIR" | Out-Null
Rename-Item -Path "$TMP_DOWNLOAD_DIR/$distributionUrlNameMain" -NewName $MAVEN_HOME_NAME | Out-Null
try {
Move-Item -Path "$TMP_DOWNLOAD_DIR/$MAVEN_HOME_NAME" -Destination $MAVEN_HOME_PARENT | Out-Null
} catch {
if (! (Test-Path -Path "$MAVEN_HOME" -PathType Container)) {
Write-Error "fail to move MAVEN_HOME"
}
} finally {
try { Remove-Item $TMP_DOWNLOAD_DIR -Recurse -Force | Out-Null }
catch { Write-Warning "Cannot remove $TMP_DOWNLOAD_DIR" }
}
Write-Output "MVN_CMD=$MAVEN_HOME/bin/$MVN_CMD"
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.nisum</groupId>
<artifactId>products-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>products-producer</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
<spring-cloud.version>2024.0.1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.16</version> <!-- Ensure this version or latest -->
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<!--<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>-->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.nisum.products_producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class ProductsProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProductsProducerApplication.class, args);
}
}
package com.nisum.products_producer.advice;
import com.nisum.products_producer.exception.DuplicateProductException;
import com.nisum.products_producer.exception.InvalidPriceException;
import com.nisum.products_producer.exception.InvalidProductNameException;
import com.nisum.products_producer.exception.KafkaPublishingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.bind.support.WebExchangeBindException;
import java.util.HashMap;
import java.util.Map;
@RestControllerAdvice
public class GlobalExceptionHandler {
private Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
@ExceptionHandler(InvalidProductNameException.class)
public ResponseEntity<String> handleInvalidNameException(InvalidProductNameException ex) {
return ResponseEntity.badRequest().body(ex.getMessage());
}
/* @ExceptionHandler(DuplicateProductException.class)
public ResponseEntity<String> handleDuplicateProductException(DuplicateProductException ex) {
logger.error("Handling DuplicateProductGlobalException: {}", ex.getMessage());
return ResponseEntity.status(HttpStatus.CONFLICT).body(ex.getMessage());
}*/
@ExceptionHandler(DuplicateProductException.class)
public ResponseEntity<Map<String, String>> handleDuplicateProductException(DuplicateProductException ex) {
logger.error("Duplicate product detected: {}", ex.getMessage());
Map<String, String> errorResponse = new HashMap<>();
errorResponse.put("message", ex.getMessage());
return ResponseEntity.status(HttpStatus.CONFLICT).body(errorResponse);
}
@ExceptionHandler(InvalidPriceException.class)
public ResponseEntity<String> handleInvalidPriceException(InvalidPriceException ex) {
return ResponseEntity.badRequest().body(ex.getMessage());
}
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<Map<String, String>> handleValidationExceptions(MethodArgumentNotValidException ex) {
Map<String, String> errors = new HashMap<>();
ex.getBindingResult().getFieldErrors().forEach(error ->
errors.put(error.getField(), error.getDefaultMessage())
);
return ResponseEntity.badRequest().body(errors);
}
@ExceptionHandler(Exception.class)
public ResponseEntity<String> handleGeneralException(Exception ex) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("An error occurred: " + ex.getMessage());
}
@ExceptionHandler(DuplicateKeyException.class)
public ResponseEntity<String> handleDuplicateKeyException(DuplicateKeyException ex) {
return ResponseEntity.status(HttpStatus.CONFLICT)
.body("Duplicate entry: The product with the same name and price already exists.");
}
@ExceptionHandler(WebExchangeBindException.class)
public ResponseEntity<Map<String, String>> handleFieldValidationException(WebExchangeBindException ex){
Map<String,String> errors = new HashMap<>();
ex.getBindingResult().getFieldErrors().forEach(error->
errors.put(error.getField(), error.getDefaultMessage())
);
//return ResponseEntity.badRequest().body(errors);
return new ResponseEntity<>(errors, HttpStatus.BAD_REQUEST);
}
@ExceptionHandler(KafkaPublishingException.class)
public ResponseEntity<Map<String, String>> handleKafkaPublishingException(KafkaPublishingException ex) {
Map<String, String> response = new HashMap<>();
response.put("error", ex.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleIllegalArgumentException(IllegalArgumentException ex) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body("Validation Error: " + ex.getMessage());
}
}
package com.nisum.products_producer.configuration;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
@Configuration
public class Config {
@Value("${spring.kafka.topic.product}")
private String topicName;
/* @Bean
CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
System.out.println("----- Registered Beans in Spring Context -----");
for (String beanName : ctx.getBeanDefinitionNames()) {
System.out.println(beanName);
}
};
}*/
/*@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}*/
@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"
));
}
@Bean
public NewTopic createTopic(){
return TopicBuilder
.name(topicName)
.partitions(1)
.replicas(1)
.configs(Map.of(
TopicConfig.RETENTION_MS_CONFIG, "604800000", // Retain messages for 7 days
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE
)).build();
}
@Bean
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
KafkaSender<String, String> kafkaSender = KafkaSender.create(senderOptions);
return new ReactiveKafkaProducerTemplate<>(kafkaSender);
//return new ReactiveKafkaProducerTemplate<>(KafkaSender.create(senderOptions));
}
}
package com.nisum.products_producer.controller;
import com.nisum.products_producer.dto.ProductDto;
import com.nisum.products_producer.exception.InvalidPriceException;
import com.nisum.products_producer.exception.InvalidProductNameException;
import com.nisum.products_producer.model.Product;
import com.nisum.products_producer.service.ProductService;
import com.nisum.products_producer.service.ProductServiceImpl;
import jakarta.validation.Valid;
import jakarta.validation.ValidationException;
import jakarta.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.BeanPropertyBindingResult;
import org.springframework.validation.BindingResult;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.support.WebExchangeBindException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/product")
@Validated
public class ProductController {
private final ProductService productService;
private static final Logger logger = LoggerFactory.getLogger(ProductController.class);
public ProductController(ProductService productService) {
this.productService = productService;
}
@PostMapping("/saveProduct")
public Mono<ResponseEntity<Product>> addProduct(@Valid @RequestBody Mono<Product> productInfo) {
logger.info("Received request to save product");
return productService.saveProduct(productInfo)
.doOnNext(product -> logger.info("Processing product: {}", product))
.doOnSuccess(savedProduct -> logger.info("Product saved successfully: {}", savedProduct))
.doOnError(error -> logger.error("ERROR: Failed to save product - {}", error.getMessage()))// Pass Mono<Product> directly
.map(savedProduct -> ResponseEntity.status(HttpStatus.CREATED).body(savedProduct))
.defaultIfEmpty(ResponseEntity.badRequest().build());
}
@GetMapping("/getAllProduct")
public ResponseEntity<Flux<Product>> getAllProducts() {
logger.info("Fetching all products");
return ResponseEntity.ok(productService.getAllProducts());
}
}
package com.nisum.products_producer.controller;
import com.nisum.products_producer.dto.ResponseDto;
import com.nisum.products_producer.model.Product;
import com.nisum.products_producer.service.KafkaProducerService;
import jakarta.validation.Valid;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
@RestController
@RequestMapping("/api/product")
@Slf4j
public class ProductKafkaController {
private final KafkaProducerService producerService;
public ProductKafkaController(KafkaProducerService producerService) {
this.producerService = producerService;
}
// @PostMapping("/publish")
// public Mono<ResponseEntity<String>> publishProduct(@RequestBody @Valid Product productRequest) {
// return producerService.sendProductToKafka(productRequest)
// .thenReturn(ResponseEntity.ok("Product sent successfully"))
// .onErrorResume(e -> Mono.just(ResponseEntity.badRequest().body("Failed to publish: " + e.getMessage())));
// }
@PostMapping("/publish")
public Mono<ResponseEntity<ResponseDto>> publishProduct(@RequestBody @Valid Product productRequest) {
return producerService.sendProductToKafka(productRequest)
.thenReturn(ResponseEntity.ok(
new ResponseDto(
true,
"Product sent successfully",
productRequest,
LocalDateTime.now().toString(),
HttpStatus.OK.value()
)))
.onErrorResume(e ->
Mono.just(ResponseEntity.badRequest().body(
new ResponseDto(
false,
"Failed to publish: " + e.getMessage(),
null,
LocalDateTime.now().toString(),
HttpStatus.BAD_REQUEST.value()
))));
}
}
package com.nisum.products_producer.dto;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
public class ProductDto {
private String id;
@NotNull
@NotEmpty
@Pattern(regexp = "^(?!\\d+$)[a-zA-Z0-9 ]+$", message = "Product name must contain at least one letter and can include numbers, but cannot be only numbers.")
private String name;
@NotNull(message = "Price cannot be null.")
@DecimalMin(value = "0.01", inclusive = true, message = "Price must be positive.")
private Double price;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return "Product{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", price=" + price + '}';
}
}
package com.nisum.products_producer.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResponseDto {
private boolean success;
private String message;
private Object data;
private String timestamp;
private int statusCode;
}
package com.nisum.products_producer.exception;
public class DuplicateProductException extends RuntimeException{
public DuplicateProductException(String message){
super(message);
}
}
package com.nisum.products_producer.exception;
public class InvalidPriceException extends RuntimeException{
public InvalidPriceException(String message){
super(message);
}
}
package com.nisum.products_producer.exception;
public class InvalidProductNameException extends RuntimeException{
public InvalidProductNameException(String message) {
super(message);
}
}
package com.nisum.products_producer.exception;
public class KafkaPublishingException extends RuntimeException{
public KafkaPublishingException(String message, Throwable cause) {
super(message, cause);
}
}
package com.nisum.products_producer.model;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Document(collection = "products")
@CompoundIndex(def = "{'name': 1, 'price': 1}", unique = true)
public class Product {
@Id
private String id;
@Indexed(unique = true) // Ensure product name is unique if needed
@NotNull
@NotEmpty
@Pattern(regexp = "^(?!\\d+$)[a-zA-Z0-9 ]+$", message = "Product name must contain at least one letter and can include numbers, but cannot be only numbers.")
private String name;
@NotNull(message = "Price cannot be null.")
@DecimalMin(value = "0.01", inclusive = true, message = "Price must be positive.")
private Double price;
@Override
public String toString() {
return "Product{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", price=" + price +
'}';
}
}
package com.nisum.products_producer.repository;
import com.nisum.products_producer.model.Product;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface ProductRepository extends ReactiveMongoRepository<Product,String> {
}
package com.nisum.products_producer.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.products_producer.exception.KafkaPublishingException;
import com.nisum.products_producer.model.Product;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import static org.springframework.kafka.support.KafkaHeaders.TOPIC;
@Service
public class KafkaProducerService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerService.class);
private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;
private final ObjectMapper objectMapper;
@Value("${spring.kafka.topic.product}")
private String topic;
public KafkaProducerService(ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate, ObjectMapper objectMapper) {
this.kafkaProducerTemplate = kafkaProducerTemplate;
this.objectMapper = objectMapper;
}
public Mono<Void> sendProductToKafka(Product productRequest) {
return Mono.justOrEmpty(productRequest)
.switchIfEmpty(Mono.error(new IllegalArgumentException("ProductRequest cannot be null")))
.flatMap(request -> {
try {
String productJson = objectMapper.writeValueAsString(request);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, productJson);
return kafkaProducerTemplate.send(record)
.doOnSuccess(senderResult ->
LOGGER.info(" Message sent successfully to Kafka: topic={}, offset={}, partition={}",
topic, senderResult.recordMetadata().offset(), senderResult.recordMetadata().partition())
)
.doOnError(error ->
LOGGER.error(" Failed to send message to Kafka: {}", error.getMessage(), error)
)
.then(); // Converts to Mono<Void>
} catch (JsonProcessingException e) {
return Mono.error(new KafkaPublishingException("Error converting ProductRequest to JSON", e));
}
});
}
}
package com.nisum.products_producer.service;
import com.nisum.products_producer.dto.ProductDto;
import com.nisum.products_producer.model.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ProductService {
Mono<Product> saveProduct(Mono<Product> product);
Flux<Product> getAllProducts();
}
\ No newline at end of file
package com.nisum.products_producer.service;
import com.nisum.products_producer.dto.ProductDto;
import com.nisum.products_producer.exception.DuplicateProductException;
import com.nisum.products_producer.model.Product;
import com.nisum.products_producer.repository.ProductRepository;
import com.nisum.products_producer.util.AppUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class ProductServiceImpl implements ProductService {
private final ProductRepository productRepository;
private static final Logger logger = LoggerFactory.getLogger(ProductServiceImpl.class);
public ProductServiceImpl(ProductRepository productRepository) {
this.productRepository = productRepository;
}
@Override
public Mono<Product> saveProduct(Mono<Product> productMono) {
logger.info("Received request to save product.");
return productMono
.flatMap(product -> productRepository.save(product)) // Save the product
.doOnSuccess(saved -> logger.info("Product saved successfully: {}", saved))
.onErrorResume(DuplicateKeyException.class, ex -> {
logger.error("Duplicate product detected: {}", ex.getMessage());
return Mono.error(new DuplicateProductException("Product already exists with the same name and price!"));
})
.doOnError(ex -> logger.error("Failed to save product: {}", ex.getMessage()));
}
@Override
public Flux<Product> getAllProducts() {
logger.info("Received request to fetch all products.");
return productRepository.findAll()
.doOnNext(product -> logger.info("Retrieved product: {}", product))
.doOnComplete(() -> logger.info("Successfully fetched all products."))
.doOnError(ex -> logger.error("Failed to fetch products: {}", ex.getMessage()));
}
}
package com.nisum.products_producer.util;
import com.nisum.products_producer.dto.ProductDto;
import com.nisum.products_producer.model.Product;
import org.springframework.beans.BeanUtils;
public class AppUtil {
public static ProductDto toDto(Product product){
ProductDto dto= new ProductDto();
BeanUtils.copyProperties(product,dto);
return dto;
}
public static Product toEntity(ProductDto productDto){
Product product = new Product();
BeanUtils.copyProperties(productDto, product);
return product;
}
}
spring.application.name=products-producer
server :
port : 8081
spring:
application:
name: PRODUCTS-PRODUCER
data:
mongodb:
database : productdb
host : localhost
port : 27017
auto-index-creation: true
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
product: product-topic
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/ # Eureka Server URL
register-with-eureka: true # Enables registration
fetch-registry: true # Enables service discovery
instance:
hostname: localhost
management:
tracing:
sampling:
probability: 1.0
\ No newline at end of file
package com.nisum.products_producer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ProductsProducerApplicationTests {
@Test
void contextLoads() {
}
}
package com.nisum.products_producer.controller;
import com.nisum.products_producer.advice.GlobalExceptionHandler;
import com.nisum.products_producer.dto.ProductDto;
import com.nisum.products_producer.exception.DuplicateProductException;
import com.nisum.products_producer.exception.InvalidProductNameException;
import com.nisum.products_producer.model.Product;
import com.nisum.products_producer.service.ProductService;
import com.nisum.products_producer.service.ProductServiceImpl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.bind.support.WebExchangeBindException;
import reactor.core.publisher.Mono;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.*;
import static org.assertj.core.api.Assertions.assertThat;
@WebFluxTest(ProductController.class)
@ExtendWith(MockitoExtension.class)
@Import(GlobalExceptionHandler.class)
class ProductControllerTest {
private WebTestClient webTestClient;
@MockBean
private ProductService productService;
@Autowired
private ProductController productController;
@BeforeEach
void setUp() {
productController = new ProductController(productService);
webTestClient = WebTestClient.bindToController(productController)
.controllerAdvice(new GlobalExceptionHandler())
.build();
}
@Test
public void addProductTest_Success(){
// Given: A duplicate product DTO
Product product = Product.builder()
.id("105")
.name("Laptop")
.price(2500.00)
.build();
when(productService.saveProduct(any())).thenReturn(Mono.just(product));
webTestClient.post()
.uri("/product/saveProduct")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(product)
.exchange()
.expectStatus().isCreated()
.expectBody(Product.class)
.consumeWith(response -> {
Product responseBody = response.getResponseBody();
assertNotNull(responseBody, "Response body should not be null");
assertEquals("105", responseBody.getId(), "Product ID mismatch");
assertEquals("Laptop", responseBody.getName(), "Product name mismatch");
assertEquals(2500.0, responseBody.getPrice(), "Product price mismatch");
});
// Verify that the service method was called once
verify(productService, times(1)).saveProduct(any());
}
/*@Test
void testSaveProduct_InvalidName_OnlyNumbers() {
Product invalidProduct = new Product();
invalidProduct.setId("102");
invalidProduct.setName("12345"); // Only numbers
invalidProduct.setPrice(2200.0);
webTestClient.post()
.uri("/product/saveProduct")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(invalidProduct)
.exchange()
.expectStatus().isBadRequest();
}*/
/* @Test
void testSaveProduct_InvalidName_OnlyNumbers() {
Product invalidProduct = new Product();
invalidProduct.setId("102");
invalidProduct.setName("12345"); // Invalid name
invalidProduct.setPrice(2200.0);
*//* WebExchangeBindException validationException =
new WebExchangeBindException(null, null);
validationException.rejectValue("name", "Invalid name", "Name cannot be only numbers");
when(productService.saveProduct(any())).thenReturn(Mono.error(validationException));*//*
webTestClient.post().uri("/product/saveProduct")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(invalidProduct)
.exchange()
.expectStatus().isBadRequest()
.expectBody(String.class)
.consumeWith(response ->
System.out.println("Response Body: " + response.getResponseBody())
);
}*/
@Test
public void testAddProduct_whenProductServiceFails_shouldReturnBadRequest() {
// Arrange
Product invalidProduct = Product.builder()
.id("102")
.name("1234")
.price(2200.0)
.build();
//when(productService.saveProduct(any(Mono.class))).thenReturn(Mono.error(new RuntimeException("Service failure")));
// Act & Assert
webTestClient.post().uri("/product/saveProduct")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(invalidProduct)
.exchange()
.expectStatus().isBadRequest()
.expectBody()
.jsonPath("$.name").isEqualTo("Product name must contain at least one letter and can include numbers, but cannot be only numbers.");
}
/*void testSaveProduct_DuplicateProduct() {
ProductDto duplicateProduct = new ProductDto();
duplicateProduct.setId("105");
duplicateProduct.setName("Laptop");
duplicateProduct.setPrice(2500.0);
Mockito.when(productService.saveProduct(Mockito.any()))
.thenReturn(Mono.error(new DuplicateProductException("Product already exists with the same name and price!")));
// When: Sending a POST request to save the duplicate product
webTestClient.post()
.uri("/product/saveProduct")
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(duplicateProduct), ProductDto.class)
.exchange()
// Then: Expecting HTTP 409 Conflict (because of @ExceptionHandler)
.expectStatus().isEqualTo(409)
.expectBody()
.consumeWith(response -> System.out.println("Response: " + new String(response.getResponseBody()))) // Print response for debugging
.jsonPath("$").isEqualTo("Product already exists with the same name and price!"); // Validate error message
}*/
@Test
void testSaveProduct_DuplicateProduct() {
Product duplicateProduct = new Product();
duplicateProduct.setId("105");
duplicateProduct.setName("Laptop");
duplicateProduct.setPrice(2500.0);
/*Mockito.when(productService.saveProduct(Mockito.any()))
.thenAnswer(invocation -> {
throw new DuplicateProductException("Product already exists with the same name and price!");
});*/
Mockito.when(productService.saveProduct(Mockito.any()))
.thenThrow(new DuplicateProductException("Product already exists with the same name and price!"));
webTestClient.post()
.uri("/product/saveProduct")
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(duplicateProduct), Product.class)
.exchange()
.expectStatus().isEqualTo(HttpStatus.CONFLICT) // Expect 409
.expectBody()
.jsonPath("$.message").isEqualTo("Product already exists with the same name and price!");
}
@Test
void testDuplicateProductExceptionHandler() {
GlobalExceptionHandler exceptionHandler = new GlobalExceptionHandler();
ResponseEntity<Map<String, String>> response =
exceptionHandler.handleDuplicateProductException(new DuplicateProductException("Duplicate Entry"));
Assertions.assertEquals(HttpStatus.CONFLICT, response.getStatusCode());
Assertions.assertEquals("Duplicate Entry", response.getBody().get("message"));
}
}
package com.nisum.products_producer.controller;
import com.nisum.products_producer.model.Product;
import com.nisum.products_producer.service.KafkaProducerService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@WebFluxTest
@ExtendWith(MockitoExtension.class)
public class ProductKafkaControllerTest {
//@MockitoBean
@Mock
private KafkaProducerService producerService;
@InjectMocks
private ProductKafkaController productKafkaController;
@Mock
private WebTestClient webTestClient;
@BeforeEach
void setUp() {
webTestClient = WebTestClient.bindToController(productKafkaController).build();
}
@Test
void publishProduct_Success() {
Product product = Product.builder().id("101").name("Laptop").price(1200.0).build();
when(producerService.sendProductToKafka(any(Product.class))).thenReturn(Mono.empty());
webTestClient.post().uri("/api/product/publish")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(product)
.exchange()
.expectStatus().isOk()
.expectBody(String.class)
.isEqualTo("Product sent successfully");
}
@Test
void publishProduct_Failure() {
Product product = Product.builder().id("101").name("Laptop").price(1200.0).build();
when(producerService.sendProductToKafka(any(Product.class)))
.thenReturn(Mono.error(new RuntimeException("Kafka error")));
webTestClient.post().uri("/api/product/publish")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(product)
.exchange()
.expectStatus().isBadRequest()
.expectBody(String.class)
.isEqualTo("Failed to publish: Kafka error");
}
}
package com.nisum.products_producer.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.products_producer.model.Product;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.*;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderResult;
import reactor.test.StepVerifier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
public class KafkaProducerServiceTest {
@Mock
private ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;
@Mock
private ObjectMapper objectMapper;
@InjectMocks
private KafkaProducerService kafkaProducerService;
@Captor
private ArgumentCaptor<ProducerRecord<String, String>> producerRecordCaptor;
private Product mockProduct;
@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
kafkaProducerService = new KafkaProducerService(kafkaProducerTemplate, objectMapper);
// Simulate property injection (normally injected by Spring)
java.lang.reflect.Field topicField;
try {
topicField = KafkaProducerService.class.getDeclaredField("topic");
topicField.setAccessible(true);
topicField.set(kafkaProducerService, "product_topic");
} catch (Exception e) {
throw new RuntimeException(e);
}
mockProduct = Product.builder()
.id("p123")
.name("TestProduct")
.price(99.99)
.build();
}
}
package com.nisum.products_producer.service;
import com.nisum.products_producer.dto.ProductDto;
import com.nisum.products_producer.model.Product;
import com.nisum.products_producer.repository.ProductRepository;
import com.nisum.products_producer.util.AppUtil;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.web.bind.support.WebExchangeBindException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.hamcrest.Matchers.any;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
public class ProductServiceTest {
@Mock
private ProductRepository productRepository;
@InjectMocks
private ProductService productService;
private ProductDto productDto;
private Product product;
@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
productService = new ProductServiceImpl(productRepository); // Inject mock manually
// Create Sample Product DTO
product = new Product();
product.setId("101");
product.setName("iPhone 14");
product.setPrice(999.99);
// Convert DTO to Entity
// product = AppUtil.toEntity(productDto);
}
@Test
void saveProductTest() {
// Mock repository save behavior
when(productRepository.save(Mockito.any(Product.class))).thenReturn(Mono.just(product));
// Call Service Method
Mono<Product> result = productService.saveProduct(Mono.just(product));
// Verify Result using StepVerifier
StepVerifier.create(result)
.expectNextMatches(savedProduct ->
savedProduct.getId().equals("101") &&
savedProduct.getName().equals("iPhone 14") &&
savedProduct.getPrice().equals(999.99)
)
.verifyComplete();
}
@Test
void testSaveProduct_FailsWhenNameIsNull() {
Product product = new Product();
product.setName(null); // Invalid: Name is null
product.setPrice(100.0);
Mono<Product> productMono = Mono.just(product);
StepVerifier.create(productService.saveProduct(productMono))
.expectErrorMatches(throwable -> throwable instanceof NullPointerException)
.verify();
}
@Test
void testSaveProduct_FailsWhenNameIsOnlyNumbers() {
Product product = new Product();
product.setName("123456"); // Invalid: Name contains only numbers
product.setPrice(200.0);
Mono<Product> productMono = Mono.just(product);
StepVerifier.create(productService.saveProduct(productMono))
.expectErrorMatches(throwable -> throwable instanceof Exception)
.verify();
}
@Test
void getAllProductsTest() {
// Mock repository findAll() behavior
when(productRepository.findAll()).thenReturn(Flux.just(product));
// Call Service Method
Flux<Product> result = productService.getAllProducts();
// Verify Result using StepVerifier
StepVerifier.create(result)
.expectNextMatches(product ->
product.getId().equals("101") &&
product.getName().equals("iPhone 14") &&
product.getPrice().equals(999.99)
)
.verifyComplete();
// Verify Repository Interaction
verify(productRepository, times(1)).findAll();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment