Commit 95184c6c authored by Krishnakanth Balla's avatar Krishnakanth Balla

Added validations while updating data to mongodb

20230529
parent 3d6c66bd
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685337385334,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-info-topic":{"0":24}}
\ No newline at end of file
[{"productId": "P2011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Purchased","timestamp": "20230524120000"},{"productId": "P2001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},{"productId": "P2012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},{"productId": "P2013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Pending","timestamp": "20230524120000"}]
\ No newline at end of file
[
{"productId": "P3011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Purchased","timestamp": "20230524120000"},
{"productId": "P3001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},
{"productId": "P3012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},
{"productId": "P3013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},
{"productId": "P4011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Shipped","timestamp": "20230524120000"},
{"productId": "P4001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},
{"productId": "P4012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Shipped","timestamp": "20230524120000"},
{"productId": "P4013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Shipped","timestamp": "20230524120000"},
{"productId": "P5011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Purchased","timestamp": "20230524120000"},
{"productId": "P5001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},
{"productId": "P5012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},
{"productId": "P5013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},
{"productId": "P6011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Purchased","timestamp": "20230524120000"},
{"productId": "P6001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},
{"productId": "P6012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},
{"productId": "P6013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Pending","timestamp": "20230524120000"}
]
\ No newline at end of file
......@@ -20,7 +20,7 @@ public class SparkDataProcessor {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("Structural Streaming")
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/test.products-info")
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/test.spark-product-info")
.getOrCreate();
Dataset<Row> fromKafka = spark.readStream()
......@@ -32,7 +32,6 @@ public class SparkDataProcessor {
.option("startingOffsets", "earliest")
.load();
StructType schema = Encoders.bean(ProductInfo.class).schema();
Dataset<Row> dataset = fromKafka.selectExpr("cast(value as String) as message")
......@@ -44,8 +43,6 @@ public class SparkDataProcessor {
Dataset<ProductInfo> ds = filteredDataset.as(Encoders.bean(ProductInfo.class));
// StreamingQuery query1 = filteredDataset.writeStream().format("console").outputMode(OutputMode.Update()).start();
StreamingQuery query = ds.writeStream().outputMode(OutputMode.Append())
.option("checkpointLocation", "checkpointlocaion/streamingjob")
.foreach(new ForeachWriter<ProductInfo>() {
......@@ -57,22 +54,31 @@ public class SparkDataProcessor {
public boolean open(long partitionId, long epochId) {
mongoClient = MongoClients.create("mongodb://localhost:27017");
database = mongoClient.getDatabase("test");
collection = database.getCollection("products-info");
collection = database.getCollection("spark-product-info");
return true;
}
@Override
public void process(ProductInfo productInfo) {
// Create a document with the object data
Document document = new Document();
document.append("productId", productInfo.getProductId());
document.append("productName", productInfo.getProductName());
document.append("productPrice", productInfo.getProductPrice());
document.append("deliveryStatus", productInfo.getDeliveryStatus());
document.append("timestamp", productInfo.getTimestamp());
// Insert the document into the collection
collection.insertOne(document);
Document query = new Document();
query.append("productId", productInfo.getProductId());
Document product = collection.find(query).first();
if( (product == null) || (product.isEmpty()) ) {
// Create a document with the object data
Document document = new Document();
document.append("productId", productInfo.getProductId());
document.append("productName", productInfo.getProductName());
document.append("productPrice", productInfo.getProductPrice());
document.append("deliveryStatus", productInfo.getDeliveryStatus());
document.append("timestamp", productInfo.getTimestamp());
// Insert the document into the collection
collection.insertOne(document);
}
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment