Commit fc564bcc authored by Krishnakanth Balla

Initial Commit: Case#1

- As an e-commerce website analyst, I want to verify the list of products that have been shipped.
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="spark-poc-case-1" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"03719800-9340-4e8e-90f1-c6f6444e245a"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685352858013,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-csv-topic-1":{"0":20}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685353733816,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-csv-topic-1":{"0":40}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685354061366,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-csv-topic-1":{"0":60}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685354599383,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"product-csv-topic-1":{"0":80}}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-poc-case-1</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<optional>true</optional>
</dependency>
</dependencies>
</project>
\ No newline at end of file
Case # 1: Spark Streaming -> Source (File) -> Kafka -> Spark -> Sink (File)
Story :- As an e-commerce website analyst, I want to verify the list of products that have been shipped.
Algorithm:
a.) Read data from one or more source files (.csv, .txt, or .json), possibly in different formats, and publish the records to a Kafka topic.
b.) A Spark Structured Streaming consumer connects to this Kafka topic at the configured interval (10 secs) and feeds the data to the Spark job (see the trigger sketch below).
c.) The Spark job then transforms the data and persists the output (sink) to another file (.csv).
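The committed consumer (StreamingConsumer, further down in this commit) starts its query without an explicit trigger, so Spark uses its default micro-batch scheduling. A minimal sketch of how the 10-second interval from step b could be wired onto the streaming write, assuming a streaming Dataset<Row> named filteredSet like the one in that consumer (imports as in StreamingConsumer, plus Trigger):

    import org.apache.spark.sql.streaming.Trigger;

    // Hypothetical: fire a micro-batch every 10 seconds instead of as fast as possible.
    StreamingQuery query = filteredSet
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("csv")
            .option("checkpointLocation", "checkpointlocaion/streamingjob")
            .option("path", "src/data/result")
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();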
Requirement :- Product Status Report - Real-Time Processing
1.) The source file should contain product information with a delivery status
For Eg:-
productId,productName,productPrice,deliveryStatus,date
P1001,mobile,15000,ordered,10-05-2023
P1002,charger,25000,delivered,15-05-2023
P1003,cooker,35000,shipped,25-05-2023
P1004,tv,45000,shipped,25-05-2023
2.) The Spark job should then filter the product data based on the status - Shipped [within a time range, e.g. from 10 am to 12 pm, keeping only products with the status "Shipped"; a minimal sketch follows this section]
3.) Once the Spark job has filtered the product data within the time range, it should persist the output,
[productId : P1001, productName : Mobile, productPrice : 1000.00, deliveryStatus : Shipped, timestamp] to another file (.csv), where this file contains only
the products with the status Shipped within the time range, based on the timestamp.
\ No newline at end of file
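A minimal sketch of the filtering described in requirements 2 and 3, written as a plain batch job for clarity. The column names and the source file match the sample data in this repository; the class name, the output path src/data/shipped-report, and the use of to_date with the dd-MM-yyyy pattern (so the range comparison is chronological rather than a plain string comparison) are assumptions, not part of the committed job:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.lit;
    import static org.apache.spark.sql.functions.to_date;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical batch variant of the shipped-products filter.
    public class ShippedProductsFilterSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")
                    .appName("Shipped products filter sketch")
                    .getOrCreate();

            // Read the sample file shown above; dates are dd-MM-yyyy strings.
            Dataset<Row> products = spark.read()
                    .option("header", true)
                    .csv("src/data/product-info.csv");

            // Keep only shipped products whose date falls inside the requested range.
            Dataset<Row> shipped = products
                    .filter(col("deliveryStatus").equalTo("shipped"))
                    .filter(to_date(col("date"), "dd-MM-yyyy")
                            .between(to_date(lit("20-05-2023"), "dd-MM-yyyy"),
                                     to_date(lit("25-05-2023"), "dd-MM-yyyy")));

            // Persist the report (hypothetical output directory).
            shipped.write().option("header", true).csv("src/data/shipped-report");
            spark.stop();
        }
    }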
productId,productName,productPrice,deliveryStatus,date
P4001,mobile,15000.00,ordered,10-05-2023
P4002,charger,25000.00,delivered,15-05-2023
P4003,cooker,35000.00,shipped,25-05-2023
P4004,tv,45000.00,shipped,25-05-2023
P4005,laptop,55000.00,delivered,25-05-2023
P4006,bottle,1500.00,ordered,12-05-2023
P4007,mouse,1000.00,delivered,15-05-2023
P4008,keyboard,500.00,shipped,25-05-2023
P4009,table,10000.00,shipped,22-05-2023
P4010,phone,12000.00,delivered,23-05-2023
P4011,shop,150000.00,ordered,16-05-2023
P4020,byke,75000.00,delivered,18-05-2023
P4021,car,20000.00,shipped,27-05-2023
P4022,cycle,33000.00,shipped,28-05-2023
P4023,bed,15000.00,delivered,26-05-2023
P4024,chairs,12000.00,ordered,15-05-2023
P4025,server,19555.00,delivered,18-05-2023
P4030,van,89200.00,shipped,25-05-2023
P4031,designing,14000.00,shipped,25-05-2023
P4032,interier,13000.00,delivered,25-05-2023
\ No newline at end of file
v1
{"path":"file:///D:/workplace/spark-poc-case-1/src/data/result/part-00000-1558008b-ca04-455d-897c-9a9556b85829-c000.csv","size":281,"isDir":false,"modificationTime":1685352862397,"blockReplication":1,"blockSize":33554432,"action":"add"}
\ No newline at end of file
v1
{"path":"file:///D:/workplace/spark-poc-case-1/src/data/result/part-00000-949979b6-6ad8-425b-b5da-522d98f01d50-c000.csv","size":24,"isDir":false,"modificationTime":1685353737713,"blockReplication":1,"blockSize":33554432,"action":"add"}
\ No newline at end of file
v1
{"path":"file:///D:/workplace/spark-poc-case-1/src/data/result/part-00000-6c71dcbd-8555-4fad-9213-aa1a9583d441-c000.csv","size":213,"isDir":false,"modificationTime":1685354065168,"blockReplication":1,"blockSize":33554432,"action":"add"}
\ No newline at end of file
v1
{"path":"file:///D:/workplace/spark-poc-case-1/src/data/result/part-00000-d64167f9-ba8d-4912-8036-3a7a2d7ef0f7-c000.csv","size":247,"isDir":false,"modificationTime":1685354603253,"blockReplication":1,"blockSize":33554432,"action":"add"}
\ No newline at end of file
25-05-2023,shipped,P1003,cooker,""
25-05-2023,shipped,P1004,tv,""
25-05-2023,shipped,P1008,keyboard,""
22-05-2023,shipped,P1009,table,""
27-05-2023,shipped,P1021,car,""
28-05-2023,shipped,P1022,cycle,""
25-05-2023,shipped,P1030,van,""
25-05-2023,shipped,P1031,designing,""
25-05-2023,shipped,P3003,cooker,""
25-05-2023,shipped,P3004,tv,""
25-05-2023,shipped,P3008,keyboard,""
22-05-2023,shipped,P3009,table,""
25-05-2023,shipped,P3030,van,""
25-05-2023,shipped,P3031,designing,""
25-05-2023,shipped,P4003,cooker,35000.00
25-05-2023,shipped,P4004,tv,45000.00
25-05-2023,shipped,P4008,keyboard,500.00
22-05-2023,shipped,P4009,table,10000.00
25-05-2023,shipped,P4030,van,89200.00
25-05-2023,shipped,P4031,designing,14000.00
package io.nisum.spark.consumer;

import io.nisum.spark.entity.ProductInfo;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

import java.util.concurrent.TimeoutException;

public class StreamingConsumer {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Data processing")
                .getOrCreate();

        // Subscribe to the topic that KafkaMessageProducer writes the product records to.
        Dataset<Row> rowDataset = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "127.0.0.1:9092")
                .option("subscribe", "product-csv-topic-1")
                .option("kafka.consumer.group.id", "localStream")
                .option("failOnDataLoss", "false")
                .option("startingOffsets", "earliest")
                .load();
        rowDataset.printSchema();

        // Derive the JSON schema from the ProductInfo bean so the Kafka value can be parsed.
        StructType schema = Encoders.bean(ProductInfo.class).schema();

        // Date-range boundaries as dd-MM-yyyy strings (compared as strings below).
        String startTime = "20-05-2023";
        String endTime = "25-05-2023";

        // Parse the JSON value, then keep only "shipped" products inside the date range.
        Dataset<Row> filteredSet = rowDataset
                .selectExpr("cast(value as string) as data")
                .select(functions.from_json(functions.col("data"), schema).alias("jsonData"))
                .select("jsonData.*")
                .filter(functions.col("deliveryStatus").equalTo("shipped"))
                .filter(functions.col("date").geq(startTime).and(functions.col("date").leq(endTime)));
        filteredSet.printSchema();

        // Append the filtered rows as CSV files, tracking progress in the checkpoint location.
        StreamingQuery query = filteredSet
                .writeStream()
                .outputMode(OutputMode.Append())
                .format("csv")
                .option("checkpointLocation", "checkpointlocaion/streamingjob")
                .option("path", "src/data/result")
                .start();

        query.awaitTermination();
    }
}
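For local debugging, the filtered stream can also be printed to the console instead of (or alongside) the csv sink. A small sketch, assuming the same filteredSet dataset as in StreamingConsumer above; the console sink is standard Spark Structured Streaming, but this debug query is not part of the committed job:

    // Hypothetical debug query: print each micro-batch of filtered rows to stdout.
    StreamingQuery debugQuery = filteredSet
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("console")
            .option("truncate", "false")
            .start();
    debugQuery.awaitTermination();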
package io.nisum.spark.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Bean for one product record; Lombok generates the constructors, getters and setters
// that Encoders.bean(ProductInfo.class) needs to build the JSON schema.
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ProductInfo {
    String productId;
    String productName;
    String productPrice;
    String deliveryStatus;
    String date;
}
package io.nisum.spark.producer;

import org.apache.spark.sql.*;

public class KafkaMessageProducer {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Produce CSV Product Info to Kafka Topic")
                .getOrCreate();

        // Read the source CSV; the first line is treated as the header.
        Dataset<Row> productInfo = spark.read()
                .option("header", true)
                .option("multiline", "true")
                .csv("src/data/product-info.csv");
        productInfo.printSchema();
        productInfo.show();

        // Use productId as the Kafka key and the whole row, serialized as JSON, as the value.
        productInfo
                .selectExpr("cast(productId as string) as key", "to_json(struct(*)) as value")
                .write()
                .format("kafka")
                .option("kafka.bootstrap.servers", "127.0.0.1:9092")
                .option("topic", "product-csv-topic-1")
                .save();
    }
}
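To confirm the records actually landed on the topic, a one-off batch read of product-csv-topic-1 can be appended at the end of main (or run as a separate job). The batch Kafka source options below are standard Spark options; the check itself is an assumption and not part of the committed producer:

    // Hypothetical verification: read the topic back in batch mode and show key/value pairs.
    Dataset<Row> check = spark.read()
            .format("kafka")
            .option("kafka.bootstrap.servers", "127.0.0.1:9092")
            .option("subscribe", "product-csv-topic-1")
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load();
    check.selectExpr("cast(key as string) as key", "cast(value as string) as value")
            .show(false);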