Commit cef8ca43 authored by Sarika Sama

Add KafkaToCsvProcess: stream customer JSON from the Kafka topic customer-topic, skip customerIds already stored in MongoDB, and append new records to a local CSV; add the required Spark/Kafka/Scala dependencies and switch the Customers model to boxed Integer fields.

parent c0c7bc49
pom.xml
@@ -16,6 +16,9 @@
     <properties>
         <java.version>1.8</java.version>
         <spark.version>3.2.1</spark.version>
+        <scala.version.major>2.12</scala.version.major>
+        <scala.version.minor>11</scala.version.minor>
+        <scala.version>${scala.version.major}.${scala.version.minor}</scala.version>
     </properties>
     <dependencies>
         <dependency>
@@ -67,6 +70,13 @@
             <groupId>org.codehaus.janino</groupId>
             <version>3.0.16</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_${scala.version.major}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
         <dependency>
             <artifactId>commons-compiler</artifactId>
             <groupId>org.codehaus.janino</groupId>
@@ -80,11 +90,38 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>3.1.2</version>
+            <version>3.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka-0-10_${scala.version.major}</artifactId>
+            <version>${spark.version}</version>
         </dependency>
-    </dependencies>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <version>3.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+    </dependencies>
     <build>
         <plugins>
 <!--            <plugin>
KafkaToCsvProcess.java
 package com.nisum.javaSparkExample.kafka;

+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.*;
 import com.nisum.javaSparkExample.model.Customers;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.types.StructType;
+import org.bson.Document;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.codecs.pojo.PojoCodecProvider;
+import org.bson.conversions.Bson;
+
+import java.io.FileWriter;
+import java.util.concurrent.TimeoutException;
+
+import static com.mongodb.client.model.Filters.eq;

 public class KafkaToCsvProcess {
-    public static void main(String[] args) {
-        SparkSession sparkSession=SparkSession.builder()
+    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
+        SparkSession sparkSession = SparkSession.builder()
                 .master("local[*]")
                 .appName("kafkaToCsvProcess")
-                //.config("spark.ui.enabled",true)
-                .config("spark.sql.shuffle.partitions",2)
+                .config("spark.ui.enabled", true)
+                .config("spark.sql.shuffle.partitions", 2)
                 .getOrCreate();
-        Dataset<Customers> customermongoData = sparkSession.read()
-                .format("mongo")
-                .option("uri", "mongodb://localhost:27017/mydb.customers")
-                .load().as(Encoders.bean(Customers.class));
-        System.out.println(" Customers Data from MongoDB:");
-        customermongoData.select("customerId", "name", "gender", "transaction_amount").show(false);
-        Dataset<Customers> kafkaDataFrame = sparkSession
+
+        // Read the raw Kafka stream; each record's value carries one customer as JSON.
+        Dataset<Row> kafkaDataFrame = sparkSession
                 .readStream()
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "localhost:9092")
                 .option("subscribe", "customer-topic")
-                .load().as(Encoders.bean(Customers.class));
-        kafkaDataFrame.write()
-                .option("header", "true")
-                .format("csv")
-                .option("path", "/path/to/error/csv/files");
+                .option("auto.offset.reset", "latest")
+                .load();
+
+        // Parse the JSON payload into typed Customers rows via the bean schema.
+        Dataset<Row> kafkaData = kafkaDataFrame.selectExpr("CAST(value as String) as message");
+        StructType customer = Encoders.bean(Customers.class).schema();
+        Dataset<Customers> dataset = kafkaData
+                .select(functions.from_json(functions.col("message"), customer).as("data"))
+                .select("data.customerId", "data.name", "data.gender", "data.transaction_amount")
+                .as(Encoders.bean(Customers.class));
+        // For each streamed record, look up the customerId in MongoDB and
+        // append only unseen customers to a local CSV file.
+        StreamingQuery strQuery = dataset.writeStream()
+                .outputMode(OutputMode.Append())
+                .foreach(new ForeachWriter<Customers>() {
+                    private MongoCollection<Document> customerX;
+                    private MongoClient client;
+
+                    @Override
+                    public boolean open(long partitionId, long version) {
+                        try {
+                            String uri = "mongodb://localhost:27017/mydb.customers";
+                            CodecRegistry pojoCodecRegistry = CodecRegistries.fromRegistries(
+                                    MongoClientSettings.getDefaultCodecRegistry(),
+                                    CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build()));
+                            client = MongoClients.create(uri);
+                            MongoDatabase mongoDatabaseRunTime = client.getDatabase("mydb").withCodecRegistry(pojoCodecRegistry);
+                            customerX = mongoDatabaseRunTime.getCollection("customers");
+                            return true;
+                        } catch (Exception e) {
+                            System.out.println("Could not connect to MongoDB: " + e.getMessage());
+                            return false; // skip this partition rather than process without a connection
+                        }
+                    }
+
+                    @Override
+                    public void process(Customers value) {
+                        try {
+                            Bson customerIdFilter = eq("customerId", value.getCustomerId());
+                            // find() returns a FindIterable; first() yields the match or null.
+                            Document cust = customerX.find(customerIdFilter).first();
+                            if (cust == null) {
+                                // The batch writer cannot be called on a streaming Dataset from
+                                // inside a ForeachWriter, so append this record as one CSV line.
+                                try (FileWriter out = new FileWriter("D:/spark/customers1.csv", true)) {
+                                    out.write(value.getCustomerId() + "," + value.getName() + ","
+                                            + value.getGender() + "," + value.getTransaction_amount() + "\n");
+                                }
+                            } else {
+                                System.out.println("ID exists in the collection.");
+                            }
+                        } catch (Exception e) {
+                            System.out.println("Could not process record: " + e.getMessage());
+                        }
+                    }
+
+                    @Override
+                    public void close(Throwable errorOrNull) {
+                        if (client != null) {
+                            client.close();
+                        }
+                    }
+                }).start();
+        strQuery.awaitTermination();
     }
 }
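
For local testing, a minimal producer sketch can feed the job. The broker address, topic name, and JSON field names come from the code above; the class name CustomerProducerDemo and the sample values are hypothetical, and kafka-clients (already in the POM) supplies the producer API.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Hypothetical test helper, not part of this commit.
public class CustomerProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // same broker the job subscribes to
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Field names match the Customers bean so from_json fills every column.
        String json = "{\"customerId\":101,\"name\":\"Asha\",\"gender\":\"F\",\"transaction_amount\":2500}";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("customer-topic", "101", json));
        }
    }
}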
Customers.java
@@ -14,8 +14,10 @@ import org.springframework.data.mongodb.core.mapping.Document;
 @Document(collection = "customers")
 public class Customers {
     @Id
-    private int customerId;
+    private Integer customerId;
     private String name;
     private String gender;
-    private int transaction_amount;
+    private Integer transaction_amount;
 }
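
The switch from int to Integer matters because from_json yields null for missing or malformed fields, and null cannot be stored in a primitive. Encoders.bean and the MongoDB POJO codec also require a public no-arg constructor plus getters and setters; a sketch of the complete class under that assumption (accessor names inferred from bean conventions, only the fields shown in the diff):

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection = "customers")
public class Customers {
    @Id
    private Integer customerId;
    private String name;
    private String gender;
    private Integer transaction_amount;

    public Customers() { } // required by Encoders.bean and the POJO codec

    public Integer getCustomerId() { return customerId; }
    public void setCustomerId(Integer customerId) { this.customerId = customerId; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getTransaction_amount() { return transaction_amount; }
    public void setTransaction_amount(Integer transaction_amount) { this.transaction_amount = transaction_amount; }
}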