Commit 1edee5cc authored by Sai Teja Ejjigiri's avatar Sai Teja Ejjigiri

Commit for reading the file from local and publishing to mongo and writing it into the csv using Spark
parent e5603847
@@ -64,6 +64,12 @@
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.mongodb.spark</groupId>
+			<artifactId>mongo-spark-connector_2.12</artifactId>
+			<version>3.0.1</version>
+		</dependency>
 		<dependency>
 			<groupId>com.opencsv</groupId>
 			<artifactId>opencsv</artifactId>
...
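Note (not part of this diff): mongo-spark-connector_2.12 3.0.1 targets Spark 3.0.x on Scala 2.12. As a minimal sketch, the output URI that SparkMongo below passes per write could instead be set once on the SparkSession config; the class name here is illustrative only.

package com.example.demo;

import org.apache.spark.sql.SparkSession;

// Sketch only: setting the connector's output URI globally on the session.
// The URI mirrors the one used in SparkMongo below; adjust host/db/collection as needed.
public class MongoSessionConfigExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Mongo Connector Config Example")
                .config("spark.mongodb.output.uri",
                        "mongodb://localhost:27017/SparkOutput.SparkOutput")
                .getOrCreate();
        spark.stop();
    }
}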
 package com.example.demo;
-import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.StreamingQueryException;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructType;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-public class SparkSamples {
+public class SparkKafka {
     public static void main(String[] args) throws StreamingQueryException, TimeoutException {
         SparkSession sparkSession = SparkSession.builder()
                 .master("local[*]")
@@ -23,6 +24,7 @@ public class SparkSamples {
         dataset.show();
         publishCsvRecordsToKafkaTopic(dataset);
     }

     private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) throws TimeoutException {
@@ -47,4 +49,5 @@ public class SparkSamples {
         kafkaConfigMap.put("topic", "SparkOutput");
         return kafkaConfigMap;
     }
 }
package com.example.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class SparkMongo {

    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Streaming Example")
                .getOrCreate();

        // Read the product CSV from the local filesystem, using the first row as the header.
        Dataset<Row> dataset = sparkSession.read()
                .option("header", true)
                .csv("src/main/resources/productData.csv");
        dataset.show();

        saveToMongoDB(dataset);
    }

    // Connector configuration: write to the SparkOutput collection of the SparkOutput database.
    private static Map<String, String> getMongoConfig() {
        Map<String, String> mongoConfigMap = new HashMap<>();
        mongoConfigMap.put("spark.mongodb.output.uri", "mongodb://localhost:27017/SparkOutput.SparkOutput");
        return mongoConfigMap;
    }

    // Saves the data into a MongoDB collection using the Spark-Mongo connector.
    private static void saveToMongoDB(Dataset<Row> rowDataset) {
        if (rowDataset != null) {
            // Serialize each row into a JSON string in a single "value" column before writing.
            Dataset<Row> mongoSaveJson = rowDataset
                    .selectExpr("to_json(named_struct('productId', productId, " +
                            "'productName', productName, 'productPrice', productPrice, " +
                            "'deliveryStatus', deliveryStatus, 'date', date)) AS value")
                    .persist();

            mongoSaveJson
                    .write()
                    .format("mongo")
                    .options(getMongoConfig())
                    .mode("append")
                    .save();
        }
    }
}
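A quick way to verify the write above, sketched here and not included in this commit, is to read the same collection back with the connector; the class name and show() call are illustrative only.

package com.example.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: reading the collection written by SparkMongo back into a DataFrame,
// assuming the same local MongoDB URI.
public class SparkMongoReadCheck {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Mongo Read Check")
                .getOrCreate();

        Dataset<Row> fromMongo = sparkSession.read()
                .format("mongo")
                .option("uri", "mongodb://localhost:27017/SparkOutput.SparkOutput")
                .load();

        fromMongo.show(false);
        sparkSession.stop();
    }
}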
package com.example.demo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class SparkStreamingData {

    public static void main(String[] args) throws TimeoutException {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Streaming Example 2")
                .getOrCreate();

        // Kafka source configuration: consume the SparkOutput topic from the earliest offset,
        // at most 100 offsets per micro-batch.
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaConfigMap.put("subscribe", "SparkOutput");
        kafkaConfigMap.put("startingOffsets", "earliest");
        kafkaConfigMap.put("failOnDataLoss", "false");
        kafkaConfigMap.put("maxOffsetsPerTrigger", "100");

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .options(kafkaConfigMap)
                .load();

        if (df != null) {
            // Parse the Kafka value as JSON using the schema derived from the Product bean.
            StructType structType = Encoders.bean(Product.class).schema();
            Dataset<Row> df1 = df
                    .selectExpr("CAST(value as String) as message")
                    .select(functions.from_json(functions.col("message"), structType).as("data"))
                    .select("data.*");

            // Keep only shipped products for the given date.
            Dataset<Row> filteredDataFrame = df1
                    .filter(functions.col("deliveryStatus").equalTo("shipped"))
                    .filter(functions.col("date").equalTo("25-05-2023"));

            Dataset<Product> ds = filteredDataFrame.as(Encoders.bean(Product.class));
            StreamingQuery streamingQuery = writeStreamingData(ds);
            try {
                streamingQuery.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
                streamingQuery.stop();
                sparkSession.stop();
            }
        }
    }

    // Writes each micro-batch to CSV every 10 seconds; the foreachBatch callback is the actual sink.
    private static StreamingQuery writeStreamingData(Dataset<Product> ds) throws TimeoutException {
        return ds.writeStream()
                .outputMode("append")
                .format("csv")
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .foreachBatch((batchDF, batchId) -> {
                    batchDF.write()
                            .format("csv")
                            .mode("append")
                            .save("src/main/resources/productDataOutPutCSV");
                })
                .start();
    }
}
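SparkStreamingData deserializes the Kafka value with Encoders.bean(Product.class), but the Product bean itself is not part of this diff. A minimal sketch of what it might look like follows; the field names are taken from the columns used above, and the String types are an assumption.

package com.example.demo;

// Sketch only: Encoders.bean(...) requires a public class with a no-arg constructor
// and getters/setters. Field types here are assumed, since Product is not in this diff.
public class Product {
    private String productId;
    private String productName;
    private String productPrice;
    private String deliveryStatus;
    private String date;

    public Product() {
    }

    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }

    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }

    public String getProductPrice() { return productPrice; }
    public void setProductPrice(String productPrice) { this.productPrice = productPrice; }

    public String getDeliveryStatus() { return deliveryStatus; }
    public void setDeliveryStatus(String deliveryStatus) { this.deliveryStatus = deliveryStatus; }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
}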