Commit 2402140f authored by Sai Teja Ejjigiri

Commit for reading the file from local with Spark and writing it into the CSV

parent 6ed90cf0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.nisum</groupId>
    <artifactId>sparkpoc</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sparkpoc</name>
    <description>Demo project for Spark Poc</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Kafka source/sink for Spark Structured Streaming, needed by format("kafka") -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Logging dependency -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <!-- For Apache Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- ... -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.example.demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// POJO representing one record of productData.csv and the JSON payload exchanged over Kafka
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Product {
    private String productId;
    private String productName;
    private String productPrice;
    private String deliveryStatus;
    private String date;
}
package com.example.demo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.storage.StorageLevel;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class SparkSamples {

    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        // Local Spark session for reading the CSV file from src/main/resources
        SparkSession sparkSession = SparkSession
                .builder().master("local[*]").appName("Spark Streaming Example").getOrCreate();

        // Read the product CSV (with a header row) into a DataFrame
        Dataset<Row> dataset = sparkSession.read()
                .format("csv").option("header", true)
                .load("src/main/resources/productData.csv");
        dataset.show();

        publishCsvRecordsToKafkaTopic(dataset);
    }

    // Serializes each row to JSON in a "value" column and publishes it to the Kafka topic
    private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) throws TimeoutException {
        if (null != rowDataset) {
            Dataset<Row> kafkaPublishJson = rowDataset
                    .withColumn("value", functions.to_json(functions.struct(functions.col("productId"),
                            functions.col("productName"), functions.col("productPrice"),
                            functions.col("deliveryStatus"), functions.col("date"))))
                    .alias("value").select("value")
                    .persist(StorageLevel.MEMORY_AND_DISK());

            kafkaPublishJson
                    .write().format("kafka").options(kafkaCsvConfig()).save();
        }
    }

    // Kafka producer options: broker address and destination topic
    private static Map<String, String> kafkaCsvConfig() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        kafkaConfigMap.put("topic", "SparkOutput");
        return kafkaConfigMap;
    }
}
package com.example.demo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class SparkStreamingData {

    public static void main(String[] args) throws TimeoutException {
        SparkSession sparkSession = SparkSession
                .builder().master("local[*]").appName("Spark Streaming Example 2").getOrCreate();

        // Kafka source options: subscribe from the earliest offset, at most 100 records per trigger
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaConfigMap.put("subscribe", "MyStreamingTopic");
        kafkaConfigMap.put("startingOffsets", "earliest");
        kafkaConfigMap.put("failOnDataLoss", "false");
        kafkaConfigMap.put("maxOffsetsPerTrigger", "100");
        // kafkaConfigMap.put("kafka.con.group.id", "localStream");

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .options(kafkaConfigMap)
                .load();

        if (df != null) {
            // Parse the JSON message payload into Product columns using a bean-derived schema
            StructType structType = Encoders.bean(Product.class).schema();
            Dataset<Row> df1 = df
                    .selectExpr("CAST(value as String) as message")
                    .select(functions.from_json(functions.col("message"), structType).as("data"))
                    .select("data.*");

            // Keep only shipped products with the given delivery date
            Dataset<Row> filteredDataFrame = df1.filter(functions.col("deliveryStatus").equalTo("shipped"))
                    .filter(functions.col("date").equalTo("25-05-2023"));

            Dataset<Product> ds = filteredDataFrame.as(Encoders.bean(Product.class));
            StreamingQuery streamingQuery = writeStreamingData(ds);
            try {
                streamingQuery.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
                streamingQuery.stop();
                sparkSession.stop();
            }
        }
    }

    // Writes each micro-batch of filtered products as CSV files every 10 seconds
    private static StreamingQuery writeStreamingData(Dataset<Product> ds) throws TimeoutException {
        return ds
                .writeStream()
                .outputMode("append")
                .format("csv")
                //.option("checkpointLocation","checkpointLocation/jobs")
                .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 seconds"))
                .foreachBatch((batchDF, batchId) -> {
                    batchDF.write().format("csv").mode("append").save("src/main/resources/productDataOutPutCSV");
                })
                .start();
    }

    // Alternative helper (currently unused): reads the Kafka topic and returns the raw message strings
    private static Dataset<Row> readKafkaCsvData(SparkSession sparkSession) {
        Map<String, String> kafkaConfigData = new HashMap<>();
        kafkaConfigData.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        kafkaConfigData.put("subscribe", "MyStreamingTopic");
        kafkaConfigData.put("startingOffsets", "earliest");
        kafkaConfigData.put("failOnDataLoss", "false");
        kafkaConfigData.put("kafka.con.group.id", "localStream");
        //sparkSession.streams().addListener((StreamingQueryListener) kafkaConfigData);

        Dataset<Row> csvData = sparkSession.readStream().format("kafka")
                .options(kafkaConfigData).load();
        return csvData.selectExpr("cast(value as String) as message");
    }
}
In this code, we read data from a file named input.txt and count the frequency of each word using a HashMap. Then, we create a priority queue of Map.Entry objects, where the entries are sorted by decreasing value (i.e., frequency count). Finally, we print the top 5 most frequently occurring elements from the priority queue.
Note that this code assumes that words in the input file are separated by whitespace (i.e., spaces, tabs, or newlines). If your input file uses a different delimiter, you'll need to adjust the split() method accordingly.
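The word-frequency program described above is not among the files shown in this commit; a minimal sketch of that approach, using a hypothetical class name WordFrequency and assuming input.txt sits in the working directory, could look like this:

package com.example.demo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class WordFrequency {

    public static void main(String[] args) throws IOException {
        // Count the frequency of each whitespace-separated word in input.txt
        Map<String, Integer> counts = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get("input.txt"), StandardCharsets.UTF_8)) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }

        // Priority queue of map entries ordered by decreasing frequency
        PriorityQueue<Map.Entry<String, Integer>> queue =
                new PriorityQueue<>((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        queue.addAll(counts.entrySet());

        // Print the top 5 most frequently occurring words
        for (int i = 0; i < 5 && !queue.isEmpty(); i++) {
            Map.Entry<String, Integer> entry = queue.poll();
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}

Because the comparator sorts entries by decreasing count, polling the queue five times yields the five most frequent words.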
productId,productName,productPrice,deliveryStatus,date
101,mobile,15000,ordered,10-05-2023
102,charger,25000,delivered,15-05-2023
103,cooker,35000,shipped,25-05-2023
104,tv,45000,shipped,25-05-2023
105,laptop,55000,delivered,25-05-2023
106,bottle,1500,ordered,12-05-2023
107,mouse,1000,delivered,15-05-2023
108,keyboard,500,shipped,25-05-2023
109,table,10000,shipped,22-05-2023
110,phone,12000,delivered,23-05-2023
111,shop,150000,ordered,16-05-2023
112,byke,75000,delivered,18-05-2023
113,car,20000,shipped,27-05-2023
114,cycle,33000,shipped,28-05-2023
115,bed,15000,delivered,26-05-2023
116,chairs,12000,ordered,15-05-2023
117,server,19555,delivered,18-05-2023
118,van,89200,shipped,25-05-2023
119,designing,14000,shipped,25-05-2023
120,interier,13000,delivered,25-05-2023