Commit 5a1a8e68 authored by karthik

Initial commit of Spark POC

.gitignore:
# Default ignored files
/shelf/
/workspace.xml

.idea/compiler.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="CompilerConfiguration">
    <annotationProcessing>
      <profile name="Maven default annotation processors profile" enabled="true">
        <sourceOutputDir name="target/generated-sources/annotations" />
        <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
        <outputRelativeToContentRoot value="true" />
        <module name="SparkTest" />
      </profile>
    </annotationProcessing>
  </component>
</project>

.idea/jarRepositories.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="RemoteRepositoriesConfiguration">
    <remote-repository>
      <option name="id" value="central" />
      <option name="name" value="Central Repository" />
      <option name="url" value="https://repo.maven.apache.org/maven2" />
    </remote-repository>
    <remote-repository>
      <option name="id" value="central" />
      <option name="name" value="Maven Central repository" />
      <option name="url" value="https://repo1.maven.org/maven2" />
    </remote-repository>
    <remote-repository>
      <option name="id" value="jboss.community" />
      <option name="name" value="JBoss Community repository" />
      <option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
    </remote-repository>
    <remote-repository>
      <option name="id" value="repository.jfrog.deployment" />
      <option name="name" value="Safeway JFrog Repository" />
      <option name="url" value="https://artifactory.albertsons.com/artifactory/digital-artifactory-cache/" />
    </remote-repository>
  </component>
</project>

.idea/misc.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ExternalStorageConfigurationManager" enabled="true" />
  <component name="MavenProjectsManager">
    <option name="originalFiles">
      <list>
        <option value="$PROJECT_DIR$/pom.xml" />
      </list>
    </option>
  </component>
  <component name="ProjectRootManager" version="2" languageLevel="JDK_11" default="true" project-jdk-name="11" project-jdk-type="JavaSDK">
    <output url="file://$PROJECT_DIR$/out" />
  </component>
</project>

pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>SparkTest</artifactId>
    <version>1.0-SNAPSHOT</version>
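
    <!-- Note: Spark 3.1.x is built against Scala 2.12, so every Spark artifact
         below uses the _2.12 suffix; the Mongo connector must match that suffix,
         since mixing Scala versions fails at runtime. -->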
    <dependencies>
        <!-- Apache Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Apache Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Apache Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Kafka source/sink for Structured Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Kafka clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- Kafka Streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- MongoDB connector for Spark -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
</project>

src/main/java/ProductInfo.java:
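/**
 * Plain Java bean representing one product record (one CSV row / one Kafka message).
 * SparkCSVConsumer derives the Spark schema for JSON parsing from these getters
 * and setters via Encoders.bean(ProductInfo.class).
 */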
public class ProductInfo {
    private String productId;
    private String productName;
    private String productPrice;
    private String deliveryStatus;

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(String productPrice) {
        this.productPrice = productPrice;
    }

    public String getDeliveryStatus() {
        return deliveryStatus;
    }

    public void setDeliveryStatus(String deliveryStatus) {
        this.deliveryStatus = deliveryStatus;
    }
}
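
For reference, the schema Spark derives from this bean can be printed with a small
one-off class; a minimal sketch (PrintProductInfoSchema is a hypothetical helper,
not part of this commit):

import org.apache.spark.sql.Encoders;

public class PrintProductInfoSchema {
    public static void main(String[] args) {
        // Prints the StructType that Spark infers from ProductInfo's getters
        // (four nullable string fields); no SparkSession is needed for this.
        System.out.println(Encoders.bean(ProductInfo.class).schema().treeString());
    }
}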

src/main/java/SparkCSVConsumer.java:
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
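/**
 * Structured Streaming job: reads ProductInfo JSON messages from the
 * ProductInfoTopic Kafka topic, keeps the records with deliveryStatus
 * "Shipped", and writes them to the userdb.productinfo MongoDB collection.
 */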
public class SparkCSVConsumer {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        // Create and get the Spark session
        SparkSession spark = getSparkSession();
        // Read raw JSON messages from Kafka
        Dataset<Row> rawJsonDataFrame = loadDataFromKafka(spark);
        Dataset<Row> df = convertJsonDatasetToSchemaDataset(rawJsonDataFrame);
        // Keep only rows whose deliveryStatus is "Shipped"
        Dataset<Row> filteredDataFrame = df.filter(functions.col("deliveryStatus").equalTo("Shipped"));
        // Convert the untyped DataFrame to a typed Dataset
        Dataset<ProductInfo> productInfoDataset = filteredDataFrame.as(Encoders.bean(ProductInfo.class));
        // Write the data to MongoDB
        writeDataToMongoDB(productInfoDataset);
    }

    private static Dataset<Row> loadDataFromKafka(SparkSession spark) {
        return spark
                .readStream()
                .format("kafka")
                .options(constructKafkaConfigMap())
                .load();
    }

    private static Map<String, String> constructKafkaConfigMap() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        // Broker to connect to and topic to subscribe to
        kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaConfigMap.put("subscribe", "ProductInfoTopic");
        // Read the topic from the beginning on the first run
        kafkaConfigMap.put("startingOffsets", "earliest");
        kafkaConfigMap.put("failOnDataLoss", "true");
        // Options prefixed with "kafka." are passed through to the Kafka consumer
        kafkaConfigMap.put("kafka.group.id", "local");
        return kafkaConfigMap;
    }

    private static SparkSession getSparkSession() {
        return SparkSession.builder()
                .master("local")
                .appName("SparkConsumer")
                // Default output URI for the mongo-spark connector; the ForeachWriter
                // below opens its own MongoClient and does not read this setting
                .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/userdb.users_info")
                .getOrCreate();
    }

    private static Dataset<Row> convertJsonDatasetToSchemaDataset(Dataset<Row> rawItemDF) {
        // Derive the target schema from the ProductInfo bean, then parse the
        // Kafka message value (bytes) as JSON into typed columns
        StructType structType = Encoders.bean(ProductInfo.class).schema();
        return rawItemDF
                .selectExpr("CAST(value AS STRING) AS message")
                .select(functions.from_json(functions.col("message"), structType).as("data"))
                .select("data.*");
    }

    private static void writeDataToMongoDB(Dataset<ProductInfo> ds)
            throws TimeoutException, StreamingQueryException {
        StreamingQuery query = ds.writeStream().outputMode(OutputMode.Append())
                .option("checkpointLocation", "checkpointlocation/streamingjob")
                // In continuous mode the trigger interval is the checkpoint interval
                .trigger(Trigger.Continuous("10 seconds"))
                .foreach(new ForeachWriter<ProductInfo>() {
                    // ForeachWriter lifecycle: open() once per partition per epoch,
                    // process() once per row, close() when the partition is done
                    private MongoClient mongoClient;
                    private MongoDatabase database;
                    private MongoCollection<Document> collection;

                    @Override
                    public boolean open(long partitionId, long epochId) {
                        mongoClient = MongoClients.create("mongodb://localhost:27017");
                        database = mongoClient.getDatabase("userdb");
                        collection = database.getCollection("productinfo");
                        return true;
                    }

                    @Override
                    public void process(ProductInfo productInfo) {
                        Document document = new Document();
                        document.append("productId", productInfo.getProductId());
                        document.append("productName", productInfo.getProductName());
                        document.append("productPrice", productInfo.getProductPrice());
                        document.append("deliveryStatus", productInfo.getDeliveryStatus());
                        // Insert the document into the collection
                        collection.insertOne(document);
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        // Close the MongoDB connection
                        mongoClient.close();
                    }
                }).start();
        query.awaitTermination();
    }
}
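
For local debugging it can help to bypass MongoDB and print the parsed stream to
stdout using Spark's built-in console sink; a minimal sketch (SparkConsoleDebug is
a hypothetical helper class, not part of this commit, assuming the same local
broker and topic):

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

public class SparkConsoleDebug {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("SparkConsoleDebug")
                .getOrCreate();
        // Same Kafka source and JSON parsing as SparkCSVConsumer
        StructType schema = Encoders.bean(ProductInfo.class).schema();
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "ProductInfoTopic")
                .option("startingOffsets", "earliest")
                .load()
                .selectExpr("CAST(value AS STRING) AS message")
                .select(functions.from_json(functions.col("message"), schema).as("data"))
                .select("data.*");
        // Console sink: prints each micro-batch, no external dependencies
        df.writeStream()
                .outputMode(OutputMode.Append())
                .format("console")
                .option("truncate", "false")
                .start()
                .awaitTermination();
    }
}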

src/main/java/SparkCSVProducer.java:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
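/**
 * Batch job: reads src/main/resources/data.csv, converts each row to a JSON
 * string, and publishes the records to the ProductInfoTopic Kafka topic.
 */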
public class SparkCSVProducer {
    public static void main(String[] args) {
        // Create and get the Spark session
        SparkSession spark = getSparkSession();
        // Load product data from the CSV file
        Dataset<Row> csvDataFrame = loadDataFromCSV(spark);
        // Convert each row to a JSON string in a "value" column
        Dataset<Row> productInfoJson = convertToJsonDataFrame(csvDataFrame);
        // Publish the product data to Kafka
        publishToKafka(productInfoJson);
    }

    private static Dataset<Row> loadDataFromCSV(SparkSession spark) {
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", "true")
                .load("src/main/resources/data.csv");
        df.show();
        return df;
    }

    private static SparkSession getSparkSession() {
        return SparkSession.builder()
                .master("local")
                .appName("SparkCSVProducer")
                .getOrCreate();
    }

    private static Dataset<Row> convertToJsonDataFrame(Dataset<Row> productInfoDataFrame) {
        // Pack the four product columns into a single JSON string column named
        // "value", which is the column the Kafka sink expects; persist so that
        // show() and the Kafka write below do not each re-read the CSV
        Dataset<Row> productInfoJson = productInfoDataFrame
                .withColumn("value", functions.to_json(functions.struct(functions.col("productId"),
                        functions.col("productName"), functions.col("productPrice"),
                        functions.col("deliveryStatus"))))
                .select("value")
                .persist(StorageLevel.MEMORY_AND_DISK());
        productInfoJson.show(false);
        return productInfoJson;
    }

    private static void publishToKafka(Dataset<Row> productInfoJson) {
        productInfoJson
                .write()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "ProductInfoTopic")
                .save();
    }
}
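
Since kafka-clients is already on the classpath via the pom, a single hand-written
test message can also be pushed without Spark; a minimal sketch (SingleMessageProducer
is a hypothetical helper, not part of this commit):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SingleMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One JSON record with the same fields the consumer's schema expects
            String json = "{\"productId\":\"P9999\",\"productName\":\"Tablet\","
                    + "\"productPrice\":\"500.00\",\"deliveryStatus\":\"Shipped\"}";
            producer.send(new ProducerRecord<>("ProductInfoTopic", json));
            producer.flush();
        }
    }
}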

src/main/resources/data.csv:
productId,productName,productPrice,deliveryStatus
P1001,Mobile,1000.00,Purchased
P1002,Laptop,1000.00,Shipped
P1003,Watch,1000.00,Purchased
P1004,Desktop,1000.00,Shipped
P1005,SmartTV,1000.00,Purchased
P1006,Google,1000.00,Shipped
P1007,Pixel,1000.00,Purchased