chore: Spark Data accumulation while streaming

parent 746c39fd
Product.java
@@ -6,11 +6,13 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import java.io.Serializable;
 @Data
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-public class Product {
+public class Product implements Serializable {
     private String id;
     private String name;
...
ProductOffers.java
@@ -6,11 +6,13 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import java.io.Serializable;
 @Data
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-public class ProductOffers {
+public class ProductOffers implements Serializable {
     private String id;
     private String name;
...
ProductsOfferService.java
 package com.nisum.product.service;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.result.InsertOneResult;
 import com.nisum.product.dto.FileUploadAttributes;
 import com.nisum.product.dto.KafkaPublisherResponse;
 import com.nisum.product.dto.ProductOffers;
@@ -12,6 +18,10 @@ import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 import org.apache.spark.sql.streaming.Trigger;
 import org.apache.spark.sql.types.*;
+import org.apache.spark.util.CollectionAccumulator;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.codecs.pojo.PojoCodecProvider;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.http.codec.multipart.FilePart;
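The new MongoDB and BSON imports (`MongoClientSettings`, `PojoCodecProvider`, `InsertOneResult`, etc.) suggest a POJO codec registry is being wired up somewhere outside the visible hunks. For reference, the canonical wiring for exactly these classes looks like the sketch below; the connection string, database, and collection names are placeholders, not values from this commit:

```java
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.nisum.product.dto.ProductOffers;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;

public class MongoPojoSketch {
    public static void main(String[] args) {
        // Combine the driver's default codecs with an automatic POJO codec provider
        // so MongoDB documents map directly onto beans like ProductOffers.
        CodecRegistry pojoCodecRegistry = CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(
                        PojoCodecProvider.builder().automatic(true).build()));

        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString("mongodb://localhost:27017")) // placeholder
                .codecRegistry(pojoCodecRegistry)
                .build();

        try (MongoClient client = MongoClients.create(settings)) {
            MongoDatabase db = client.getDatabase("productdb");              // placeholder name
            MongoCollection<ProductOffers> offers =
                    db.getCollection("productOffers", ProductOffers.class);  // placeholder name
            InsertOneResult result = offers.insertOne(
                    ProductOffers.builder().id("1").name("offer-1").build());
            System.out.println("Inserted id: " + result.getInsertedId());
        }
    }
}
```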
@@ -20,6 +30,7 @@ import reactor.core.publisher.Flux;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.*;
@@ -27,7 +38,7 @@ import java.util.concurrent.TimeoutException;
 @Slf4j
 @Service
-public class ProductsOfferService {
+public class ProductsOfferService implements Serializable {
     @Autowired
@@ -83,7 +94,7 @@ public class ProductsOfferService {
     private KafkaPublisherResponse readAndPublishDataToKafka(String batchId) {
         if (StringUtils.isNotEmpty(batchId)) {
-            String topicName = "test-product1";
+            String topicName = "test-product2";
             try {
                 populateProductsOfferPriceUsingDFStruct(batchId, topicName);
                 return KafkaPublisherResponse.builder()
@@ -167,7 +178,7 @@ public class ProductsOfferService {
                 .readStream()
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "HYD-LAP-00484.corp.nisum.com:9092")
-                .option("subscribe", "test-product1")
+                .option("subscribe", "test-product2")
                 .option("startingOffsets", "earliest")
                 .option("startingOffsets", "earliest")
                 .option("failOnDataLoss", "false").
@@ -196,9 +207,52 @@ public class ProductsOfferService {
                 .outputMode(OutputMode.Append())
                 .start();
+        CollectionAccumulator<ProductOffers> offerProductList = sparkSession.sparkContext().collectionAccumulator("offerProductList");
+        StreamingQuery query = offersDataset
+                .writeStream()
+                .trigger(Trigger.Once())
+                .foreach(new ForeachWriter<ProductOffers>() {
+                    @Override
+                    public boolean open(long partitionId, long epochId) {
+                        return true;
+                    }
+                    @Override
+                    public void process(ProductOffers value) {
+                        offerProductList.add(value);
+                    }
+                    @Override
+                    public void close(Throwable errorOrNull) {
+                    }
+                }).start();
+        query.awaitTermination();
         streamingQuery.awaitTermination();
-        return Flux.empty();
+        try {
+            log.info("Offer Product List : {}", offerProductList);
+            log.info("Awaiting main thread ......");
+            Thread.sleep(2000);
+            log.info("Resuming main thread ......");
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return Flux.fromIterable(offerProductList.value());
     }
...
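The heart of this change is an accumulate-then-collect pattern: register a `CollectionAccumulator` on the driver, add each streamed row to it from `ForeachWriter.process` (which runs on the executors), and read `value()` on the driver once `awaitTermination()` returns. Because `Trigger.Once` processes whatever is available and then stops, `awaitTermination()` already blocks until the batch completes, so the trailing `Thread.sleep(2000)` is extra caution rather than a requirement. Below is a self-contained sketch of the same pattern against Spark's built-in `rate` test source, standing in for the Kafka-backed dataset; the local-mode session and timings are assumptions, and a bounded `ProcessingTime` trigger is used instead of `Trigger.Once` so the rate source has time to produce rows:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.util.CollectionAccumulator;

public class AccumulateWhileStreamingSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("accumulate-sketch").getOrCreate();

        // Registered on the driver; tasks add rows, Spark merges the per-task lists back.
        CollectionAccumulator<Long> collected =
                spark.sparkContext().collectionAccumulator("collected");

        Dataset<Row> stream = spark.readStream()
                .format("rate")                 // test source emitting (timestamp, value) rows
                .option("rowsPerSecond", "10")
                .load();

        StreamingQuery query = stream.writeStream()
                .trigger(Trigger.ProcessingTime("500 milliseconds"))
                .foreach(new ForeachWriter<Row>() {
                    @Override public boolean open(long partitionId, long epochId) { return true; }
                    @Override public void process(Row row) {
                        collected.add(row.getLong(row.fieldIndex("value"))); // executor side
                    }
                    @Override public void close(Throwable errorOrNull) { }
                })
                .start();

        query.awaitTermination(5_000);  // let a few micro-batches run
        query.stop();

        // Only read the accumulator on the driver after the query has stopped.
        System.out.println("collected " + collected.value().size() + " rows");
        spark.stop();
    }
}
```

One caveat worth keeping in mind: a `CollectionAccumulator` materializes every accumulated row in driver memory, so the pattern fits small result sets like the `Flux` of parsed offers returned here; for high-volume streams, writing to a proper sink and reading back is the safer route.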