loggers added

parent bb46c5d2
......@@ -15,11 +15,13 @@ import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.nisum.utils.UserConstants.*;
public class SparkConsumer {
static Logger log = LoggerFactory.getLogger(SparkConsumer.class);
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession.builder()
......@@ -27,7 +29,7 @@ public class SparkConsumer {
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.output.uri", USERDATA_HOST)
.getOrCreate();
log.debug("spark session is created");
Map<String, String> kafkaConfigMap = new HashMap<>();
kafkaConfigMap.put("kafka.bootstrap.servers", KAFKA_HOST);
kafkaConfigMap.put("subscribe", TOPIC);
......@@ -40,7 +42,7 @@ public class SparkConsumer {
.format("kafka")
.options(kafkaConfigMap)
.load();
log.debug("kafka is loaded");
StructType structType = Encoders.bean(UserCardInfo.class).schema();
Dataset<Row> df = rawItemDF
......@@ -48,9 +50,9 @@ public class SparkConsumer {
.select(functions.from_json(functions.col("message"), structType).as("data"))
.select("data.*")
.repartition(400);
log.debug("consumed data is type casting to string");
Dataset<Row> filteredDataFrame = df.filter(functions.col("cardType").equalTo("Visa"));
log.debug("data filtering is done with visa card type");
Dataset<UserCardInfo> ds = filteredDataFrame.as(Encoders.bean(UserCardInfo.class));
......@@ -84,6 +86,7 @@ public class SparkConsumer {
// Insert the document into the collection
collection.insertOne(document);
log.debug("document inserted" + document);
}
@Override
......
......@@ -7,10 +7,13 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.nisum.utils.UserConstants.*;
public class SparkProducer {
static Logger log = LoggerFactory.getLogger(SparkProducer.class);
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
......@@ -40,12 +43,14 @@ public class SparkProducer {
.options(readConfigUserCardInfo.asOptions())
.load();
datasetUserCardInfo.createOrReplaceTempView("card");
log.debug("datasetUserInfo {}", datasetUserInfo.toJSON());
datasetUserInfo.show();
log.debug("datasetUserCardInfo {}", datasetUserCardInfo.toJSON());
datasetUserCardInfo.show();
Dataset<Row> userCardInfo = spark
.sql("SELECT user.gender,user.regMobileNo,user.userName,card.userId as userId,card._id as cardId,card.cardNumber,card.cardStatus,card.cardType FROM user JOIN card ON user._id = card.userId");
log.debug("userCardInfo ",datasetUserCardInfo.toJSON());
userCardInfo.show();
Dataset<Row> userCardInfoJson = userCardInfo.withColumn("value", functions.to_json(functions.struct(functions.col("gender"),
......@@ -54,7 +59,7 @@ public class SparkProducer {
functions.col("cardNumber"), functions.col("cardStatus"),
functions.col("cardType")))).alias("value").select("value").persist(StorageLevel.MEMORY_AND_DISK());
userCardInfoJson.show(false);
log.debug("userCardInfo ",userCardInfoJson.toJSON());
userCardInfoJson
.write()
.format("kafka")
......
......@@ -4,7 +4,7 @@ public class UserConstants {
public static final String KAFKA_HOST = "localhost:9092";
public static final String TOPIC = "UserInfoTopic";
public static final String URI = "mongodb://localhost:27017";
public static final String USERDATA_HOST = URI+"UserDB.users_info";
public static final String USERDATA_HOST = "mongodb://127.0.0.1/UserDB.users_info";
public static final String CARD_DETAILS_DB = "CardDetailsDB";
public static final String USER_CARD_DETAILS = "user_card_details";
public static final String USER_DB = "UserDB";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment