chore: Spark streaming data into MongoDB.

parent 59c5745f
package com.nisum.inventory.repository;
import com.nisum.inventory.collection.ProcessedInventory;
import com.nisum.inventory.dto.RawInventory;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;
@Mapper
public interface InventoryMapper {

    InventoryMapper INSTANCE = Mappers.getMapper(InventoryMapper.class);

    @Mapping(target = "id", source = "id")
    ProcessedInventory dtoToEntity(RawInventory rawInventory, String id);
}
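MapStruct generates this mapper's implementation at build time: the extra id parameter is bound to the target's id field by the @Mapping annotation, and same-named properties are copied over from rawInventory. The DTO and entity classes live elsewhere in the repo; what follows is only a minimal sketch of the shapes this mapper and the Mongo POJO codec below rely on, with field names inferred from the Spark filters further down (an assumption, not the repo's actual definitions).

package com.nisum.inventory.dto;

import java.io.Serializable;
import lombok.Data;

// Hypothetical sketch; the real fields may differ.
@Data
public class RawInventory implements Serializable {
    private String name;
    private String itemType;
    private Double price;
}

package com.nisum.inventory.collection;

import lombok.Data;

// Hypothetical sketch; the POJO codec needs a no-arg constructor and
// getters/setters, which @Data provides here since no field is final.
@Data
public class ProcessedInventory {
    private String id;   // set from the mapper's extra id parameter
    private String name;
    private String itemType;
    private Double price;
}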
package com.nisum.inventory.service;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.nisum.inventory.collection.ProcessedInventory;
import com.nisum.inventory.dto.*;
import com.nisum.inventory.repository.InventoryMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@@ -11,8 +19,12 @@ import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.*;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@@ -36,6 +48,7 @@ public class InventoryService implements Serializable {
    @Autowired
    private StructType schema;

    public Flux<KafkaPublisherResponse> saveAndProcess(Flux<FilePart> filePartFlux) {
        return filePartFlux.flatMap(filePart ->
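The StructType autowired above is defined elsewhere in the project; the following is a minimal sketch of what that bean could look like, assuming the message payload is a single object holding an inventoryList array with the fields referenced by the filters below (names and types are inferred from this diff, not confirmed by it).

package com.nisum.inventory.config;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical sketch of the schema bean this service autowires.
@Configuration
public class InventorySchemaConfig {

    @Bean
    public StructType schema() {
        // Shape of one element of data.inventoryList, inferred from
        // the filter columns used below.
        StructType item = new StructType()
                .add("name", DataTypes.StringType)
                .add("itemType", DataTypes.StringType)
                .add("price", DataTypes.DoubleType);
        // Top-level message: { "inventoryList": [ ...items... ] }
        return new StructType()
                .add("inventoryList", DataTypes.createArrayType(item));
    }
}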
@@ -156,18 +169,57 @@ public class InventoryService implements Serializable {
        df.printSchema();

        // Parse the Kafka value as JSON, explode the inventoryList array,
        // and keep only inexpensive coffee mugs as typed RawInventory rows.
        Dataset<RawInventory> inventoryList = df
                .filter(functions.col("key").equalTo(batchId))
                .selectExpr("CAST(value as String) as message")
                .select(functions.from_json(functions.col("message"), schema).as("data"))
                .select(functions.explode(functions.col("data.inventoryList")).as("list"))
                .filter(
                        functions.col("list.price").leq(15)
                                .and(functions.col("list.itemType").contains("mug"))
                                .and(functions.col("list.name").contains("Coffee"))
                )
                .select("list.*")
                .as(Encoders.bean(RawInventory.class));
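        // For reference, a hypothetical record value (an assumption, not taken
        // from the repo) that would survive the filter chain above; note that
        // Column.contains(...) is case-sensitive:
        //   {"inventoryList":[{"name":"Coffee Mug","itemType":"ceramic mug","price":12.5}]}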
        StreamingQuery streamingQuery = inventoryList
                .writeStream()
                .trigger(Trigger.Once())
                .foreach(new ForeachWriter<RawInventory>() {

                    private MongoCollection<ProcessedInventory> processedInventoryMongoCollection;
                    private MongoClient client;

                    @Override
                    public boolean open(long partitionId, long epochId) {
                        // Open a dedicated Mongo connection for this partition.
                        String uri = "mongodb://127.0.0.1:27017";
                        client = MongoClients.create(uri);
                        // Register a POJO codec so ProcessedInventory maps to BSON automatically.
                        CodecRegistry pojoCodecRegistry = CodecRegistries.fromRegistries(
                                MongoClientSettings.getDefaultCodecRegistry(),
                                CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build()));
                        // Access the target database and the typed collection.
                        MongoDatabase mongoDatabase = client.getDatabase("inventorydb").withCodecRegistry(pojoCodecRegistry);
                        processedInventoryMongoCollection = mongoDatabase.getCollection("inventory", ProcessedInventory.class);
                        return true;
                    }

                    @Override
                    public void process(RawInventory value) {
                        // Map the DTO to the Mongo entity, assigning a fresh UUID as the document id.
                        InsertOneResult insertOneResult = processedInventoryMongoCollection
                                .insertOne(InventoryMapper.INSTANCE.dtoToEntity(value, UUID.randomUUID().toString()));
                        log.info("Inserted Result : {}", insertOneResult);
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        // Release the per-partition Mongo connection.
                        client.close();
                    }
                }).start();
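        // ForeachWriter lifecycle: Spark invokes open(...) once per partition and
        // epoch, process(...) once per row, and close(...) when the partition is
        // drained, so one MongoClient is created per partition rather than per
        // record; returning true from open(...) tells Spark to process the partition.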
        /*StreamingQuery streamingQuery=inventoryList
                .writeStream()
                .trigger(Trigger.Once())
                .format("json")
@@ -175,7 +227,7 @@ public class InventoryService implements Serializable {
                .option("checkpointLocation", "src/main/resources/data/checkpointLocation/StreamingJob")
                .option("path", "src/main/resources/data/" + batchId)
                .outputMode(OutputMode.Append())
                .start();*/

        streamingQuery.awaitTermination();
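        // With Trigger.Once, awaitTermination() returns after a single
        // micro-batch has drained the data available at start.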
@@ -185,6 +237,4 @@ public class InventoryService implements Serializable {
}