added loggers

parent e3addc7e
package com.nisum.spark.consumer;
import com.nisum.spark.dto.Product;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -12,11 +15,12 @@ import java.util.concurrent.TimeoutException;
import static com.nisum.spark.util.Constants.*;
public class Consumer {
    static Logger log = LoggerFactory.getLogger(Consumer.class);
    public static void main(String[] args) throws TimeoutException {
        SparkSession sparkSession = SparkSession
                .builder().master("local[*]").appName("Spark Streaming Example 2").getOrCreate();
        log.debug("spark session is created");
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", HOST);
        kafkaConfigMap.put("subscribe", TOPIC);
@@ -29,7 +33,7 @@ public class Consumer {
                .format("kafka")
                .options(kafkaConfigMap)
                .load();
        log.debug("kafka is loaded");
        if (df != null) {
            StructType structType = Encoders.bean(Product.class).schema();
@@ -56,6 +60,7 @@ public class Consumer {
    }
    private static StreamingQuery writeStreamingData(Dataset<Product> ds) throws TimeoutException {
        log.debug("writing to CSV file");
        return ds
                .writeStream()
                .outputMode("append")
@@ -66,5 +71,6 @@ public class Consumer {
                    batchDF.write().format("csv").mode("append").save("src/main/resources/productDataOutPutCSV");
                })
                .start();
    }
}
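Note: the new `log.debug(...)` statements in both classes only show up if the SLF4J binding on the classpath emits DEBUG for `com.nisum.spark`; Spark's default logging configuration is usually set to WARN/INFO. The build file is not part of this diff, so the following is only a minimal sketch assuming Logback is the binding, with illustrative class and method names:

```java
// Sketch only: assumes logback-classic is the SLF4J binding on the classpath,
// which this diff does not show. With a different binding (e.g. Log4j), the
// level would be raised in that binding's own configuration instead.
import org.slf4j.LoggerFactory;

public class DebugLoggingBootstrap {
    public static void enableDebugFor(String loggerName) {
        ch.qos.logback.classic.Logger logger =
                (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(loggerName);
        logger.setLevel(ch.qos.logback.classic.Level.DEBUG);
    }

    public static void main(String[] args) {
        // Enable the new log.debug(...) output for both Consumer and Publisher.
        enableDebugFor("com.nisum.spark");
    }
}
```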
package com.nisum.spark.publisher;
import org.apache.spark.sql.*;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static com.nisum.spark.util.Constants.*;
public class Publisher {
    static Logger log = LoggerFactory.getLogger(Publisher.class);
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder().master("local[*]").appName("Spark Streaming Example").getOrCreate();
        log.debug("spark session is created");
        Dataset<Row> dataset = sparkSession.read().
                format("csv").option("header", true).
                load("src/main/resources/productData.csv");
@@ -22,12 +26,14 @@ public class Publisher {
    }
    private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) {
        if (null != rowDataset) {
            log.debug("reading from CSV file");
            Dataset<Row> KafkaPublishJson = rowDataset
                    .withColumn("value", functions.to_json(functions.struct(functions.col("productId"),
                            functions.col("productName"), functions.col("productPrice"),
                            functions.col("deliveryStatus"), functions.col("date"))))
                    .alias("value").select("value")
                    .persist(StorageLevel.MEMORY_AND_DISK());
            log.debug("publishing to kafka");
            KafkaPublishJson
                    .write().format("kafka").options(KafkaCsvConfig()).save();
        }
...
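For reference, the consumer builds its value schema from `Encoders.bean(Product.class)`, but the `Product` DTO itself is not touched by this change. A rough sketch of what that bean presumably looks like, with field names taken from the columns the publisher serializes to JSON and all types assumed to be `String` (the CSV is read without an explicit schema):

```java
package com.nisum.spark.dto;

import java.io.Serializable;

// Sketch only: the real com.nisum.spark.dto.Product is not part of this diff.
// Field names mirror the columns selected in Publisher; the types are assumptions.
public class Product implements Serializable {
    private String productId;
    private String productName;
    private String productPrice;
    private String deliveryStatus;
    private String date;

    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }

    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }

    public String getProductPrice() { return productPrice; }
    public void setProductPrice(String productPrice) { this.productPrice = productPrice; }

    public String getDeliveryStatus() { return deliveryStatus; }
    public void setDeliveryStatus(String deliveryStatus) { this.deliveryStatus = deliveryStatus; }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
}
```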