package com.nisum.javaSparkExample.kafka;

import com.google.gson.Gson;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.*;
import com.nisum.javaSparkExample.model.Customers;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeoutException;

import static com.mongodb.client.model.Filters.eq;

public class KafkaToCsvProcess {

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("kafkaToCsvProcess")
                .config("spark.ui.enabled", true)
                .config("spark.sql.shuffle.partitions", 2)
                .getOrCreate();

        // Batch read of the existing customers from MongoDB, mapped onto the Customers bean.
        Dataset<Customers> customerMongoData = sparkSession.read()
                .format("mongo")
                .option("uri", "mongodb://localhost:27017/mydb.customers")
                .load()
                .as(Encoders.bean(Customers.class));
        System.out.println("Customers Data from MongoDB:");
        customerMongoData.select("customerId", "name", "gender", "transaction_amount").show(false);

        // Streaming read from the Kafka topic. "startingOffsets" is the Structured Streaming
        // equivalent of the consumer's auto.offset.reset setting; Spark manages the consumer
        // group itself, so no group.id is configured here.
        Dataset<Row> kafkaDataFrame = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "customer-topic")
                .option("startingOffsets", "latest")
                //.option("failOnDataLoss", "false")
                .load();

        // The Kafka value is a byte array; cast it to a string column holding the JSON message.
        Dataset<Row> kafkaData = kafkaDataFrame.selectExpr("CAST(value AS STRING) AS message");
        StructType customerSchema = Encoders.bean(Customers.class).schema();

        // Strongly typed view of the stream, parsed with the Customers schema
        // (not consumed by the foreach sink below, which works on the raw JSON message).
        Dataset<Customers> customerStream = kafkaData
                .select(functions.from_json(functions.col("message"), customerSchema).as("data"))
                .select("data.customerId", "data.name", "data.gender", "data.transaction_amount")
                .as(Encoders.bean(Customers.class));

        StreamingQuery strQuery = kafkaData.writeStream()
                .outputMode(OutputMode.Append())
                //.option("checkpointLocation", "/customerLocation")
                .foreach(new ForeachWriter<Row>() {
                    private MongoCollection<Document> customerCollection;
                    private MongoClient client;

                    @Override
                    public boolean open(long partitionId, long version) {
                        try {
                            String uri = "mongodb://localhost:27017/mydb.customers";
                            CodecRegistry pojoCodecRegistry = CodecRegistries.fromRegistries(
                                    MongoClientSettings.getDefaultCodecRegistry(),
                                    CodecRegistries.fromProviders(
                                            PojoCodecProvider.builder().automatic(true).build()));
                            client = MongoClients.create(uri);
                            // Access the database with the POJO codec registry attached.
                            MongoDatabase mongoDatabase = client.getDatabase("mydb")
                                    .withCodecRegistry(pojoCodecRegistry);
                            customerCollection = mongoDatabase.getCollection("customers");
                        } catch (Exception e) {
                            System.out.println("Failed to open MongoDB connection: " + e.getMessage());
                            return false;
                        }
                        return true;
                    }

                    @Override
                    public void process(Row value) {
                        Gson gson = new Gson();
                        Customers customer = gson.fromJson(value.getString(0), Customers.class);
                        System.out.println(customer);
                        try {
                            Bson customerFilter = eq("customerId", customer.getCustomerId());
                            Document existing = customerCollection.find(customerFilter).first();
                            if (existing == null) {
                                // A streaming Dataset cannot be written with the batch write() API
                                // from inside a ForeachWriter, so the unmatched record (the raw JSON
                                // message) is appended to the CSV file with plain Java I/O instead.
                                Files.write(Paths.get("D:/spark/customers1.csv"),
                                        (value.getString(0) + System.lineSeparator())
                                                .getBytes(StandardCharsets.UTF_8),
                                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                            } else {
                                System.out.println("ID already exists in the collection.");
                            }
                        } catch (Exception e) {
                            System.out.println("Failed to process record: " + e.getMessage());
                        }
                    }

                    @Override
                    public void close(Throwable errorOrNull) {
                        if (client != null) {
                            client.close();
                        }
                    }
                }).start();

        strQuery.awaitTermination();
    }
}