Initial commit

# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="SparkDemo" />
</profile>
</annotationProcessing>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="repository.jfrog.deployment" />
<option name="name" value="Safeway JFrog Repository" />
<option name="url" value="https://artifactory.albertsons.com/artifactory/digital-artifactory-cache/" />
</remote-repository>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" default="true" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.png" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
Case #3: MongoDB Spark Connector -> (MongoDB - Database#1) -> Kafka -> Spark Streaming -> (MongoDB - Database#2)
--------
Story :- Persist the list of B2C users, with their card details, who have opted for "VISA" cards to purchase products.
Algorithm:
a.) Read user information from the MongoDB (DB#1) collections named "users_info" and "users_cards_info".
b.) Join the information from the two collections ("users_info" and "users_cards_info") and publish the combined records to a Kafka topic.
c.) Spark Structured Streaming, acting as the consumer, reads the records from the Kafka topic.
d.) Apply a transformation at the Spark layer to keep only the users who hold VISA-type cards.
e.) Via the Mongo Spark helper, persist the filtered user data into a separate collection in a different MongoDB database (#2).
Requirement :-
Retrieve the list of B2C users with their card information from the (MongoDB - Database#1) collections named "users_info" and "users_cards_info". Then persist the output (only the users holding VISA-type cards) into the (MongoDB - Database#2) collection named "users_cards_info_list".
Collection details:- (MongoDB - Database#1)
1.) Collection name: "users_info"
For Eg:-
[userId : P1001, userName : Raju, regMobileNo : 91111111111, gender : M]
[userId : P1002, userName : Deepak, regMobileNo : 92222222222, gender : M]
[userId : P1003, userName : Amar, regMobileNo : 93333333333, gender : M]
[userId : P1004, userName : Arun, regMobileNo : 94444444444, gender : M]
2.) Collection name: "users_cards_info"
For Eg:-
[userId : P1001, cardId : c001, cardType : Visa, cardNumber : 23456908710, cardStatus : A]
[userId : P1002, cardId : c002, cardType : Master, cardNumber : 23456908711, cardStatus : A]
[userId : P1003, cardId : c003, cardType : Visa, cardNumber : 23456908712, cardStatus : A]
[userId : P1004, cardId : c004, cardType : Rupay, cardNumber : 23456908730, cardStatus : A]
Collection details:- (MongoDB - Database#2) [After Persist - Output]
1.) Collection name: "users_cards_info_list"
For Eg:-
[userId : P1001, userName : Raju, regMobileNo : 91111111111, gender : M, cardType : Visa, cardNumber : 23456908710, cardStatus : A]
[userId : P1003, userName : Amar, regMobileNo : 93333333333, gender : M, cardType : Visa, cardNumber : 23456908712, cardStatus : A]
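As a quick illustration, here is a minimal Java sketch (illustrative only, not the project's actual producer, which joins on user._id = card.userId) of how this expected output could be produced, assuming both DataFrames expose a plain userId column:
// Hedged sketch: join users_info with users_cards_info on userId and keep only
// Visa cards, mirroring the expected output above. Assumes both DataFrames carry
// a plain "userId" column (the SparkProducer below joins user._id = card.userId).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

class ExpectedOutputSketch {
    static Dataset<Row> visaUsers(Dataset<Row> usersInfo, Dataset<Row> usersCardsInfo) {
        return usersInfo
                .join(usersCardsInfo, "userId")                    // inner join on the shared key
                .filter(functions.col("cardType").equalTo("Visa")) // keep Visa cards only
                .select("userId", "userName", "regMobileNo", "gender",
                        "cardType", "cardNumber", "cardStatus");
    }
}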
==============================================================================================================================================
The provided algorithm outlines the steps for persisting user information with VISA card details from MongoDB Database #1 to MongoDB Database #2 using Kafka and Spark Streaming. Here's a breakdown of each step:
a.) Read user information from MongoDB (DB#1) collection named "users_info" and the collection named "users_cards_info":
In this step, you retrieve user information from two collections in MongoDB Database #1: "users_info" and "users_cards_info". The user information includes details such as names, registered mobile numbers, and gender.
b.) Join the two collections' information and publish it into a Kafka topic:
After retrieving the user information from both collections, you perform a join operation to combine the relevant data. The purpose of this join is to associate each user with their corresponding card details. Once the join is complete, you publish the combined records into a Kafka topic, which allows other systems or processes to consume the data downstream.
c.) Spark Structured Streaming consumes the information from the Kafka topic:
Spark Structured Streaming, which is a real-time processing engine, acts as a consumer in this step. It consumes the data that was published to the Kafka topic in the previous step. Spark Structured Streaming provides the capability to process the data in a streaming fashion, allowing for real-time analysis and transformations.
d.) Apply transformations at the Spark layer to filter the user data with only VISA cards:
In this step, you apply transformations to the data received from the Kafka topic using Spark. The purpose is to keep only the users who hold VISA cards, based on the cardType field in the card details (note that the sample data stores the value as "Visa"). Once the filter is applied, the resulting dataset contains only the users who meet that criterion.
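Because the requirement is phrased as "VISA" while the sample documents store "Visa", a case-insensitive filter is one safe option; a minimal, illustrative Java sketch:
// Hedged sketch: case-insensitive cardType filter, since the stored value is
// "Visa" but the requirement text says "VISA".
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

class VisaFilterSketch {
    static Dataset<Row> visaOnly(Dataset<Row> users) {
        return users.filter(functions.lower(functions.col("cardType")).equalTo("visa"));
    }
}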
e.) Persist the filtered user data into a separate collection in MongoDB Database #2:
Finally, using the Mongo Spark helper library (or a plain MongoDB client, as in the consumer class below), you persist the filtered user data into a separate collection in MongoDB Database #2. This collection holds the users who have opted for VISA cards for their purchases. The Mongo Spark helper provides a convenient way to interact with MongoDB from Spark and enables seamless data transfer between the two systems.
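For reference, the simplest helper call looks roughly like this; a sketch only, assuming the SparkSession was built with spark.mongodb.output.uri pointing at the target database and collection:
// Hedged sketch: MongoSpark.save writes a Dataset to the collection configured
// via spark.mongodb.output.uri on the session (mongo-spark-connector 3.x API).
import com.mongodb.spark.MongoSpark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class HelperSaveSketch {
    static void persist(Dataset<Row> visaUsers) {
        MongoSpark.save(visaUsers);
    }
}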
By following these steps, you can achieve the goal of persisting user information with VISA card details from MongoDB Database #1 to MongoDB Database #2, leveraging Kafka for data transportation and Spark Streaming for real-time processing and filtering.
code:
================
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}

// Step 1: Create a SparkSession
val spark = SparkSession.builder()
  .appName("MongoDB Spark Connector Example")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017/DB1.users_info")
  .config("spark.mongodb.output.uri", "mongodb://localhost:27017/DB2.filtered_users_info")
  .getOrCreate()

// Step 2: Read user information from MongoDB collection "users_info"
val usersInfoDF = spark.read.format("mongo").load()

// Step 3: Read user card information from MongoDB collection "users_cards_info"
val usersCardsInfoDF = spark.read.format("mongo").option("collection", "users_cards_info").load()

// Step 4: Join the two collections on the common key (userId)
val joinedDF = usersInfoDF.join(usersCardsInfoDF, Seq("userId"))

// Step 5: Publish the joined information into a Kafka topic
joinedDF
  .select(to_json(struct("*")).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "user_info_topic")
  .save()

// Step 6: Spark Structured Streaming as a consumer
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user_info_topic")
  .load()

// Step 7: Apply transformations to keep only users with Visa cards
val filteredDF = kafkaDF
  .select(from_json(col("value").cast("string"), joinedDF.schema).alias("data"))
  .select("data.*")
  .filter(col("cardType") === "Visa")

// Step 8: Persist the filtered user data into MongoDB collection "filtered_users_info".
// Note: the 3.x MongoDB Spark Connector declared in this project's pom has no native
// structured-streaming sink, so in practice this write is done per micro-batch
// (see the foreachBatch sketch below) or with a ForeachWriter as in SparkConsumer.
filteredDF.writeStream
  .format("mongo")
  .option("collection", "filtered_users_info")
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
  .awaitTermination()
Adjust the MongoDB configuration, Kafka setup, and data schema to match your environment. Make sure to replace the placeholder values (localhost:27017, DB1, DB2, /path/to/checkpoint, etc.) with the appropriate values for your setup.
This code assumes that you have the necessary dependencies and libraries set up correctly, including the MongoDB Spark Connector and Kafka integration with Spark. You may need to add the required dependencies to your project configuration.
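Because the 3.x MongoDB Spark Connector used here has no native structured-streaming sink, the streaming write in Step 8 is usually implemented per micro-batch. A minimal Java sketch, assuming the target collection name from the requirement ("users_cards_info_list") and a session configured with spark.mongodb.output.uri:
// Hedged sketch: write each micro-batch of the filtered stream to MongoDB via
// foreachBatch and MongoSpark.save. Collection name and checkpoint path are
// placeholders; the SparkConsumer class below uses a ForeachWriter instead.
import com.mongodb.spark.MongoSpark;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

class StreamToMongoSketch {
    static StreamingQuery start(Dataset<Row> filteredDF) throws Exception {
        return filteredDF.writeStream()
                .option("checkpointLocation", "/path/to/checkpoint")
                .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
                    @Override
                    public void call(Dataset<Row> batch, Long batchId) {
                        // Uses spark.mongodb.output.uri from the session configuration
                        MongoSpark.save(batch.write()
                                .mode("append")
                                .option("collection", "users_cards_info_list"));
                    }
                })
                .start();
    }
}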
v1
{"nextBatchWatermarkMs":0}
v1
{"nextBatchWatermarkMs":0}
v1
{"nextBatchWatermarkMs":0}
v1
{"nextBatchWatermarkMs":0}
{"id":"fb242c34-0eef-4fd3-aac9-64a081c9e981"}
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685023904258,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":4}}
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685025327533,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":0}}
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685025490044,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":4}}
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685085689998,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":8}}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark Kafka integration and Kafka clients -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- Kafka Streaming -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<optional>true</optional>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
</project>
package com.nisum.consumer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.nisum.entity.UserCardInfo;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
public class SparkConsumer {
static String TOPIC = "UserInfoTopic";
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/UserDB.users_info")
.getOrCreate();
// Kafka source options for the Structured Streaming read
Map<String, String> kafkaConfigMap = new HashMap<>();
kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092");
kafkaConfigMap.put("subscribe", TOPIC);
kafkaConfigMap.put("startingOffsets", "earliest");
kafkaConfigMap.put("failOnDataLoss", "false");
kafkaConfigMap.put("kafka.group.id", "local"); // consumer group id for the Kafka source
Dataset<Row> rawItemDF = spark
.readStream()
.format("kafka")
.options(kafkaConfigMap)
.load();
StructType structType = Encoders.bean(UserCardInfo.class).schema();
Dataset<Row> df = rawItemDF
.selectExpr("CAST(value as String) as message")
.select(functions.from_json(functions.col("message"), structType).as("data"))
.select("data.*")
.repartition(400);
Dataset<Row> filteredDataFrame = df.filter(functions.col("cardType").equalTo("Visa"));
Dataset<UserCardInfo> ds = filteredDataFrame.as(Encoders.bean(UserCardInfo.class));
StreamingQuery query = ds.writeStream().outputMode(OutputMode.Append())
.option("checkpointLocation", "checkpointlocaion/streamingjob")
.foreach(new ForeachWriter<UserCardInfo>() {
private MongoClient mongoClient;
private MongoDatabase database;
private MongoCollection<Document> collection;
@Override
public boolean open(long partitionId, long epochId) {
mongoClient = MongoClients.create("mongodb://localhost:27017");
database = mongoClient.getDatabase("UserDB");
collection = database.getCollection("user_card_details");
return true;
}
@Override
public void process(UserCardInfo userCardInfo) {
// Create a document with the object data
Document document = new Document();
document.append("userId", userCardInfo.getUserId());
document.append("userName", userCardInfo.getUserName());
document.append("regMobileNo", userCardInfo.getRegMobileNo());
document.append("gender", userCardInfo.getGender());
document.append("cardId", userCardInfo.getCardId());
document.append("cardType", userCardInfo.getCardType());
document.append("cardNumber", userCardInfo.getCardNumber());
document.append("cardStatus", userCardInfo.getCardStatus());
// Insert the document into the collection
collection.insertOne(document);
}
@Override
public void close(Throwable errorOrNull) {
// Close the MongoDB connection
mongoClient.close();
}
}).start();
query.awaitTermination();
}
}
package com.nisum.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserCardInfo {
private String userId;
private String userName;
private String regMobileNo;
private String gender;
private String cardType;
private String cardNumber;
private String cardStatus;
private String cardId;
@Override
public String toString() {
return "com.nisum.entity.UserCardInfo{" +
"userId='" + userId + '\'' +
", userName='" + userName + '\'' +
", regMobileNo='" + regMobileNo + '\'' +
", gender='" + gender + '\'' +
", cardType='" + cardType + '\'' +
", cardNumber='" + cardNumber + '\'' +
", cardStatus='" + cardStatus + '\'' +
", cardId='" + cardId + '\'' +
'}';
}
}
package com.nisum.producer;
import com.mongodb.spark.config.ReadConfig;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.storage.StorageLevel;
public class SparkProducer {
static String TOPIC = "UserInfoTopic";
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/UserDB.users_info")
.getOrCreate();
ReadConfig readConfigUserInfo = ReadConfig.create(spark)
.withOption("uri", "mongodb://localhost:27017")
.withOption("database", "UserDB")
.withOption("collection", "users_info");
Dataset<Row> datasetUserInfo = spark.read()
.format("mongo")
.options(readConfigUserInfo.asOptions())
.load();
datasetUserInfo.createOrReplaceTempView("user");
ReadConfig readConfigUserCardInfo = ReadConfig.create(spark)
.withOption("uri", "mongodb://localhost:27017")
.withOption("database", "UserDB")
.withOption("collection", "users_cards_info");
Dataset<Row> datasetUserCardInfo = spark.read()
.format("mongo")
.options(readConfigUserCardInfo.asOptions())
.load();
datasetUserCardInfo.createOrReplaceTempView("card");
datasetUserInfo.show();
datasetUserCardInfo.show();
Dataset<Row> userCardInfo = spark
.sql("SELECT user.gender,user.regMobileNo,user.userName,card.userId as userId,card._id as cardId,card.cardNumber,card.cardStatus,card.cardType FROM user JOIN card ON user._id = card.userId");
userCardInfo.show();
Dataset<Row> userCardInfoJson = userCardInfo.withColumn("value", functions.to_json(functions.struct(functions.col("gender"),
functions.col("regMobileNo"), functions.col("userName"),
functions.col("userId"), functions.col("cardId"),
functions.col("cardNumber"), functions.col("cardStatus"),
functions.col("cardType")))).alias("value").select("value").persist(StorageLevel.MEMORY_AND_DISK());
userCardInfoJson.show(false);
userCardInfoJson
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", TOPIC).save();
spark.close();
}
}
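Once both jobs have run, the persisted output can be spot-checked by reading the target collection back. A hedged verification sketch; the URI, database, and collection names simply mirror the SparkConsumer class above:
// Hedged verification sketch: read the persisted collection back from MongoDB
// and print it. Names follow SparkConsumer (UserDB.user_card_details).
import org.apache.spark.sql.SparkSession;

public class VerifyOutput {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("VerifyOutput")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/UserDB.user_card_details")
                .getOrCreate();
        spark.read().format("mongo").load().show(false);
        spark.close();
    }
}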