Moved to constants

parent b4ccd2e9
...@@ -9,6 +9,7 @@ import java.util.Map; ...@@ -9,6 +9,7 @@ import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import com.nisum.entity.UserCardInfo; import com.nisum.entity.UserCardInfo;
import com.nisum.utils.UserConstants;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQuery;
...@@ -16,18 +17,20 @@ import org.apache.spark.sql.streaming.StreamingQueryException; ...@@ -16,18 +17,20 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType;
import org.bson.Document; import org.bson.Document;
import static com.nisum.utils.UserConstants.*;
public class SparkConsumer { public class SparkConsumer {
static String TOPIC = "UserInfoTopic";
public static void main(String[] args) throws TimeoutException, StreamingQueryException { public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession.builder() SparkSession spark = SparkSession.builder()
.master("local[*]") .master("local[*]")
.appName("MongoSparkConnectorIntro") .appName("MongoSparkConnectorIntro")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/UserDB.users_info") .config("spark.mongodb.output.uri", USERDB_HOST)
.getOrCreate(); .getOrCreate();
Map<String, String> kafkaConfigMap = new HashMap<>(); Map<String, String> kafkaConfigMap = new HashMap<>();
kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092"); kafkaConfigMap.put("kafka.bootstrap.servers", KAFKA_HOST);
kafkaConfigMap.put("subscribe", TOPIC); kafkaConfigMap.put("subscribe", TOPIC);
kafkaConfigMap.put("startingOffsets", "earliest"); kafkaConfigMap.put("startingOffsets", "earliest");
kafkaConfigMap.put("failOnDataLoss", "false"); kafkaConfigMap.put("failOnDataLoss", "false");
...@@ -61,9 +64,9 @@ public class SparkConsumer { ...@@ -61,9 +64,9 @@ public class SparkConsumer {
@Override @Override
public boolean open(long partitionId, long epochId) { public boolean open(long partitionId, long epochId) {
mongoClient = MongoClients.create("mongodb://localhost:27017"); mongoClient = MongoClients.create(URI);
database = mongoClient.getDatabase("UserDB"); database = mongoClient.getDatabase(CARD_DETAILS_DB);
collection = database.getCollection("user_card_details"); collection = database.getCollection(USER_CARD_DETAILS);
return true; return true;
} }
......
...@@ -4,28 +4,29 @@ import com.mongodb.spark.config.ReadConfig; ...@@ -4,28 +4,29 @@ import com.mongodb.spark.config.ReadConfig;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import com.nisum.utils.UserConstants;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions; import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.storage.StorageLevel; import org.apache.spark.storage.StorageLevel;
import static com.nisum.utils.UserConstants.*;
public class SparkProducer { public class SparkProducer {
static String TOPIC = "UserInfoTopic";
public static void main(String[] args) throws TimeoutException, StreamingQueryException { public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession.builder() SparkSession spark = SparkSession.builder()
.master("local") .master("local")
.appName("MongoSparkConnectorIntro") .appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/UserDB.users_info") .config("spark.mongodb.input.uri", USERDB_HOST)
.getOrCreate(); .getOrCreate();
ReadConfig readConfigUserInfo = ReadConfig.create(spark) ReadConfig readConfigUserInfo = ReadConfig.create(spark)
.withOption("uri", "mongodb://localhost:27017") .withOption("uri", URI)
.withOption("database", "UserDB") .withOption("database", USER_DB)
.withOption("collection", "users_info"); .withOption("collection", USERS_INFO);
Dataset<Row> datasetUserInfo = spark.read() Dataset<Row> datasetUserInfo = spark.read()
.format("mongo") .format("mongo")
...@@ -34,9 +35,9 @@ public class SparkProducer { ...@@ -34,9 +35,9 @@ public class SparkProducer {
datasetUserInfo.createOrReplaceTempView("user"); datasetUserInfo.createOrReplaceTempView("user");
ReadConfig readConfigUserCardInfo = ReadConfig.create(spark) ReadConfig readConfigUserCardInfo = ReadConfig.create(spark)
.withOption("uri", "mongodb://localhost:27017") .withOption("uri", URI)
.withOption("database", "UserDB") .withOption("database", USER_DB)
.withOption("collection", "users_cards_info"); .withOption("collection", USERS_CARDS_INFO);
Dataset<Row> datasetUserCardInfo = spark.read() Dataset<Row> datasetUserCardInfo = spark.read()
.format("mongo") .format("mongo")
...@@ -61,7 +62,7 @@ public class SparkProducer { ...@@ -61,7 +62,7 @@ public class SparkProducer {
userCardInfoJson userCardInfoJson
.write() .write()
.format("kafka") .format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") .option("kafka.bootstrap.servers", KAFKA_HOST)
.option("topic", TOPIC).save(); .option("topic", TOPIC).save();
spark.close(); spark.close();
......
package com.nisum.utils;
/**
 * Shared configuration constants for the Spark producer/consumer jobs:
 * Kafka endpoint/topic, MongoDB connection strings, and database and
 * collection names.
 *
 * <p>Non-instantiable constants holder — all members are
 * {@code public static final}.
 */
public final class UserConstants {

    /** Kafka bootstrap server address used by both producer and consumer. */
    public static final String KAFKA_HOST = "localhost:9092";

    /** Kafka topic the joined user/card records are published to and read from. */
    public static final String TOPIC = "UserInfoTopic";

    /** Spark-MongoDB connector URI (database + collection) for the users_info data. */
    public static final String USERDB_HOST = "mongodb://127.0.0.1/UserDB.users_info";

    /** Plain MongoDB server URI used by ReadConfig and MongoClients.create. */
    public static final String URI = "mongodb://localhost:27017";

    /**
     * Database the consumer's ForeachWriter writes card details into.
     * NOTE(review): the pre-refactor code wrote to "UserDB" here — confirm the
     * switch to "CardDetailsDB" is intentional and that this database exists.
     */
    public static final String CARD_DETAILS_DB = "CardDetailsDB";

    /** Collection the consumer writes user card documents into. */
    public static final String USER_CARD_DETAILS = "user_card_details";

    /** Source database read by the producer. */
    public static final String USER_DB = "UserDB";

    /** Source collection of user records. */
    public static final String USERS_INFO = "users_info";

    /** Source collection of card records. */
    public static final String USERS_CARDS_INFO = "users_cards_info";

    /** Prevents instantiation of this constants holder. */
    private UserConstants() {
        throw new AssertionError("No instances");
    }
}
com.nisum.topicName = UserInfoTopic
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.