chore: Spark Data accumulation while streaming

parent 52e2752a
--- a/src/main/java/com/nisum/inventory/controller/InventoryController.java
+++ b/src/main/java/com/nisum/inventory/controller/InventoryController.java
@@ -1,6 +1,7 @@
 package com.nisum.inventory.controller;
 
 import com.nisum.inventory.dto.KafkaPublisherResponse;
+import com.nisum.inventory.dto.RawInventory;
 import com.nisum.inventory.dto.Root;
 import com.nisum.inventory.service.InventoryService;
 import lombok.extern.slf4j.Slf4j;
@@ -33,7 +34,7 @@ public class InventoryController {
 
     @GetMapping(value = "/get",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @ResponseStatus(value = HttpStatus.OK)
-    public Flux<Root> getProductData(@RequestParam("batchId") String batchId) throws TimeoutException, StreamingQueryException {
+    public Flux<RawInventory> getProductData(@RequestParam("batchId") String batchId) throws TimeoutException, StreamingQueryException {
         return inventoryService.getProcessedInventory(batchId);
     }
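Note: with this change the endpoint emits RawInventory items as server-sent events instead of Flux<Root>. A minimal consumer sketch using Spring WebClient; the host/port are assumptions, the controller is assumed to have no class-level @RequestMapping prefix, and the batchId matches the sample data directory that appears later in this commit:

```java
import com.nisum.inventory.dto.RawInventory;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class InventoryStreamClient {
    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080"); // assumed host/port

        Flux<RawInventory> inventories = client.get()
                .uri(uri -> uri.path("/get").queryParam("batchId", "194409539").build())
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(RawInventory.class);

        // blockLast() keeps the JVM alive until the SSE stream completes.
        inventories.doOnNext(item -> System.out.println("received: " + item))
                   .blockLast();
    }
}
```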
--- a/src/main/java/com/nisum/inventory/dto/ImageCredit.java
+++ b/src/main/java/com/nisum/inventory/dto/ImageCredit.java
@@ -6,11 +6,13 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.io.Serializable;
+
 @Data
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-public class ImageCredit {
+public class ImageCredit implements Serializable {
     private String artist;
     private String link;
 }
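Note: ImageCredit is a nested field of the RawInventory bean that now travels through Spark (Encoders.bean, the accumulator, and the streaming sink processor), so the whole object graph must be Java-serializable. A quick sanity-check sketch, reusing the artist/link values from the sample record later in this diff and assuming ImageCredit lives in com.nisum.inventory.dto like the other DTOs:

```java
import com.nisum.inventory.dto.ImageCredit;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationCheck {
    public static void main(String[] args) throws IOException {
        ImageCredit credit = ImageCredit.builder()
                .artist("Martin Wessely")
                .link("http://www.resplashed.com/photographer/martin_wessely/")
                .build();

        // Before this commit, writeObject would fail with NotSerializableException.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(credit);
        }
        System.out.println("ImageCredit serialized successfully");
    }
}
```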
--- a/src/main/java/com/nisum/inventory/service/InventoryService.java
+++ b/src/main/java/com/nisum/inventory/service/InventoryService.java
@@ -19,6 +19,7 @@ import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 import org.apache.spark.sql.streaming.Trigger;
 import org.apache.spark.sql.types.*;
+import org.apache.spark.util.CollectionAccumulator;
 import org.bson.codecs.configuration.CodecRegistries;
 import org.bson.codecs.configuration.CodecRegistry;
 import org.bson.codecs.pojo.PojoCodecProvider;
@@ -150,7 +151,7 @@ public class InventoryService implements Serializable {
     }
 
-    public Flux<Root> getProcessedInventory(String batchId) throws TimeoutException, StreamingQueryException {
+    public Flux<RawInventory> getProcessedInventory(String batchId) throws TimeoutException, StreamingQueryException {
 
         Dataset<Row> df = sparkSession
@@ -183,6 +184,8 @@ public class InventoryService implements Serializable {
                 .as(Encoders.bean(RawInventory.class));
 
+        CollectionAccumulator<RawInventory> rawInventories=sparkSession.sparkContext().collectionAccumulator("accList");
+
         StreamingQuery streamingQuery=inventoryList
                 .writeStream()
                 .trigger(Trigger.Once())
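Note: CollectionAccumulator is Spark's list-valued accumulator. It is registered once on the driver, tasks call add() on the executors, and Spark merges the per-task lists back into the driver copy as tasks finish. A standalone sketch of the pattern, with hypothetical data, run in local mode:

```java
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionAccumulator;

import java.util.Arrays;

public class AccumulatorSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("accumulator-sketch")
                .getOrCreate();

        CollectionAccumulator<String> acc =
                spark.sparkContext().collectionAccumulator("accList");

        // foreach runs on executors; each add() is merged back to the driver.
        spark.createDataset(Arrays.asList("a", "b", "c"), Encoders.STRING())
             .foreach((ForeachFunction<String>) acc::add);

        // value() is read on the driver, after the action completes.
        System.out.println(acc.value()); // [a, b, c] (order not guaranteed)
        spark.stop();
    }
}
```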
@@ -209,6 +212,7 @@ public class InventoryService implements Serializable {
                     @Override
                     public void process(RawInventory value) {
+                        rawInventories.add(value);
                         InsertOneResult insertOneResult=processedInventoryMongoCollection.insertOne(InventoryMapper.INSTANCE.dtoToEntity(value,UUID.randomUUID().toString()));
                         log.info("Inserted Result : {} ",insertOneResult);
                     }
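Note: accumulator updates made from task-side code like process() are applied at-least-once; if Spark retries a task or the micro-batch, the same record is added again. If the returned Flux must not contain duplicates, the list can be deduplicated on the driver. A hypothetical helper, assuming RawInventory exposes getSlug() (the sample payload later in this diff carries a slug field):

```java
import com.nisum.inventory.dto.RawInventory;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class InventoryDedup {
    private InventoryDedup() { }

    // Keeps the first occurrence of each slug, preserving encounter order.
    public static List<RawInventory> distinctBySlug(List<RawInventory> items) {
        Map<String, RawInventory> bySlug = new LinkedHashMap<>();
        for (RawInventory item : items) {
            bySlug.putIfAbsent(item.getSlug(), item);
        }
        return new ArrayList<>(bySlug.values());
    }
}
```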
@@ -219,19 +223,19 @@ public class InventoryService implements Serializable {
                     }
                 }).start();
 
-        /*StreamingQuery streamingQuery=inventoryList
-                .writeStream()
-                .trigger(Trigger.Once())
-                .format("json")
-                .option("multiline",true)
-                .option("checkpointLocation", "src/main/resources/data/checkpointLocation/StreamingJob")
-                .option("path","src/main/resources/data/"+batchId)
-                .outputMode(OutputMode.Append())
-                .start();*/
-
         streamingQuery.awaitTermination();
-        return Flux.empty();
+
+        try {
+            log.info("rawInventories : {}",rawInventories);
+            log.info("Awaiting main thread ......");
+            Thread.sleep(2000);
+            log.info("Resuming main thread ......");
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return Flux.fromIterable(rawInventories.value());
     }
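Note: the Thread.sleep(2000) reads like a guard to let accumulator updates arrive, but for a Trigger.Once query awaitTermination() only returns after the single micro-batch has finished, and Spark merges task-side accumulator updates as tasks complete, so the value should already be complete at that point. A sketch of the method tail without the sleep, extracted into a helper purely for illustration (names mirror the diff):

```java
import com.nisum.inventory.dto.RawInventory;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.util.CollectionAccumulator;
import reactor.core.publisher.Flux;

final class AccumulatorDrain {
    private AccumulatorDrain() { }

    static Flux<RawInventory> drain(StreamingQuery query,
                                    CollectionAccumulator<RawInventory> rawInventories)
            throws StreamingQueryException {
        // Trigger.Once: awaitTermination() returns once the one batch is done,
        // so all accumulator updates have been merged on the driver already.
        query.awaitTermination();
        return Flux.fromIterable(rawInventories.value());
    }
}
```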
New files committed by the streaming run (Spark checkpoint and sink metadata; file names are collapsed in this diff view):

File-sink metadata log (_spark_metadata/0 under src/main/resources/data/194409539):
+v1
+{"path":"file:///D:/Work/events/inventory-process-api/src/main/resources/data/194409539/part-00000-b646badc-bfc7-4cec-b945-b25a5202338b-c000.json","size":607,"isDir":false,"modificationTime":1686752103529,"blockReplication":1,"blockSize":33554432,"action":"add"}
\ No newline at end of file

Sink output record (part-00000-b646badc-bfc7-4cec-b945-b25a5202338b-c000.json):
+{"added":1485723766805,"description":"enim corporis voluptatibus laudantium possimus alias dolorem voluptatem similique aut aliquam voluptatem voluptatem omnis id consequatur","filename":"400_002e1ecb8bd2.jpg","imageCredit":{"artist":"Martin Wessely","link":"http://www.resplashed.com/photographer/martin_wessely/"},"imageHash":"2b1d5b5ef4b37b4f5d0dccade1b69987","imageUrl":"http://www.resplashed.com/img/400_002e1ecb8bd2.jpg","itemType":"mug","manufacturer":"OHara-Group","name":"Handcrafted Trees Mug","price":10.99,"productImg":"mug-400_002e1ecb8bd2.jpg","slug":"Handcrafted-Trees-Mug","tags":["Trees"]}

Checkpoint for the Kafka-sourced query (id 0f6e3ac4-360a-43f1-a92f-a99285b9bfcd):

commits/0:
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file

metadata:
+{"id":"0f6e3ac4-360a-43f1-a92f-a99285b9bfcd"}
\ No newline at end of file

offsets/0 (last line holds the Kafka offsets for topic test-inventory2):
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1686752099801,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
+{"test-inventory2":{"0":2}}
\ No newline at end of file

Checkpoint for the file-sourced query (id 60f5933f-85d3-4135-bc9f-f20f13af463b):

commits/0:
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file

metadata:
+{"id":"60f5933f-85d3-4135-bc9f-f20f13af463b"}
\ No newline at end of file

offsets/0 (last line holds the file-source log offset):
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1686752058281,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
+{"logOffset":0}
\ No newline at end of file

sources/0/0 (input files assigned to batch 0):
+v1
+{"path":"file:///D:/Work/events/inventory-process-api/src/main/resources/data/194409539/inventory_sample.json","timestamp":1686752049545,"batchId":0}
\ No newline at end of file
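For orientation, these files follow Spark's standard structured-streaming bookkeeping: each query's checkpoint directory holds metadata (the stable query id), offsets/ and commits/ entries per batch, and a sources/ listing for file sources, while the JSON sink keeps its own _spark_metadata log next to the output. Roughly:

```
<checkpointLocation>/
├── metadata        # {"id": "..."}, stable identity of the query
├── offsets/0       # v1, batch conf, then per-source offsets for batch 0
├── commits/0       # v1, {"nextBatchWatermarkMs":0}, batch 0 fully committed
└── sources/0/0     # file source only: input files assigned to batch 0

<output path>/
├── part-00000-....json    # the sink's data file
└── _spark_metadata/0      # v1, one "add" entry per committed data file
```

Two checkpoint sets appear because two queries ran: the offsets entry {"test-inventory2":{"0":2}} belongs to the Kafka-sourced query, while {"logOffset":0} and the sources/0/0 listing belong to the one reading inventory_sample.json.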