Commit 27c6c8d6 authored by karthik's avatar karthik

for spark mongo connectivity POC

parents
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- IntelliJ IDEA compiler settings (.idea/compiler.xml).
     IDE-generated: enables Maven's default annotation-processing profile for
     the SparkDemo module, with generated sources under target/. Normally not
     edited by hand. -->
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="SparkDemo" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!-- IntelliJ IDEA remote Maven repository list (.idea/jarRepositories.xml).
     IDE-generated from the effective POM / IDE defaults. -->
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<!-- NOTE(review): this entry reuses id "central" with a different URL; the
     IDE tolerates duplicate ids, but confirm this is intentional. -->
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="repository.jfrog.deployment" />
<option name="name" value="Safeway JFrog Repository" />
<option name="url" value="https://artifactory.albertsons.com/artifactory/digital-artifactory-cache/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!-- IntelliJ IDEA project settings (.idea/misc.xml).
     IDE-generated: registers pom.xml as the Maven project file and pins the
     project SDK / language level to JDK 11 (matches maven.compiler.* in the
     POM). -->
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" default="true" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!-- IntelliJ IDEA GUI Designer palette (.idea/uiDesigner.xml).
     Fully IDE-generated default Swing component palette; not related to the
     Spark POC code. Do not edit by hand. -->
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.png" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"fb242c34-0eef-4fd3-aac9-64a081c9e981"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685023904258,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":4}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685025327533,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":0}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685025490044,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":4}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685082781359,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":8}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685082898019,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":9}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685082899539,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":12}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685082953467,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":13}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685082955063,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"UserInfoTopic":{"0":16}}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spark + Kafka + MongoDB connectivity POC (SparkDemo).
     All Spark artifacts use Scala 2.12 binaries at version 3.1.2; compiled
     for Java 11. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Apache Spark core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark SQL (DataFrame/Dataset API) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Apache Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Structured Streaming Kafka source/sink -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<!-- MongoDB Spark connector.
     NOTE(review): 3.0.1 was built against Spark 3.0.x; confirm it is
     compatible with Spark 3.1.2 before relying on it in production. -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
</project>
\ No newline at end of file
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
/**
 * Spark Structured Streaming consumer POC.
 *
 * Reads JSON messages from the Kafka topic "UserInfoTopic", parses them into
 * rows matching the {@link UserCardInfo} bean schema, keeps only records whose
 * cardType is "Visa", and inserts each surviving record into the MongoDB
 * collection userdb.user_card_details through a per-partition ForeachWriter.
 */
