Commit 3d6c66bd authored by Krishnakanth Balla

Initial Commit : Spark POC
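The POC has two entry points: KafkaMessageProducer reads src/data/product-info.json and publishes each record as JSON to the product-info-topic Kafka topic, and SparkDataProcessor consumes that topic with Structured Streaming, keeps records whose deliveryStatus is "Shipped", and writes them to the MongoDB collection test.products-info. Spark streaming checkpoint files and IntelliJ project settings are also included in the commit.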

# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="spark-poc" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"bb56ce99-58e6-4071-931b-4904868d58f4"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685334782622,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-info-topic":{"0":8}}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-poc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark SQL Kafka integration and Kafka clients -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- Kafka Streaming -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
[{"productId": "P2011","productName": "Mobile","productPrice": 1000.00,"deliveryStatus": "Purchased","timestamp": "20230524120000"},{"productId": "P2001","productName": "Laptop","productPrice": 1100.00,"deliveryStatus": "Shipped","timestamp": "20230520110000"},{"productId": "P2012","productName": "Mobile Case","productPrice": 500.00,"deliveryStatus": "Pending","timestamp": "20230524120000"},{"productId": "P2013","productName": "80W Adapter","productPrice": 499.00,"deliveryStatus": "Pending","timestamp": "20230524120000"}]
\ No newline at end of file
package io.nisum.spark.consumer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import io.nisum.spark.entity.ProductInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
@Slf4j
public class SparkDataProcessor {
public static void main(String[] args) {
try {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("Structural Streaming")
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/test.products-info")
.getOrCreate();
Dataset<Row> fromKafka = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "product-info-topic")
.option("kafka.consumer.group.id", "localStream")
.option("failOnDataLoss", "false")
.option("startingOffsets", "earliest")
.load();
StructType schema = Encoders.bean(ProductInfo.class).schema();
Dataset<Row> dataset = fromKafka.selectExpr("cast(value as String) as message")
.select(functions.from_json(functions.col("message"), schema).as("data"))
.select("data.*")
.repartition(100);
Dataset<Row> filteredDataset = dataset.filter(functions.col("deliveryStatus").equalTo("Shipped"));
Dataset<ProductInfo> ds = filteredDataset.as(Encoders.bean(ProductInfo.class));
// StreamingQuery query1 = filteredDataset.writeStream().format("console").outputMode(OutputMode.Update()).start();
StreamingQuery query = ds.writeStream().outputMode(OutputMode.Append())
.option("checkpointLocation", "checkpointlocaion/streamingjob")
.foreach(new ForeachWriter<ProductInfo>() {
private MongoClient mongoClient;
private MongoDatabase database;
private MongoCollection<Document> collection;
@Override
public boolean open(long partitionId, long epochId) {
mongoClient = MongoClients.create("mongodb://localhost:27017");
database = mongoClient.getDatabase("test");
collection = database.getCollection("products-info");
return true;
}
@Override
public void process(ProductInfo productInfo) {
// Create a document with the object data
Document document = new Document();
document.append("productId", productInfo.getProductId());
document.append("productName", productInfo.getProductName());
document.append("productPrice", productInfo.getProductPrice());
document.append("deliveryStatus", productInfo.getDeliveryStatus());
document.append("timestamp", productInfo.getTimestamp());
// Insert the document into the collection
collection.insertOne(document);
}
@Override
public void close(Throwable errorOrNull) {
// Close the MongoDB connection (guard against a failed open())
if (mongoClient != null) {
mongoClient.close();
}
}
}).start();
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
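The ForeachWriter above opens one MongoClient per partition and inserts documents one at a time. Because pom.xml already declares mongo-spark-connector_2.12 and the session sets spark.mongodb.output.uri, a foreachBatch sink that writes each micro-batch through the connector is a possible alternative. The sketch below is an assumption, not part of this commit; it reuses the ds dataset, checkpoint location, and output URI defined above.

// Hypothetical alternative sink for SparkDataProcessor (assumes the session config
// spark.mongodb.output.uri=mongodb://localhost:27017/test.products-info shown above).
StreamingQuery query = ds.writeStream()
        .outputMode(OutputMode.Append())
        .option("checkpointLocation", "checkpointlocaion/streamingjob")
        .foreachBatch((batchDs, batchId) ->
                // write the whole micro-batch via the MongoDB Spark connector
                batchDs.write().format("mongo").mode("append").save())
        .start();
query.awaitTermination();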
package io.nisum.spark.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ProductInfo {
String productId;
String productName;
double productPrice;
String deliveryStatus;
String timestamp;
@Override
public String toString() {
return "ProductInfo{" +
"productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", productPrice=" + productPrice +
", deliveryStatus='" + deliveryStatus + '\'' +
", timestamp='" + timestamp + '\'' +
'}';
}
}
package io.nisum.spark.producer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.util.HashMap;
import java.util.Map;
public class KafkaMessageProducer {
public static void main(String[] args) {
try {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("Produce Product Info to Kafka Topic")
.getOrCreate();
Dataset<Row> productInfoDf = spark.read()
.format("json")
.option("multiline", "true")
.load("src/data/product-info.json");
productInfoDf.printSchema();
Map<String, String> kafkaConfigMap = new HashMap<String, String>();
kafkaConfigMap.put("kafka.bootstrap.servers", "127.0.0.1:9092");
kafkaConfigMap.put("topic", "product-info-topic");
productInfoDf
.select(
functions.to_json(
functions.struct(
productInfoDf.col("deliveryStatus"),
productInfoDf.col("productId"),
productInfoDf.col("productName"),
productInfoDf.col("productPrice"),
productInfoDf.col("timestamp")
)
)
.alias("value")
).write()
.options(kafkaConfigMap)
.format("kafka")
.save();
} catch (Exception e) {
e.printStackTrace();
}
}
}
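The producer above publishes only a value column, so Kafka assigns partitions round-robin. A hedged variant (not in this commit) also selects productId as the message key, so records for the same product hash to the same partition; it reuses productInfoDf and kafkaConfigMap from the code above.

// Hypothetical keyed write: Spark's Kafka sink picks up the optional "key" column
// alongside the required "value" column.
productInfoDf
        .select(
                productInfoDf.col("productId").alias("key"),
                functions.to_json(functions.struct(
                        productInfoDf.col("deliveryStatus"),
                        productInfoDf.col("productId"),
                        productInfoDf.col("productName"),
                        productInfoDf.col("productPrice"),
                        productInfoDf.col("timestamp")
                )).alias("value"))
        .write()
        .options(kafkaConfigMap)
        .format("kafka")
        .save();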