Commit aeddac57 authored by Chiranjeevi Pampana

Spark - POC-2 commits

.idea/.gitignore
# Default ignored files
/shelf/
/workspace.xml

.idea/compiler.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="CompilerConfiguration">
    <annotationProcessing>
      <profile default="true" name="Default" enabled="true" />
      <profile name="Maven default annotation processors profile" enabled="true">
        <sourceOutputDir name="target/generated-sources/annotations" />
        <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
        <outputRelativeToContentRoot value="true" />
        <module name="sparkKafkaDbPoc-2" />
      </profile>
    </annotationProcessing>
  </component>
</project>

.idea/encodings.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="Encoding">
    <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
    <file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
  </component>
</project>

.idea/jarRepositories.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="RemoteRepositoriesConfiguration">
    <remote-repository>
      <option name="id" value="central" />
      <option name="name" value="Central Repository" />
      <option name="url" value="https://repo.maven.apache.org/maven2" />
    </remote-repository>
    <remote-repository>
      <option name="id" value="central" />
      <option name="name" value="Maven Central repository" />
      <option name="url" value="https://repo1.maven.org/maven2" />
    </remote-repository>
    <remote-repository>
      <option name="id" value="jboss.community" />
      <option name="name" value="JBoss Community repository" />
      <option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
    </remote-repository>
  </component>
</project>

.idea/misc.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ExternalStorageConfigurationManager" enabled="true" />
  <component name="MavenProjectsManager">
    <option name="originalFiles">
      <list>
        <option value="$PROJECT_DIR$/pom.xml" />
      </list>
    </option>
  </component>
  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
    <output url="file://$PROJECT_DIR$/out" />
  </component>
</project>

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sparkKafkaDbPoc-2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>3.2.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.5.2</version>
        </dependency>
        <!-- <dependency>-->
        <!--     <groupId>org.springframework.boot</groupId>-->
        <!--     <artifactId>spring-boot-starter-data-mongodb</artifactId>-->
        <!-- </dependency>-->
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>commons-compiler</artifactId>
                <version>3.0.8</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>janino</artifactId>
                <version>3.0.8</version>
            </dependency>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongodb-driver-reactivestreams</artifactId>
                <version>4.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongodb-driver-sync</artifactId>
                <version>4.0.5</version>
            </dependency>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongo-java-driver</artifactId>
                <version>3.12.8</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <!-- <plugins>-->
        <!--     <plugin>-->
        <!--         <groupId>org.springframework.boot</groupId>-->
        <!--         <artifactId>spring-boot-maven-plugin</artifactId>-->
        <!--         <configuration>-->
        <!--             <excludes>-->
        <!--                 <exclude>-->
        <!--                     <groupId>org.projectlombok</groupId>-->
        <!--                     <artifactId>lombok</artifactId>-->
        <!--                 </exclude>-->
        <!--             </excludes>-->
        <!--         </configuration>-->
        <!--     </plugin>-->
        <!-- </plugins>-->
    </build>
</project>

src/main/java/com/sparkpoc/casetwo/ProductInfo.java
package com.sparkpoc.casetwo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Bean describing one product record; its schema drives the JSON parsing in the consumer.
@NoArgsConstructor
@AllArgsConstructor
@Data
public class ProductInfo {
    private String productId;
    private String productName;
    private String productPrice;
    private String deliveryStatus;
    private String timeStamp;
}
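
The producer below serializes each CSV row with to_json(struct("*")), so the Kafka message value parsed into this bean is a JSON object of five string fields, roughly like this (values taken from a row of the sample CSV):

{"productId":"P1002","productName":"Mobile02","productPrice":"2000.00","deliveryStatus":"Shipped","timeStamp":"10-02-2023"}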

src/main/java/com/sparkpoc/casetwo/SparkConsumeFrmKafkaServiceProcessor.java
package com.sparkpoc.casetwo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType;

public class SparkConsumeFrmKafkaServiceProcessor {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("Java Spark Example")
                .config("spark.mongodb.input.uri", "mongodb://localhost:27017")
                .getOrCreate();

        // Batch-read the whole topic from the earliest offset.
        // maxOffsetsPerTrigger only applies to streaming reads and is ignored here.
        Dataset<Row> outPutDSFrmKafka = spark.read().format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "kafkaTopicDB01")
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", "false")
                .option("maxOffsetsPerTrigger", "100")
                .load();

        if (outPutDSFrmKafka != null) {
            // Build the JSON schema from the ProductInfo bean.
            StructType productStructSchema = Encoders.bean(ProductInfo.class).schema();

            // Kafka delivers the payload as bytes: cast to string, then parse the JSON.
            Dataset<Row> outPutDSFrmKafkaMsg = outPutDSFrmKafka
                    .selectExpr("CAST(value AS STRING) as message")
                    .select(functions.from_json(functions.col("message"), productStructSchema).as("data"))
                    .select("data.*");

            // Keep only products already shipped on 10-07-2023.
            Dataset<Row> filteredDS = outPutDSFrmKafkaMsg
                    .filter(functions.col("deliveryStatus").equalTo("Shipped"))
                    .filter(functions.col("timeStamp").equalTo("10-07-2023"));

            Dataset<ProductInfo> productDS = filteredDS.as(Encoders.bean(ProductInfo.class));

            // Persist the filtered records to MongoDB (NisumDB.Product23June).
            productDS.write()
                    .format("com.mongodb.spark.sql.DefaultSource")
                    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/NisumDB.Product23June")
                    .mode("append") // append so a rerun does not fail on the existing collection
                    .save();
        }
        spark.stop();
    }
}
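
A continuous (streaming) variant of this consumer can be built with writeStream and foreachBatch, which reuses the same batch Mongo writer for every micro-batch and works with the mongo-spark-connector 3.0.1 declared in the POM. This is a minimal sketch, not part of the commit: the class name and the checkpoint path are illustrative; the broker, topic, and Mongo URI follow the classes above.

package com.sparkpoc.casetwo;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

public class SparkStreamKafkaToMongoSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Java Spark Streaming Example")
                .getOrCreate();

        StructType schema = Encoders.bean(ProductInfo.class).schema();

        // Open the topic as an unbounded stream instead of a one-shot batch read.
        Dataset<Row> kafkaStream = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "kafkaTopicDB01")
                .option("startingOffsets", "earliest")
                .load();

        Dataset<Row> products = kafkaStream
                .selectExpr("CAST(value AS STRING) as message")
                .select(functions.from_json(functions.col("message"), schema).as("data"))
                .select("data.*");

        // foreachBatch hands each micro-batch to the batch Mongo writer.
        StreamingQuery query = products.writeStream()
                .option("checkpointLocation", "/tmp/spark-kafka-mongo-checkpoint") // example path
                .foreachBatch((batchDf, batchId) ->
                        batchDf.write()
                                .format("com.mongodb.spark.sql.DefaultSource")
                                .option("spark.mongodb.output.uri",
                                        "mongodb://localhost:27017/NisumDB.Product23June")
                                .mode("append")
                                .save())
                .start();

        query.awaitTermination();
    }
}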

src/main/java/com/sparkpoc/casetwo/SparkWriteCsvFileToKafkaController.java

package com.sparkpoc.casetwo;
import org.apache.spark.sql.*;

public class SparkWriteCsvFileToKafkaController {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Java Spark DB")
                // .config("spark.ui.enabled", true)
                // .config("spark.sql.shuffle.partitions", 2)
                .getOrCreate();

        // Read the sample CSV, using the first line as the header.
        Dataset<Row> ds_product = spark.read().format("csv")
                .option("header", true)
                .load("src/main/resources/productInfo.csv");
        ds_product.show();

        if (!ds_product.isEmpty()) {
            // Serialize each row to a JSON string and publish it as the Kafka message value.
            ds_product.select(functions.to_json(functions.struct("*")).alias("value"))
                    .write()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("topic", "kafkaTopicDB01")
                    .save();
        }
        spark.stop();
    }
}
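
To exercise the POC end to end: start Kafka and MongoDB locally, run SparkWriteCsvFileToKafkaController once to publish the CSV rows to kafkaTopicDB01, then run SparkConsumeFrmKafkaServiceProcessor. With the sample data below, only the two rows that are Shipped with timeStamp 10-07-2023 (P1008 and the second P1004 entry) should land in NisumDB.Product23June.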

src/main/resources/productInfo.csv
productId,productName,productPrice,deliveryStatus,timeStamp
P1001,Mobile01,1000.00,Purchased,10-04-2023
P1002,Mobile02,2000.00,Shipped,10-02-2023
P1003,Mobile03,3000.00,Pending,10-03-2023
P1004,Mobile04,4000.00,Shipped,10-04-2023
P1005,Mobile05,5000.00,Shipped,10-04-2023
P1006,Mobile06,6000.00,Shipped,10-05-2023
P1007,Mobile07,7000.00,Pending,10-05-2023
P1008,Mobile08,8000.00,Shipped,10-07-2023
P1009,Mobile09,9000.00,Pending,10-07-2023
P1004,Mobile04,4000.00,Shipped,10-07-2023