Initial commit

# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="spring-file-streaming-with-mongo" />
</profile>
</annotationProcessing>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="spring-file-streaming-with-mongo" options="-parameters" />
</option>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" default="true" project-jdk-name="JDK_11" project-jdk-type="JavaSDK" />
</project>
**Case # 2: Spark Mongo Connector -> Source (File) -> Kafka -> Spark -> MongoDB**<br />
**Story :- Persist, in MongoDB, the list of products (with their timestamps) that have been shipped.**<br />
**Algorithm:**<br />
a.) Read data from multiple files in different formats, or from a single file (.csv, .txt, or .json), and publish the data to a Kafka topic.<br />
b.) A Spark Structured Streaming consumer subscribes to this Kafka topic at the configured interval (10 secs) and feeds the data to the Spark job (see the sketch after this list).<br />
c.) The Spark job then transforms the data and persists the output (sink) into MongoDB.<br />
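A minimal sketch of step b, assuming the broker and topic used elsewhere in this project (`localhost:9092`, `TP.WEBFLUX.KAFKA1`). The consumer class in this repo starts its streaming query without an explicit trigger; here the 10-second interval is expressed with `Trigger.ProcessingTime`, and a console sink is used only to keep the sketch self-contained (the class name is illustrative):<br />

```java
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public class KafkaIntervalConsumerSketch {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .appName("KafkaIntervalConsumerSketch")
                .master("local[*]")
                .getOrCreate();

        // Step b: subscribe to the topic the file producer writes to.
        Dataset<Row> stream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "TP.WEBFLUX.KAFKA1")
                .option("startingOffsets", "earliest")
                .load();

        // Fire a micro-batch every 10 seconds (the interval mentioned in step b).
        stream.selectExpr("CAST(value AS STRING) AS value")
                .writeStream()
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .format("console")
                .option("truncate", "false")
                .start()
                .awaitTermination();
    }
}
```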
**Requirement :- List of products with status "Shipped" to be persisted in MongoDB (real-time processing)**<br />
1.) The source file should contain product information with different statuses<br />
For example:<br />
[productId : P1001, productName : Mobile, productPrice : 1000.00, deliveryStatus : Purchased, timestamp] <br />
[productId : P1001, productName : Mobile, productPrice : 1000.00, deliveryStatus : Shipped, timestamp] <br />
[productId : P1001, productName : Mobile, productPrice : 1000.00, deliveryStatus : Pending, timestamp] <br />
2.) The Spark job should then filter the product data based on the status "Shipped".<br />
3.) Once the Spark job has filtered the products data, persist the output into the MongoDB collection named "**product_list_status_shipped**":<br />
[productId : P1001, productName : Mobile, productPrice : 1000.00, deliveryStatus : Shipped, timestamp]<br />
(The com.mongodb.spark.api.java.MongoSpark helper can be used to establish connectivity from Spark to MongoDB.)<br />
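Note that the consumer in this repo persists through the connector's DataFrame source inside `foreachBatch` rather than through the MongoSpark helper class. A minimal sketch of that batch write, assuming the local MongoDB used elsewhere in this project and an already-filtered `shippedBatch` (class and method names are illustrative):<br />

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class MongoSinkSketch {

    // Writes one (micro-)batch of already-filtered rows to the target collection.
    static void saveShippedBatch(Dataset<Row> shippedBatch) {
        shippedBatch.write()
                .format("com.mongodb.spark.sql.DefaultSource") // mongo-spark-connector 3.0.x DataFrame source
                .mode("append")                                // keep previously persisted shipped products
                .option("uri", "mongodb://127.0.0.1")          // local MongoDB, as configured in the consumer
                .option("database", "ProductsDB")
                .option("collection", "product_list_status_shipped")
                .save();
    }
}
```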
**Coding Steps:-**<br />
1) Initialize a SparkSession object, the primary entry point for a Spark application<br />
SparkSession spark = SparkSession.builder()...
2) Read CSV file<br />
Dataset<Row> csvData = spark.read()...
3) Read JSON file<br />
Dataset<Row> jsonData = spark.read()
4) Combine the data from both files (CSV and JSON)<br />
Dataset<Row> mergedStream = csvData.union(jsonData)
5) Write the combined data to the Kafka topic<br />
mergedStream.selectExpr(...
6) Read data streams as a streaming DataFrame<br />
Dataset<Row> dataFrame = spark.readStream().format("kafka")...
7) Parse and select the fields from the DataFrame to fetch the required data<br />
Dataset<Row> readWrite = dataFrame.selectExpr("CAST(value AS STRING) AS json")
.select(functions.from_json(functions.col("json"), schema)
.as("data"))
.select("data.*");
8) Filter the DataFrame based on the status and timestamp<br />
Dataset<Row> filteredData = readWrite.filter(readWrite.col("deliveryStatus")
.equalTo("Shipped"))
.filter(readWrite.col("timeStamp")
.geq(startTime)
.and(readWrite.col("timeStamp")
.leq(endTime)));
9) Persist the filtered data to MongoDB<br />
StreamingQuery query = filteredData.writeStream()...<br />
10) Wait for the streaming query to finish<br />
query.awaitTermination();<br />
11) Catch any exceptions that occur<br />
e.printStackTrace();
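Steps 5 and 7 are the two halves of one serialization contract: the producer packs every column into a single JSON string with `to_json(struct(*))`, and the consumer parses it back using the schema derived from `ProductsDto`. A minimal, batch-only sketch to verify that round trip (the standalone class and its sample row are illustrative; it reuses the repo's `ProductsDto`):<br />

```java
import java.util.List;

import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType;

import com.nisum.spark.dem.ProductInfo.ProductsDto;

public class JsonRoundTripCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JsonRoundTripCheck")
                .master("local[*]")
                .getOrCreate();

        // Schema derived from the bean, exactly as the consumer's step 7 does.
        StructType schema = Encoders.bean(ProductsDto.class).schema();

        Dataset<ProductsDto> products = spark.createDataset(
                List.of(new ProductsDto("P1001", "Mobile", "1000", "Shipped",
                        "2023-05-26T11:24:00.000Z")),
                Encoders.bean(ProductsDto.class));

        // Producer side (step 5): serialize every column into one JSON string column.
        Dataset<Row> asKafkaValue = products.toDF()
                .selectExpr("to_json(struct(*)) AS value");

        // Consumer side (step 7): parse the JSON string back into typed columns.
        Dataset<Row> parsed = asKafkaValue
                .select(functions.from_json(functions.col("value"), schema).as("data"))
                .select("data.*");

        parsed.show(false);
        spark.stop();
    }
}
```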
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.spring.recativekafka.mongo</groupId>
<artifactId>spring-file-streaming-with-mongo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-file-streaming-with-mongo</name>
<description>Spring Reactive Kafka with Mongo</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.nisum.spark.dem.Consumer;
import com.nisum.spark.dem.ProductInfo.ProductsDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;
/**
* @author Vmuthav
*/
@Slf4j
public class StreamingConsumer {
public static final String MONGODB_DOCUMENT_COLLECTION = "mongodb://127.0.0.1/ProductsDB.product_list_status_shipped";
private static String kafkaBootstrapServers = "localhost:9092";
private static String kafkaTopic = "TP.WEBFLUX.KAFKA1";
public static void main(String[] args) {
log.info("Entered into main method to start StreamingConsumer: ");
SparkSession spark = getSparkSession();
Dataset<Row> readData = consumeDatafromKafka(spark);
Dataset<Row> filteredDataFrame = filteredData(readData);
persistConsumedDataToMongoDB(filteredDataFrame);
}
private static SparkSession getSparkSession() {
log.info("Create and get Spark session: ");
SparkSession spark = SparkSession.builder()
.appName("DataStreamingWithSparkMongoDB")
.master("local[*]")
.config("spark.mongodb.input.uri", MONGODB_DOCUMENT_COLLECTION)
.config("spark.mongodb.output.uri", MONGODB_DOCUMENT_COLLECTION)
.getOrCreate();
return spark;
}
private static Dataset<Row> consumeDatafromKafka(SparkSession spark) {
log.info("Read data streams as a streaming DataFrame: ");
Dataset<Row> dataFrame = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 100)
.load();
log.debug("Read data streams: "+dataFrame.toString());
return dataFrame;
}
private static Dataset<Row> filteredData(Dataset<Row> kafkaData) {
log.info("Filtering dataframe to fetch required data: ");
StructType schema = Encoders.bean(ProductsDto.class)
.schema();
Dataset<Row> readWrite = kafkaData.selectExpr("CAST(value AS STRING) AS json")
.select(functions.from_json(functions.col("json"), schema)
.as("data"))
.select("data.*");
//Dataset<Row> filteredData = readWrite.filter(readWrite.col("deliveryStatus").equalTo("Shipped"));
String startTime = "2023-05-26T10:24:00.000Z";
String endTime = "2023-05-26T12:24:00.000Z";
log.info("Filter the DataFrame based on the status and timeStamp: ");
Dataset<Row> filteredData = readWrite.filter(readWrite.col("deliveryStatus")
.equalTo("Shipped"))
.filter(readWrite.col("timeStamp")
.geq(startTime)
.and(readWrite.col("timeStamp")
.leq(endTime)));
return filteredData;
}
private static void persistConsumedDataToMongoDB(Dataset<Row> filteredData) {
log.info("Persist the filtered data to MongoDB: ");
StreamingQuery query = null;
try {
query = filteredData.writeStream()
.foreachBatch((Dataset<Row> batch, Long batchId) -> {
log.info("Write each batch to MongoDB: ");
batch.write()
.format("com.mongodb.spark.sql.DefaultSource")
.mode("append") // Specify the write mode as needed (e.g., append, overwrite)
.option("database", "ProductsDB") // MongoDB database name
.option("collection", "product_list_status_shipped") // MongoDB collection name
.save();
})
.start();
query.awaitTermination();
} catch (TimeoutException | StreamingQueryException exception) {
exception.printStackTrace();
}
}
}
package com.nisum.spark.dem.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* @author Vmuthav
*/
@Slf4j
public class StreamingProducer {
private static String csvFilePath = "src/main/resources/dataformats/FileToKafka.csv";
private static String jsonFilePath = "src/main/resources/dataformats/FileToKafka.json";
private static String kafkaBootstrapServers = "localhost:9092";
private static String kafkaTopic = "TP.WEBFLUX.KAFKA1";
public static void main(String[] args) {
log.info("Entered into main method to start StreamingProducer: ");
SparkSession spark = getSparkSession();
Dataset<Row> mergedStream = getCombinedData(spark);
publishToKafka(mergedStream);
spark.close();
}
private static SparkSession getSparkSession() {
log.info("Initialize SparkSession: ");
SparkSession spark = SparkSession.builder()
.appName("DataStreamingWithSparkKafka")
.master("local[*]")
.getOrCreate();
return spark;
}
private static Dataset<Row> getCombinedData(SparkSession spark){
log.info("Read CSV file: ");
Dataset<Row> csvData = spark.read()
.format("csv")
.option("header", "true")
.load(csvFilePath); // CSV file with a header row
log.info("Read JSON file: ");
Dataset<Row> jsonData = spark.read()
.format("json")
.load(jsonFilePath); // JSON Lines file; the CSV "header" option does not apply here
log.info("Combine data from both the files (JSON and CSV): ");
Dataset<Row> mergedStream = csvData.select("productId", "productName", "productPrice", "deliveryStatus", "timeStamp")
.union(jsonData.select("productId", "productName", "productPrice", "deliveryStatus", "timeStamp"));
log.debug("mergedStream: ", mergedStream.toJSON());
mergedStream.show();
return mergedStream;
}
private static void publishToKafka(Dataset<Row> mergedStream){
log.debug("Start streaming query to write data to Kafka topic: "+kafkaTopic);
mergedStream.selectExpr("CAST(productId AS STRING) AS key", "to_json(struct(*)) AS value")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("topic", kafkaTopic)
.save();
}
}
package com.nisum.spark.dem.ProductInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
/**
* @author Vmuthav
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "product_list_status_shipped")
public class ProductsDto {
@Id
private String productId;
private String productName;
private String productPrice;
private String deliveryStatus;
private String timeStamp;
}
csv.file.path=src/main/resources/dataformats/FileToKafka.csv
json.file.path=src/main/resources/dataformats/FileToKafka.json
output.path=src/main/resources/output
kafka.bootstrap.servers=localhost:9092
kafka.streaming.topic=TP.WEBFLUX.KAFKA1
productId,productName,productPrice,deliveryStatus,timeStamp
P1001,Mobile,1000,Purchased,2023-05-26T10:24:00.000Z
P1001,Mobile,1000,Shipped,2023-05-26T11:24:00.000Z
P1001,Mobile,1000,Pending,2023-05-26T12:24:00.000Z
{"productId":"P1001","productName":"Mobile","productPrice":1000,"deliveryStatus":"Purchased","timeStamp":"2023-05-26T10:24:00.000Z"}
{"productId":"P1001","productName":"Mobile","productPrice":1000,"deliveryStatus":"Shipped","timeStamp":"2023-05-26T11:24:00.000Z"}
{"productId":"P1001","productName":"Mobile","productPrice":1000,"deliveryStatus":"Pending","timeStamp":"2023-05-26T12:24:00.000Z"}
package com.nisum.spark.dem.Consumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import static org.junit.jupiter.api.Assertions.*;
class StreamingConsumerTest {
@BeforeEach
void setUp() {
}
@AfterEach
void tearDown() {
}
}
package com.nisum.spark.dem.Producer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class StreamingProducerTest {
@BeforeEach
void setUp() {
}
@AfterEach
void tearDown() {
}
@Test
void main() {
}
}