chore: Converted to main method Invocation

parent 21520c62
package com.nisum.inventory;

import com.nisum.inventory.service.InventoryService;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;

import java.util.concurrent.TimeoutException;

@SpringBootApplication
public class InventoryProcessApiApplication {

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        new SpringApplicationBuilder(InventoryProcessApiApplication.class)
                .web(WebApplicationType.REACTIVE)
                .run(args);

        InventoryService inventoryService = new InventoryService();
        String topicName = "test-inventory3";
        String batchId = inventoryService.readFromJsonAndPushToKafka(topicName);
        inventoryService.readFromKafkaTransformIntoMongoDB(batchId, topicName);
    }
}
......
package com.nisum.inventory.controller;

import com.nisum.inventory.dto.KafkaPublisherResponse;
import com.nisum.inventory.dto.RawInventory;
import com.nisum.inventory.dto.Root;
import com.nisum.inventory.service.InventoryService;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

import java.util.concurrent.TimeoutException;

@Slf4j
@RestController
@RequestMapping(path = "/inventory")
public class InventoryController {

    @Autowired
    private InventoryService inventoryService;

    @PostMapping(value = "/upload-data", consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseStatus(value = HttpStatus.OK)
    public Flux<KafkaPublisherResponse> upload(@RequestPart("files") Flux<FilePart> filePartFlux) {
        return inventoryService.saveAndProcess(filePartFlux);
    }

    @GetMapping(value = "/get", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseStatus(value = HttpStatus.OK)
    public Flux<RawInventory> getProductData(@RequestParam("batchId") String batchId) throws TimeoutException, StreamingQueryException {
        return inventoryService.getProcessedInventory(batchId);
    }
}
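As a usage illustration only (not part of this commit), the multipart endpoint above could be exercised from a reactive client roughly as follows; the base URL and the uploaded file path are assumptions:

// Hypothetical client-side sketch; base URL and file path are assumptions.
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class InventoryUploadClientExample {
    public static void main(String[] args) {
        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        // Attach the JSON file under the part name the controller expects ("files").
        builder.part("files", new FileSystemResource("src/main/resources/json/inventory_sample.json"));

        Flux<String> responses = WebClient.create("http://localhost:8080")
                .post()
                .uri("/inventory/upload-data")
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .body(BodyInserters.fromMultipartData(builder.build()))
                .retrieve()
                .bodyToFlux(String.class);   // raw JSON; could be mapped to KafkaPublisherResponse instead

        responses.doOnNext(System.out::println).blockLast();
    }
}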
package com.nisum.inventory.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class FileUploadAttributes {
    private String fileName;
    private String fileContent;
}

package com.nisum.inventory.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class KafkaPublisherResponse {
    private String topicName;
    private String batchId;
    private String status;
}
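For illustration only (the field values below are made up), this is the shape the upload flow returns after a successful publish, built with the Lombok-generated builder:

// Illustrative values only; mirrors how readAndPublishDataToKafka populates the response.
KafkaPublisherResponse response = KafkaPublisherResponse.builder()
        .topicName("test-inventory2")
        .batchId("103045123")
        .status("File data pushed to Kafka topic.")
        .build();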
@@ -43,91 +43,19 @@ import java.util.concurrent.TimeoutException;
 public class InventoryService implements Serializable {
-    @Autowired
-    private SparkSession sparkSession;
-    @Autowired
-    private StructType schema;
+    public String readFromJsonAndPushToKafka(String topicName) throws TimeoutException, StreamingQueryException {
-    public Flux<KafkaPublisherResponse> saveAndProcess(Flux<FilePart> filePartFlux) {
-        return filePartFlux.flatMap(filePart ->
-                filePart.content().map(dataBuffer -> {
-                    byte[] bytes = new byte[dataBuffer.capacity()];
-                    dataBuffer.read(bytes);
-                    DataBufferUtils.release(dataBuffer);
-                    return FileUploadAttributes.builder()
-                            .fileContent(new String(bytes, StandardCharsets.UTF_8))
-                            .fileName(filePart.filename())
-                            .build();
-                })
-                .map(content -> saveAsFile(content))
-                .map(fileName -> readAndPublishDataToKafka(fileName))
-                .map(kafkaPublisherResponse -> kafkaPublisherResponse));
-    }
-    private String saveAsFile(FileUploadAttributes fileUploadAttributes) {
-        try {
-            FileUtils.deleteDirectory(new File("src/main/resources/data/CheckPointLocation"));
-        } catch (IOException e) {
-            log.error("Error while deleting checkpoint location dir {}", e.getMessage());
-        }
         String batchId = new SimpleDateFormat("HHmmssSSS").format(new Date());
-        String dir = "src/main/resources/data/" + batchId;
-        String fileName = dir + File.separator + fileUploadAttributes.getFileName();
-        try {
-            FileUtils.forceMkdir(new File(dir));
-            FileUtils.writeStringToFile(new File(fileName),
-                    fileUploadAttributes.getFileContent(), StandardCharsets.UTF_8, true);
-            return batchId;
-        } catch (IOException e) {
-            log.error("Error while creating CSV... {}", e.getMessage());
-            return StringUtils.EMPTY;
-        }
-    }
-    private KafkaPublisherResponse readAndPublishDataToKafka(String batchId) {
-        if (StringUtils.isNotEmpty(batchId)) {
-            String topicName = "test-inventory2";
-            try {
-                populateInventoryDataUsingDFStruct(batchId, topicName);
-                return KafkaPublisherResponse.builder()
-                        .topicName(topicName)
-                        .batchId(batchId)
-                        .status("File data pushed to Kafka topic.").build();
-            } catch (TimeoutException | StreamingQueryException e) {
-                if (e instanceof TimeoutException)
-                    log.error("TimeoutException... {}", e.getMessage());
-                if (e instanceof StreamingQueryException)
-                    log.error("StreamingQueryException {}", e.getMessage());
-                return KafkaPublisherResponse.builder().topicName(topicName).status(e.getMessage()).build();
-            }
-        } else {
-            return KafkaPublisherResponse.builder().build();
-        }
-    }
-    public void populateInventoryDataUsingDFStruct(String batchId, String topicName) throws TimeoutException, StreamingQueryException {
         StructType structType = Encoders.bean(Root.class).schema();
-        Dataset<Row> df = sparkSession.readStream()
+        Dataset<Row> df = sparkSession().readStream()
                 .format("json")
                 .option("multiline", true)
                 .schema(structType)
-                .load("src/main/resources/data/" + batchId);
+                .load("src/main/resources/json");
         //df.printSchema();
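The Kafka sink that follows this read is collapsed between the hunks. As a rough, hypothetical sketch only (broker address, value expression, and checkpoint path are assumptions, not lines from this commit), publishing the streaming Dataset to the topic typically looks like:

// Hypothetical sketch, not part of this commit: write the streaming Dataset to Kafka.
df.selectExpr("to_json(struct(*)) AS value")                        // Kafka sink expects a string/binary "value" column
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")        // assumed broker; the commit points at a corp host
        .option("topic", topicName)
        .option("checkpointLocation", "src/main/resources/data/CheckPointLocation")   // assumed path
        .start()
        .awaitTermination(5000);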
@@ -147,18 +75,18 @@ public class InventoryService implements Serializable {
                 .start()
                 .awaitTermination(5000);
+        return batchId;
     }
-    public Flux<RawInventory> getProcessedInventory(String batchId) throws TimeoutException, StreamingQueryException {
+    public void readFromKafkaTransformIntoMongoDB(String batchId, String topicName) throws TimeoutException, StreamingQueryException {
-        Dataset<Row> df = sparkSession
+        Dataset<Row> df = sparkSession()
                 .readStream()
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "HYD-LAP-00484.corp.nisum.com:9092")
-                .option("subscribe", "test-inventory2")
+                .option("subscribe", topicName)
                 .option("startingOffsets", "earliest")
                 .option("failOnDataLoss", "false")
@@ -184,7 +112,7 @@ public class InventoryService implements Serializable {
                 .as(Encoders.bean(RawInventory.class));
-        CollectionAccumulator<RawInventory> rawInventories = sparkSession.sparkContext().collectionAccumulator("accList");
+        CollectionAccumulator<RawInventory> rawInventories = sparkSession().sparkContext().collectionAccumulator("accList");
         StreamingQuery streamingQuery = inventoryList
                 .writeStream()
@@ -227,18 +155,18 @@ public class InventoryService implements Serializable {
         streamingQuery.awaitTermination();
-        try {
-            log.info("rawInventories : {}", rawInventories);
-            log.info("Awaiting main thread ......");
-            Thread.sleep(2000);
-            log.info("Resuming main thread ......");
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        return Flux.fromIterable(rawInventories.value());
     }
+    private SparkSession sparkSession() {
+        return SparkSession
+                .builder()
+                .appName("Product info API")
+                .master("local[1]")
+                .getOrCreate();
+    }
 }
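The body of the writeStream call above is collapsed in the last hunk. As a rough, hypothetical sketch of how a micro-batch sink into MongoDB is commonly wired with foreachBatch (the format and option names assume the MongoDB Spark connector 10.x; the URI, database, and collection are assumptions, not taken from this commit):

// Hypothetical sketch, not part of this commit: sink each micro-batch into MongoDB.
StreamingQuery streamingQuery = inventoryList
        .writeStream()
        .outputMode("append")
        .foreachBatch((batchDs, batchNum) -> batchDs.write()
                .format("mongodb")                                      // MongoDB Spark connector 10.x format name
                .mode("append")
                .option("connection.uri", "mongodb://localhost:27017")  // assumed local MongoDB
                .option("database", "orderdb")                          // same database as ReactiveMongoConfig below
                .option("collection", "rawInventory")                   // assumed collection name
                .save())
        .option("checkpointLocation", "src/main/resources/data/CheckPointLocation")   // assumed path
        .start();
streamingQuery.awaitTermination();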
package com.nisum.inventory.utils;

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;

@Configuration
@EnableReactiveMongoRepositories(basePackages = "com.nisum.inventory")
public class ReactiveMongoConfig extends AbstractReactiveMongoConfiguration {

    @Override
    public MongoClient reactiveMongoClient() {
        return MongoClients.create();
    }

    @Override
    protected String getDatabaseName() {
        return "orderdb";
    }
}
\ No newline at end of file
package com.nisum.inventory.utils;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SchemaConfig {

    @Bean
    public StructType schema() {
        StructField[] structFields = new StructField[9];
        structFields[0] = new StructField("imageUrl", DataTypes.StringType, true, Metadata.empty());
        structFields[1] = new StructField("filename", DataTypes.StringType, true, Metadata.empty());
        structFields[2] = new StructField("imageHash", DataTypes.StringType, true, Metadata.empty());
        structFields[3] = new StructField("name", DataTypes.StringType, true, Metadata.empty());
        structFields[4] = new StructField("description", DataTypes.StringType, true, Metadata.empty());
        structFields[5] = new StructField("slug", DataTypes.StringType, true, Metadata.empty());
        structFields[6] = new StructField("manufacturer", DataTypes.StringType, true, Metadata.empty());
        structFields[7] = new StructField("itemType", DataTypes.StringType, true, Metadata.empty());
        structFields[8] = new StructField("productImg", DataTypes.StringType, true, Metadata.empty());
        return new StructType(structFields);
    }
}
package com.nisum.inventory.utils;

import org.apache.spark.sql.SparkSession;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfiguaration {

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .appName("Product info API")
                .master("local[1]")
                .getOrCreate();
    }
}
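The remaining entries in the commit are Spark Structured Streaming checkpoint artifacts that were written under the checkpoint location and committed alongside the code: a commit record, the query metadata id, a batch offset record, and the file-source entry for the sample JSON.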
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"8d4c9cc1-0b9c-4041-806e-06ec99fc1b98"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1687153256468,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}
\ No newline at end of file
v1
{"path":"file:///D:/Work/GitCodeBase/1606/inventory-process-api/src/main/resources/json/inventory_sample.json","timestamp":1686745257596,"batchId":0}
\ No newline at end of file