public class SparkConsumer {

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        // Create and get Spark session
        SparkSession spark = getSparkSession();
        // Read data from Kafka as an unbounded streaming DataFrame
        Dataset<Row> rawJsonDataFrame = loadDatafromKafka(spark);
        Dataset<Row> df = convertingJsonDatasetToSchemaDataset(rawJsonDataFrame);
        // Filtering dataset based on visa cardType
        Dataset<Row> filteredDataFrame = df.filter(functions.col("cardType").equalTo("Visa"));
        // Converting DataFrame to a typed Dataset via the JavaBean encoder
        Dataset<UserCardInfo> userCardInfoDataset = filteredDataFrame.as(Encoders.bean(UserCardInfo.class));
        // Writing data to mongo db; blocks until the streaming query terminates
        writeDataToMongoDB(userCardInfoDataset);
    }

    /**
     * Casts the Kafka record value to a string, parses it as JSON using the
     * schema derived from the UserCardInfo bean, and flattens the resulting
     * struct into top-level columns.
     *
     * @param rawItemDF raw Kafka DataFrame (binary key/value columns)
     * @return DataFrame with one column per UserCardInfo property
     */
    private static Dataset<Row> convertingJsonDatasetToSchemaDataset(Dataset<Row> rawItemDF) {
        StructType structType = Encoders.bean(UserCardInfo.class).schema();
        Dataset<Row> df = rawItemDF
                .selectExpr("CAST(value as String) as message")
                .select(functions.from_json(functions.col("message"), structType).as("data"))
                .select("data.*")
                // NOTE(review): 400 partitions is very high for a local POC;
                // confirm this was intentional before tuning.
                .repartition(400);
        return df;
    }

    /**
     * Starts an append-mode streaming query that writes each record to MongoDB
     * through a ForeachWriter (one MongoClient per partition/epoch), then
     * blocks until the query terminates.
     *
     * @param ds typed stream of UserCardInfo records
     * @throws TimeoutException        if the query fails to start in time
     * @throws StreamingQueryException if the query terminates with an error
     */
    private static void writeDataToMongoDB(Dataset<UserCardInfo> ds)
            throws TimeoutException, StreamingQueryException {
        StreamingQuery query = ds.writeStream().outputMode(OutputMode.Append())
                // Path spelling ("checkpointlocaion") kept as-is so existing
                // checkpoint state on disk is not orphaned by a rename.
                .option("checkpointLocation", "checkpointlocaion/streamingjob")
                .foreach(new ForeachWriter<UserCardInfo>() {
                    private MongoClient mongoClient;
                    private MongoDatabase database;
                    private MongoCollection<Document> collection;

                    @Override
                    public boolean open(long partitionId, long epochId) {
                        mongoClient = MongoClients.create("mongodb://localhost:27017");
                        database = mongoClient.getDatabase("userdb");
                        collection = database.getCollection("user_card_details");
                        return true;
                    }

                    @Override
                    public void process(UserCardInfo userCardInfo) {
                        // Create a document with the object data
                        Document document = new Document();
                        document.append("userId", userCardInfo.getUserId());
                        document.append("userName", userCardInfo.getUserName());
                        document.append("regMobileNo", userCardInfo.getRegMobileNo());
                        document.append("gender", userCardInfo.getGender());
                        document.append("cardId", userCardInfo.getCardId());
                        document.append("cardType", userCardInfo.getCardType());
                        document.append("cardNumber", userCardInfo.getCardNumber());
                        document.append("cardStatus", userCardInfo.getCardStatus());
                        // Insert the document into the collection
                        collection.insertOne(document);
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        // Close the MongoDB connection. Guard against NPE:
                        // open() may have failed before the client was created
                        // (e.g. MongoClients.create threw), and Spark still
                        // calls close() in that case.
                        if (mongoClient != null) {
                            mongoClient.close();
                        }
                    }
                }).start();
        query.awaitTermination();
    }

    /**
     * Builds the streaming Kafka source using the options from
     * {@link #constructKafkaConfigMap()}.
     */
    private static Dataset<Row> loadDatafromKafka(SparkSession spark) {
        Dataset<Row> rawDataFrame = spark
                .readStream()
                .format("kafka")
                .options(constructKafkaConfigMap())
                .load();
        return rawDataFrame;
    }

    /**
     * Kafka source options: local broker, subscribe to UserInfoTopic from the
     * earliest offset, and tolerate data loss (failOnDataLoss=false).
     */
    private static Map<String, String> constructKafkaConfigMap() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaConfigMap.put("subscribe", "UserInfoTopic");
        kafkaConfigMap.put("startingOffsets", "earliest");
        kafkaConfigMap.put("failOnDataLoss", "false");
        // NOTE(review): "kafka.con.group.id" passes "con.group.id" to the
        // Kafka consumer, which is not a recognized consumer property; was
        // "kafka.group.id" intended? Left unchanged to preserve behavior.
        kafkaConfigMap.put("kafka.con.group.id", "local");
        return kafkaConfigMap;
    }

    /**
     * Creates a local-mode SparkSession. The spark.mongodb.output.uri config
     * is set here but the stream writes through a ForeachWriter with its own
     * connection string, so this setting is not read by the code above.
     */
    private static SparkSession getSparkSession() {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("SparkConsumer")
                .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/userdb.users_info")
                .getOrCreate();
        return spark;
    }
}
import com.mongodb.spark.config.ReadConfig;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.storage.StorageLevel;
/**
 * Spark batch producer POC.
 *
 * Loads user and card collections from MongoDB, joins them with Spark SQL,
 * serializes each joined row to a JSON "value" column, and publishes the
 * result to the Kafka topic "UserInfoTopic".
 */
