chore: Converted to main method invocation.

parent 423649e6
package com.nisum.product;
import com.nisum.product.service.ProductsOfferService;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import java.util.concurrent.TimeoutException;
@SpringBootApplication
public class ProductInfoApiApplication {
-public static void main(String[] args) {
+public static void main(String[] args) throws TimeoutException, StreamingQueryException {
new SpringApplicationBuilder(ProductInfoApiApplication.class)
.web(WebApplicationType.REACTIVE)
.run(args);
+ProductsOfferService productsOfferService = new ProductsOfferService();
+String batchId = productsOfferService.readFromCsvAndPushToKafka("test-product3");
+productsOfferService.readFromKafkaTransformIntoCSV(batchId);
}
......
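Note: new ProductsOfferService() bypasses Spring dependency injection, so the service's @Autowired SparkSession field is never populated; that is presumably why a private sparkSession() helper appears in the service below. A minimal alternative sketch, assuming ProductsOfferService is registered as a Spring bean (the class name MainInvocationSketch is illustrative):

package com.nisum.product;

import com.nisum.product.service.ProductsOfferService;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;

public class MainInvocationSketch {

    public static void main(String[] args) throws Exception {
        // run(args) returns the application context once startup completes
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(ProductInfoApiApplication.class)
                        .web(WebApplicationType.REACTIVE)
                        .run(args);

        // Fetching the bean keeps the @Autowired SparkSession wiring intact,
        // unlike instantiating the service with new
        ProductsOfferService service = context.getBean(ProductsOfferService.class);
        String batchId = service.readFromCsvAndPushToKafka("test-product3");
        service.readFromKafkaTransformIntoCSV(batchId);
    }
}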
package com.nisum.product.controller;
import com.nisum.product.dto.KafkaPublisherResponse;
import com.nisum.product.dto.ProductOffers;
import com.nisum.product.service.ProductsOfferService;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.util.concurrent.TimeoutException;
@Slf4j
@RestController
@RequestMapping(path = "/product")
public class ProductController {
@Autowired
private ProductsOfferService productsOfferService;
@PostMapping(value = "/upload-data", consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(value = HttpStatus.OK)
public Flux<KafkaPublisherResponse> upload(@RequestPart("files") Flux<FilePart> filePartFlux){
return productsOfferService.saveAndProcess(filePartFlux);
}
@GetMapping(value = "/get", consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(value = HttpStatus.OK)
public Flux<ProductOffers> getProductData(@RequestParam("batchId") String batchId) throws TimeoutException, StreamingQueryException {
return productsOfferService.getProductOffers(batchId);
}
}
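For exercising the upload endpoint from Java (it mirrors the Postman request at the bottom of this page), a minimal WebClient sketch; the base URL and file path are assumptions:

package com.nisum.product.client;

import com.nisum.product.dto.KafkaPublisherResponse;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

public class UploadClientSketch {

    public static void main(String[] args) {
        WebClient webClient = WebClient.create("http://localhost:8080");

        // Part name "files" must match @RequestPart("files") in ProductController
        MultipartBodyBuilder body = new MultipartBodyBuilder();
        body.part("files", new FileSystemResource("D:/data/products.csv"));

        webClient.post()
                .uri("/product/upload-data")
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .body(BodyInserters.fromMultipartData(body.build()))
                .retrieve()
                .bodyToFlux(KafkaPublisherResponse.class)
                .doOnNext(r -> System.out.println(r.getStatus()))
                .blockLast();
    }
}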
package com.nisum.product.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class FileUploadAttributes {
private String fileName;
private String fileContent;
}
package com.nisum.product.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class KafkaPublisherResponse {
private String topicName;
private String batchId;
private String status;
}
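Lombok's @Builder gives both DTOs fluent builders; a usage sketch with illustrative values (the topic and batch id mirror ones appearing elsewhere in this commit):

package com.nisum.product.dto;

public class DtoBuilderSketch {

    public static void main(String[] args) {
        KafkaPublisherResponse response = KafkaPublisherResponse.builder()
                .topicName("test-product3")
                .batchId("105106662")
                .status("File data pushed to Kafka topic")
                .build();

        FileUploadAttributes attributes = FileUploadAttributes.builder()
                .fileName("products.csv")
                .fileContent("...csv text...")
                .build();

        // @Data supplies getters and toString()
        System.out.println(response + " / " + attributes.getFileName());
    }
}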
package com.nisum.product.service;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.nisum.product.dto.FileUploadAttributes;
import com.nisum.product.dto.KafkaPublisherResponse;
import com.nisum.product.dto.ProductOffers;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
@@ -19,9 +11,6 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.*;
import org.apache.spark.util.CollectionAccumulator;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.FilePart;
@@ -41,98 +30,13 @@ import java.util.concurrent.TimeoutException;
public class ProductsOfferService implements Serializable {
@Autowired
SparkSession sparkSession;
public Flux<KafkaPublisherResponse> saveAndProcess(Flux<FilePart> filePartFlux) {
return filePartFlux.flatMap(filePart ->
filePart.content().map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()]; // readableByteCount(), not capacity(), to match the readable bytes
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
+public String readFromCsvAndPushToKafka(String topicName) throws TimeoutException, StreamingQueryException {
return FileUploadAttributes.builder()
.fileContent(new String(bytes, StandardCharsets.UTF_8))
.fileName(filePart.filename())
.build();
})
.map(this::saveAsFile)
.map(this::readAndPublishDataToKafka));
}
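A FilePart can emit its content as several DataBuffers, in which case the per-buffer map above would split one CSV across multiple FileUploadAttributes. A join-based sketch that avoids this, assuming a reactor.core.publisher.Mono import alongside the existing ones (the helper name is illustrative):

private Mono<FileUploadAttributes> toFileUploadAttributes(FilePart filePart) {
    // join(...) collapses every buffer of the part into a single DataBuffer
    return DataBufferUtils.join(filePart.content())
            .map(dataBuffer -> {
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                return FileUploadAttributes.builder()
                        .fileName(filePart.filename())
                        .fileContent(new String(bytes, StandardCharsets.UTF_8))
                        .build();
            });
}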
private String saveAsFile(FileUploadAttributes fileUploadAttributes) {
try {
FileUtils.deleteDirectory(new File("src/main/resources/data/CheckPointLocation"));
} catch (IOException e) {
log.error("Error while deleting checkpoint location dir {}", e.getMessage());
}
String batchId= new SimpleDateFormat("HHmmssSSS").format(new Date());
String dir="src/main/resources/data/"+batchId;
String fileName = dir+ File.separator + fileUploadAttributes.getFileName();
try {
FileUtils.forceMkdir(new File(dir));
FileUtils.writeStringToFile(new File(fileName)
, fileUploadAttributes.getFileContent(), StandardCharsets.UTF_8, true);
return batchId;
} catch (IOException e) {
log.error("Error while creating CSV... {}", e.getMessage());
return StringUtils.EMPTY;
}
}
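The HHmmssSSS pattern makes batchId a time-of-day stamp, so 10:51:06.662 becomes 105106662, the id visible in the processed output near the end of this page; note that such ids repeat across days. A sketch reusing the SimpleDateFormat/Date imports this file already needs:

// Illustrative only: the id shape produced by saveAsFile()
private static String sampleBatchId() {
    return new SimpleDateFormat("HHmmssSSS").format(new Date());  // e.g. "105106662"
}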
private KafkaPublisherResponse readAndPublishDataToKafka(String batchId) {
if (StringUtils.isNotEmpty(batchId)) {
String topicName = "test-product2";
try {
populateProductsOfferPriceUsingDFStruct(batchId, topicName);
return KafkaPublisherResponse.builder()
.topicName(topicName)
.batchId(batchId)
.status("File data pushed to Kafka topic").build();
} catch (TimeoutException | StreamingQueryException e) {
if (e instanceof TimeoutException)
log.error("TimeoutException... {}", e.getMessage());
if (e instanceof StreamingQueryException)
log.error("StreamingQueryException {}", e.getMessage());
return KafkaPublisherResponse.builder().topicName(topicName).status(e.getMessage()).build();
}
}else
{
return KafkaPublisherResponse.builder().build();
}
}
public void populateProductsOfferPriceUsingDF()
{
sparkSession.sqlContext().udf()
.register("calculateOfferPrice",(Double price, Double discountPercentage)->
((100-discountPercentage)*price)/100, DataTypes.DoubleType);
Dataset<Row> df=sparkSession.read().csv("src/main/resources/data/products_without_header.csv").toDF();
df.withColumn("discount(%)",functions.lit(10.50))
.withColumn("OfferPrice",functions.callUDF("calculateOfferPrice",
functions.col("_c3").cast(DataTypes.DoubleType)
,functions.col("discount(%)").cast(DataTypes.DoubleType)))
.write()
.mode(SaveMode.Overwrite)
.csv("src/main/resources/result/df/product_offer_price");
}
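The UDF computes offerPrice = ((100 - discount) * price) / 100, so with the 10.5 literal a 40000.0 price becomes 35800.0, exactly the figure in the processed CSV at the end of this page. The same arithmetic as plain Java:

// Spark-free equivalent of the calculateOfferPrice UDF
private static double offerPrice(double price, double discountPercentage) {
    return ((100 - discountPercentage) * price) / 100;  // offerPrice(40000.0, 10.5) == 35800.0
}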
public void populateProductsOfferPriceUsingDFStruct(String batchId,String topicName) throws TimeoutException, StreamingQueryException {
-sparkSession.sqlContext().udf()
+sparkSession().sqlContext().udf()
.register("calculateOfferPrice",(Double price, Double discountPercentage)->
((100-discountPercentage)*price)/100, DataTypes.DoubleType);
@@ -144,11 +48,11 @@ public class ProductsOfferService implements Serializable {
StructType structType= new StructType(structFields);
-Dataset<Row> df=sparkSession.readStream()
+Dataset<Row> df=sparkSession().readStream()
.format("csv")
.option("header",true)
.schema(structType)
.load("src/main/resources/data/"+batchId);
.load("src/main/resources/csv");
Dataset<Row> rowTransformDf=df.withColumn("discountPercentage",functions.lit(10.50))
.withColumn("offerPrice",functions.callUDF("calculateOfferPrice",
@@ -158,7 +62,7 @@ public class ProductsOfferService implements Serializable {
.withColumn("batchId",functions.lit(batchId));
rowTransformDf.selectExpr("CAST(BatchId AS STRING) AS key", "to_json(struct(*)) AS value")
rowTransformDf.selectExpr("CAST(batchId AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream()
.format("kafka")
.trigger(Trigger.Once())
@@ -167,18 +71,20 @@ public class ProductsOfferService implements Serializable {
.option("checkpointLocation", "src/main/resources/data/CheckPointLocation")
.start()
.awaitTermination(5000);
+return batchId;
}
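The collapsed hunk above hides this writeStream's Kafka options. For reference, a hedged sketch of how such a sink is typically wired; the broker address is copied from the consumer side below, and "topic" is the standard Spark option name:

// Sketch of the elided Kafka sink wiring; keyedDf is the key/value frame built by selectExpr above
private void writeToKafka(Dataset<Row> keyedDf, String topicName) throws TimeoutException, StreamingQueryException {
    keyedDf.writeStream()
            .format("kafka")
            .trigger(Trigger.Once())
            .option("kafka.bootstrap.servers", "HYD-LAP-00484.corp.nisum.com:9092")  // assumed from the read side
            .option("topic", topicName)
            .option("checkpointLocation", "src/main/resources/data/CheckPointLocation")
            .start()
            .awaitTermination(5000);
}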
-public Flux<ProductOffers> getProductOffers(String batchId) throws TimeoutException, StreamingQueryException {
+public void readFromKafkaTransformIntoCSV(String batchId) throws TimeoutException, StreamingQueryException {
-Dataset<Row> df = sparkSession
+Dataset<Row> df = sparkSession()
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "HYD-LAP-00484.corp.nisum.com:9092")
.option("subscribe", "test-product2")
.option("subscribe", "test-product3")
.option("startingOffsets", "earliest")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false").
@@ -203,61 +109,24 @@ public class ProductsOfferService implements Serializable {
.trigger(Trigger.Once())
.format("csv")
.option("checkpointLocation", "src/main/resources/data/checkpointLocation/StreamingJob")
.option("path","src/main/resources/data/"+batchId)
.option("path","src/main/resources/processed/"+batchId)
.outputMode(OutputMode.Append())
.start();
CollectionAccumulator<ProductOffers> offerProductList=sparkSession.sparkContext().collectionAccumulator("offerProductList");
StreamingQuery query=offersDataset
.writeStream()
.trigger(Trigger.Once())
.foreach(new ForeachWriter<ProductOffers>() {
@Override
public boolean open(long partitionId, long epochId) {
return true;
}
@Override
public void process(ProductOffers value) {
offerProductList.add(value);
}
@Override
public void close(Throwable errorOrNull) {
}
}).start();
-query.awaitTermination();
+streamingQuery.awaitTermination();
try {
log.info("Offer Product List : {}",offerProductList);
log.info("Awaiting main thread ......");
Thread.sleep(2000);
log.info("Resuming main thread ......");
} catch (InterruptedException e) {
e.printStackTrace();
}
-return Flux.fromIterable(offerProductList.value());
}
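Another collapsed hunk above hides how the Kafka value column is decoded into ProductOffers rows. A sketch of the usual from_json route, assuming the same StructType that was used on the write side:

// Sketch: turn the Kafka value bytes back into typed columns, keeping one batch
private Dataset<Row> decodeKafkaValue(Dataset<Row> kafkaDf, StructType structType, String batchId) {
    return kafkaDf.selectExpr("CAST(value AS STRING) AS json")
            .select(functions.from_json(functions.col("json"), structType).as("data"))
            .select("data.*")
            .filter(functions.col("batchId").equalTo(batchId));
}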
private SparkSession sparkSession()
{
return SparkSession
.builder()
.appName("Product info API")
.master("local[1]")
.getOrCreate();
}
......
package com.nisum.product.utils;
import org.apache.spark.sql.SparkSession;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SparkConfiguaration {
@Bean
public SparkSession sparkSession()
{
return SparkSession
.builder()
.appName("Product info API")
.master("local[1]")
.getOrCreate();
}
}
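Both this bean and the service's private sparkSession() helper call getOrCreate(), so at runtime they should resolve to the same session rather than two competing local masters. A quick sketch of that property:

package com.nisum.product.utils;

import org.apache.spark.sql.SparkSession;

public class SparkSessionReuseSketch {

    public static void main(String[] args) {
        SparkSession first = SparkSession.builder()
                .appName("Product info API")
                .master("local[1]")
                .getOrCreate();
        // getOrCreate() hands back the already-created session on later calls
        SparkSession second = SparkSession.builder().getOrCreate();
        System.out.println(first == second);  // true
    }
}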
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"c8412340-50f9-4a63-9853-dbbdedab3464"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1687152081277,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"test-product3":{"0":12}}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"ca864002-2ff4-4e9c-b99d-c5e643218d72"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1687152074645,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}
\ No newline at end of file
v1
{"path":"file:///D:/Work/GitCodeBase/1606/product-info-api/src/main/resources/csv/products.csv","timestamp":1686309281654,"batchId":0}
\ No newline at end of file
{
"info": {
"_postman_id": "b42414db-7bec-46fa-840f-6b3940a5391e",
"name": "Product Info API",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"item": [
{
"name": "CreateProductData",
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "formdata",
"formdata": [
{
"key": "files",
"type": "file",
"src": "/D:/data/product.csv"
}
],
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "http://localhost:8080/product/upload-data",
"protocol": "http",
"host": [
"localhost"
],
"port": "8080",
"path": [
"product",
"upload-data"
]
}
},
"response": []
},
{
"name": "getProcessedProductData",
"protocolProfileBehavior": {
"disableBodyPruning": true
},
"request": {
"method": "GET",
"header": [],
"body": {
"mode": "formdata",
"formdata": [
{
"key": "files",
"type": "file",
"src": "/D:/data/products.csv"
}
],
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "http://localhost:8080/product/get/?batchId=145930354",
"protocol": "http",
"host": [
"localhost"
],
"port": "8080",
"path": [
"product",
"get",
""
],
"query": [
{
"key": "batchId",
"value": "145930354"
}
]
}
},
"response": []
}
]
}
\ No newline at end of file
v1
{"path":"file:///D:/Work/GitCodeBase/1606/product-info-api/src/main/resources/processed/105106662/part-00000-b705b0c8-27bb-4966-8a6b-2f49a5ddf6a0-c000.csv","size":225,"isDir":false,"modificationTime":1687152083597,"blockReplication":1,"blockSize":33554432,"action":"add"}
\ No newline at end of file
105106662,Apple Inc;Ipad category,10.5,01,Apple IPad,35800.0,40000.0,48225fdb-b0e1-46ee-9517-1f1826c92c5e
105106662,Apple Inc;Laptop Category,10.5,02,Apple Mac Book Pro,89500.0,100000.0,48225fdb-b0e1-46ee-9517-1f1826c92c5e