Commit 14bea66c authored by Krishnakanth Balla

Persist products with delivery status "Shipped" to MongoDB

26052023
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="spark-poc-product" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"53f9baec-b5ed-4b5f-8d2f-d6fea4da9885"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685089661036,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-info-topic":{"0":8}}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-poc-product</artifactId>
<version>1.0-SNAPSHOT</version>
<name>spark-poc-product</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- Kafka Streaming -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
[{"productId": "P2011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Purchased","timestamp": "20230524120000"},{"productId": "P2001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},{"productId": "P2012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},{"productId": "P2013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Pending","timestamp": "20230524120000"}]
\ No newline at end of file
This source diff could not be displayed because it is too large.
package org.example;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}
package org.example;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.util.HashMap;
import java.util.Map;
public class KafkaMessageProducer {
public static void main(String[] args) {
try {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("Produce Product Info to Kafka Topic")
.getOrCreate();
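// Read the product records from the local multiline JSON file into a DataFrame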
Dataset<Row> productInfoDf = spark.read()
.format("json")
.option("header", true )
.option("multiline", "true")
.load("src/data/product-info.json");
productInfoDf.printSchema();
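// Kafka sink settings: broker address and destination topic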
Map<String, String> kafkaConfigMap = new HashMap<String, String>();
kafkaConfigMap.put("kafka.bootstrap.servers", "127.0.0.1:9092");
kafkaConfigMap.put("topic", "product-info-topic");
productInfoDf
.select(
functions.to_json(
functions.struct(
productInfoDf.col("deliveryStatus"),
productInfoDf.col("productId"),
productInfoDf.col("productName"),
productInfoDf.col("productPrice"),
productInfoDf.col("timestamp")
)
)
.alias("value")
).write()
.options(kafkaConfigMap)
.format("kafka")
.save();
} catch (Exception e) {
e.printStackTrace();
}
}
}
package org.example;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
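/**
 * Product record published to the Kafka topic; records with deliveryStatus "Shipped" are persisted to MongoDB.
 */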
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class ProductInfo implements Serializable {
String productId;
String productName;
double productPrice;
String deliveryStatus;
String timestamp;
}
package org.example;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
@Slf4j
public class SparkDataProcessor {
public static void main(String[] args) {
try {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("Structural Streaming")
.getOrCreate();
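// Subscribe to the product-info-topic as a streaming source, starting from the earliest offset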
Dataset<Row> fromKafka = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "product-info-topic")
.option("kafka.consumer.group.id", "localStream")
.option("failOnDataLoss", "false")
.option("startingOffsets", "earliest")
.load();
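// Derive the schema from the ProductInfo bean and parse the JSON payload of each Kafka message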
StructType schema = Encoders.bean(ProductInfo.class).schema();
Dataset<Row> dataset = fromKafka.selectExpr("cast(value as String) as message")
.select(functions.from_json(functions.col("message"), schema).as("data"))
.select("data.*")
.repartition(100);
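// Keep only products whose deliveryStatus is "Shipped"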
Dataset<Row> filteredDataset = dataset.filter(functions.col("deliveryStatus").equalTo("Shipped"));
// log.info("Total rows = " + filteredDataset.count() );
Dataset<ProductInfo> ds = filteredDataset.as(Encoders.bean(ProductInfo.class));
// StreamingQuery query1 = filteredDataset.writeStream().format("console").outputMode(OutputMode.Update()).start();
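// Persist each shipped product to MongoDB through a ForeachWriter sink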
StreamingQuery query = ds.writeStream().outputMode(OutputMode.Append())
.option("checkpointLocation", "checkpointlocaion/streamingjob")
.foreach(new ForeachWriter<ProductInfo>() {
private MongoClient mongoClient;
private MongoDatabase database;
private MongoCollection<Document> collection;
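// Called once per partition and epoch: open a connection to the local MongoDB instance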
@Override
public boolean open(long partitionId, long epochId) {
mongoClient = MongoClients.create("mongodb://localhost:27017");
database = mongoClient.getDatabase("UserDB");
collection = database.getCollection("products");
return true;
}
@Override
public void process(ProductInfo productInfo) {
// Create a document with the object data
Document document = new Document();
document.append("productId", productInfo.getProductId());
document.append("productName", productInfo.getProductName());
document.append("productPrice", productInfo.getProductPrice());
document.append("deliveryStatus", productInfo.getDeliveryStatus());
document.append("timestamp", productInfo.getTimestamp());
// Insert the document into the collection
collection.insertOne(document);
}
@Override
public void close(Throwable errorOrNull) {
// Close the MongoDB connection
mongoClient.close();
}
}).start();
query.awaitTermination();
/*
StreamingQuery query = ds.writeStream()
.outputMode(OutputMode.Append())
.option("checkpointLocation", "checkpointlocaion/streamingjob")
.foreach(new ForeachWriter<ProductInfo>() {
private MongoCollection<Document> productInfoCollection;
private MongoClient client;
private MongoDatabase mongoDatabase;
@Override
public boolean open(long partitionId, long epochId) {
client = MongoClients.create("mongodb://localhost:27017");
mongoDatabase = client.getDatabase("test");
productInfoCollection = mongoDatabase.getCollection("product-info");
return true;
}
@Override
public void process(ProductInfo value) {
Document document = new Document();
document.append("productId", value.getProductId());
document.append("productName", value.getProductName());
document.append("productPrice", value.getProductPrice());
document.append("deliveryStatus", value.getDeliveryStatus());
document.append("timestamp", value.getTimestamp());
productInfoCollection.insertOne(document);
}
@Override
public void close(Throwable errorOrNull) {
client.close();
}
}).start();
*/
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
package org.example;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
/**
* Unit test for simple App.
*/
public class AppTest
{
/**
* Rigorous Test :-)
*/
@Test
public void shouldAnswerWithTrue()
{
assertTrue( true );
}
}