Commit e5603847 authored by Sai Teja Ejjigiri

Commit for reading the file from the local filesystem, publishing the records to Kafka, and writing them into a CSV using Spark
parent 5b3fc20e
package com.example.demo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class SparkSamples {

    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Streaming Example")
                .getOrCreate();

        // Read the product CSV from the local filesystem, using the first row as the header
        Dataset<Row> dataset = sparkSession.read()
                .option("header", true)
                .csv("src/main/resources/productData.csv");
        dataset.show();

        publishCsvRecordsToKafkaTopic(dataset);
    }

    private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) throws TimeoutException {
        if (rowDataset != null) {
            // Serialize each row into a JSON string in the "value" column expected by the Kafka sink
            Dataset<Row> kafkaPublishJson = rowDataset
                    .selectExpr("to_json(named_struct('productId', productId, " +
                            "'productName', productName, 'productPrice', productPrice, " +
                            "'deliveryStatus', deliveryStatus, 'date', date)) AS value")
                    .persist();

            kafkaPublishJson
                    .write()
                    .format("kafka")
                    .options(getKafkaCsvConfig())
                    .save();
        }
    }

    private static Map<String, String> getKafkaCsvConfig() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        kafkaConfigMap.put("topic", "SparkOutput");
        return kafkaConfigMap;
    }
...
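The JSON records that SparkSamples publishes can be checked independently of Spark. The following is a minimal sketch, not part of this commit, assuming the plain kafka-clients library is on the classpath; the class name and consumer group id are placeholders, while the broker address and topic are taken from getKafkaCsvConfig() above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SparkOutputTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // same broker as getKafkaCsvConfig()
        props.put("group.id", "spark-output-check");      // hypothetical group id, not from the commit
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("SparkOutput"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Each value should be the JSON built by publishCsvRecordsToKafkaTopic, e.g.
                // {"productId":"...","productName":"...","productPrice":"...","deliveryStatus":"...","date":"..."}
                System.out.println(record.value());
            }
        }
    }
}

Both Spark programs in this commit also assume the spark-sql-kafka-0-10 connector matching the Spark version is on the classpath, since format("kafka") is not bundled with spark-sql alone.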
package com.example.demo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class SparkStreamingData {

    public static void main(String[] args) throws TimeoutException {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Streaming Example 2")
                .getOrCreate();

        // Kafka source configuration for the topic populated by SparkSamples
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaConfigMap.put("subscribe", "SparkOutput");
        kafkaConfigMap.put("startingOffsets", "earliest");
        kafkaConfigMap.put("failOnDataLoss", "false");
        kafkaConfigMap.put("maxOffsetsPerTrigger", "100");

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .options(kafkaConfigMap)
                .load();

        if (df != null) {
            // Derive the JSON schema from the Product bean and parse each Kafka message value
            StructType structType = Encoders.bean(Product.class).schema();
            Dataset<Row> df1 = df
                    .selectExpr("cast(value as String) as message")
                    .select(functions.from_json(functions.col("message"), structType).as("data"))
                    .select("data.*");

            Dataset<Row> filteredDataFrame = df1
                    .filter(functions.col("deliveryStatus").equalTo("shipped"))
                    .filter(functions.col("date").equalTo("25-05-2023"));

            Dataset<Product> ds = filteredDataFrame.as(Encoders.bean(Product.class));
            StreamingQuery streamingQuery = writeStreamingData(ds);
            try {
                streamingQuery.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
                streamingQuery.stop();
                sparkSession.stop();
            }
        }
    }

    private static StreamingQuery writeStreamingData(Dataset<Product> ds) throws TimeoutException {
        return ds.writeStream()
                .outputMode("append")
                .format("csv")
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .foreachBatch((batchDF, batchId) -> {
                    // Append each micro-batch to the output CSV directory
                    batchDF.write()
                            .format("csv")
                            .mode("append")
                            .save("src/main/resources/productDataOutPutCSV");
                })
                .start();
    }
    // Helper that reads the Kafka topic as a stream and extracts the record value as a string column
    private static Dataset<Row> readKafkaCsvData(SparkSession sparkSession) {
        Map<String, String> kafkaConfigData = new HashMap<>();
        kafkaConfigData.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        kafkaConfigData.put("subscribe", "SparkOutput");
        kafkaConfigData.put("startingOffsets", "earliest");
        kafkaConfigData.put("failOnDataLoss", "false");
        kafkaConfigData.put("kafka.con.group.id", "localStream");

        Dataset<Row> csvData = sparkSession.readStream()
                .format("kafka")
                .options(kafkaConfigData)
                .load();
        return csvData.selectExpr("cast(value as String) as message");
    }
}
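SparkStreamingData derives its schema with Encoders.bean(Product.class), so a Product bean with getters and setters matching the CSV columns is expected on the classpath. The bean itself is not part of this diff; the following is a minimal sketch under the assumption that every field is carried as a plain String, since the CSV is read without an explicit schema.

package com.example.demo;

import java.io.Serializable;

// Bean whose schema is derived by Encoders.bean(Product.class); field names must match
// the JSON keys produced by SparkSamples (productId, productName, productPrice, deliveryStatus, date).
public class Product implements Serializable {
    private String productId;
    private String productName;
    private String productPrice; // assumed String here; a numeric type would also work with a matching schema
    private String deliveryStatus;
    private String date;

    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }
    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }
    public String getProductPrice() { return productPrice; }
    public void setProductPrice(String productPrice) { this.productPrice = productPrice; }
    public String getDeliveryStatus() { return deliveryStatus; }
    public void setDeliveryStatus(String deliveryStatus) { this.deliveryStatus = deliveryStatus; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
}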