public class SparkProducer {

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        // Create and get Spark Session
        SparkSession spark = getSparkSession();
        // Load userInfo data from mongoDB (registers temp view "user")
        loadUserInfoFromMongoDB(spark);
        // Load cardInfo data from mongoDB (registers temp view "card")
        loadUserCardInfoFromDB(spark);
        // Combining user and card info using spark sql
        Dataset<Row> userCardInfo = joiningUserAndCardInfo(spark);
        // Converting combined dataset into json
        Dataset<Row> userCardInfoJson = convertToJsonDataFrame(userCardInfo);
        // Publish converted json dataset to kafka
        publishToKafka(userCardInfoJson);
        // Closing spark session
        spark.close();
    }

    /**
     * Writes the single-column ("value") JSON DataFrame to the local Kafka
     * broker on topic UserInfoTopic.
     */
    private static void publishToKafka(Dataset<Row> userCardInfoJson) {
        userCardInfoJson
                .write()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "UserInfoTopic").save();
    }

    /**
     * Packs the joined columns into a JSON string column named "value" (the
     * column name the Kafka sink requires), selects only that column, and
     * caches the result since it is both shown and written.
     */
    private static Dataset<Row> convertToJsonDataFrame(Dataset<Row> userCardInfo) {
        Dataset<Row> userCardInfoJson = userCardInfo
                .withColumn("value", functions.to_json(functions.struct(functions.col("gender"),
                        functions.col("regMobileNo"), functions.col("userName"),
                        functions.col("userId"), functions.col("cardId"),
                        functions.col("cardNumber"), functions.col("cardStatus"),
                        functions.col("cardType")))).alias("value").select("value")
                .persist(StorageLevel.MEMORY_AND_DISK());
        userCardInfoJson.show(false);
        return userCardInfoJson;
    }

    /**
     * Joins the "user" and "card" temp views on user._id = card.userId,
     * projecting the fields consumed downstream by the consumer's
     * UserCardInfo bean.
     */
    private static Dataset<Row> joiningUserAndCardInfo(SparkSession spark) {
        Dataset<Row> userCardInfo = spark
                .sql("SELECT user.gender,user.regMobileNo,user.userName,card.userId as userId,card._id as cardId,card.cardNumber,card.cardStatus,card.cardType FROM user JOIN card ON user._id = card.userId");
        userCardInfo.show();
        return userCardInfo;
    }

    /**
     * Reads userdb.users_cards_info from the local MongoDB instance and
     * registers it as the temp view "card".
     */
    private static Dataset<Row> loadUserCardInfoFromDB(SparkSession spark) {
        ReadConfig readConfigUserCardInfo = ReadConfig.create(spark)
                .withOption("uri", "mongodb://localhost:27017")
                .withOption("database", "userdb")
                .withOption("collection", "users_cards_info");
        Dataset<Row> datasetUserCardInfo = spark.read()
                .format("mongo")
                .options(readConfigUserCardInfo.asOptions())
                .load();
        datasetUserCardInfo.createOrReplaceTempView("card");
        datasetUserCardInfo.show();
        return datasetUserCardInfo;
    }

    /**
     * Reads userdb.users_info from the local MongoDB instance and registers
     * it as the temp view "user".
     */
    private static Dataset<Row> loadUserInfoFromMongoDB(SparkSession spark) {
        ReadConfig readConfigUserInfo = ReadConfig.create(spark)
                .withOption("uri", "mongodb://localhost:27017")
                .withOption("database", "userdb")
                .withOption("collection", "users_info");
        Dataset<Row> datasetUserInfo = spark.read()
                .format("mongo")
                .options(readConfigUserInfo.asOptions())
                .load();
        datasetUserInfo.createOrReplaceTempView("user");
        datasetUserInfo.show();
        return datasetUserInfo;
    }

    /**
     * Creates a local-mode SparkSession with the default MongoDB input URI
     * (individual reads above override uri/database/collection via ReadConfig).
     */
    private static SparkSession getSparkSession() {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("SparkProducer")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/userdb.users_info")
                .getOrCreate();
        return spark;
    }
}
/**
 * JavaBean combining a user's profile with one of their payment cards.
 * Serves as the row type for Spark's {@code Encoders.bean(UserCardInfo.class)},
 * so it must keep a public no-arg constructor and conventional getter/setter
 * pairs for every field.
 */
public class UserCardInfo {

    // User profile fields
    private String userId;
    private String userName;
    private String regMobileNo;
    private String gender;
    // Card fields
    private String cardType;
    private String cardNumber;
    private String cardStatus;
    private String cardId;

    public String getUserId() {
        return this.userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return this.userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getRegMobileNo() {
        return this.regMobileNo;
    }

    public void setRegMobileNo(String regMobileNo) {
        this.regMobileNo = regMobileNo;
    }

    public String getGender() {
        return this.gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getCardType() {
        return this.cardType;
    }

    public void setCardType(String cardType) {
        this.cardType = cardType;
    }

    public String getCardNumber() {
        return this.cardNumber;
    }

    public void setCardNumber(String cardNumber) {
        this.cardNumber = cardNumber;
    }

    public String getCardStatus() {
        return this.cardStatus;
    }

    public void setCardStatus(String cardStatus) {
        this.cardStatus = cardStatus;
    }

    public String getCardId() {
        return this.cardId;
    }

    public void setCardId(String cardId) {
        this.cardId = cardId;
    }

    /** Debug representation; field order matches the original concatenation. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("UserCardInfo{");
        sb.append("userId='").append(userId).append('\'');
        sb.append(", userName='").append(userName).append('\'');
        sb.append(", regMobileNo='").append(regMobileNo).append('\'');
        sb.append(", gender='").append(gender).append('\'');
        sb.append(", cardType='").append(cardType).append('\'');
        sb.append(", cardNumber='").append(cardNumber).append('\'');
        sb.append(", cardStatus='").append(cardStatus).append('\'');
        sb.append(", cardId='").append(cardId).append('\'');
        return sb.append('}').toString();
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment