package com.sparkpoc.caseone;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

import java.util.concurrent.TimeoutException;

/**
 * Structured Streaming job: reads JSON messages from a Kafka topic, parses
 * them into ProductInfo rows, filters for shipped products on a given date,
 * and appends the results to local CSV files via foreachBatch.
 */
public class SparkReadFrmKafkaServiceProcessor {

    public static void main(String[] args) throws TimeoutException {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Java Spark Example")
                .getOrCreate();

        // Kafka source: start from the earliest offsets, cap each micro-batch
        // at 100 offsets, and tolerate missing offsets rather than failing.
        Dataset<Row> outPutDSFrmKafka = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "kafkaTopic333")
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", "false")
                .option("maxOffsetsPerTrigger", "100")
                .load();

        // Derive the JSON schema from the ProductInfo bean.
        StructType productStructSchema = Encoders.bean(ProductInfo.class).schema();

        // Kafka delivers the payload as binary; cast it to a string, parse it
        // as JSON, and flatten the parsed struct into top-level columns.
        Dataset<Row> outPutDSFrmKafkaMsg = outPutDSFrmKafka
                .selectExpr("CAST(value AS STRING) AS message")
                .select(functions.from_json(functions.col("message"), productStructSchema).as("data"))
                .select("data.*");

        // Keep only shipped products for the target date.
        Dataset<Row> filteredDS = outPutDSFrmKafkaMsg
                .filter(functions.col("deliveryStatus").equalTo("Shipped"))
                .filter(functions.col("date").equalTo("10-07-2023"));

        Dataset<ProductInfo> productDS = filteredDS.as(Encoders.bean(ProductInfo.class));

        StreamingQuery streamQuery = writeFetchedDataToLocal(productDS);
        try {
            streamQuery.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
            streamQuery.stop();
            spark.stop();
        }
    }

    /**
     * Writes each micro-batch to local CSV files every 10 seconds. With
     * foreachBatch, the batch writer inside the lambda defines the sink, so
     * no streaming format("csv") is set on the writeStream itself.
     */
    private static StreamingQuery writeFetchedDataToLocal(Dataset<ProductInfo> dataProduct) throws TimeoutException {
        return dataProduct.writeStream()
                .outputMode("append")
                .trigger(Trigger.ProcessingTime("10 seconds"))
                // Checkpointing tracks consumed Kafka offsets across restarts;
                // the path here is illustrative.
                .option("checkpointLocation", "src/main/resources/checkpoint")
                .foreachBatch((batchDf, batchId) -> {
                    // Append this micro-batch to the local CSV output directory.
                    batchDf.write().format("csv").mode("append").save("src/main/resources/productOutPutCsv");
                })
                .start();
    }
}
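
/*
 * The ProductInfo bean used above lives elsewhere in the project and is not
 * shown here. A minimal sketch of what it plausibly looks like follows (as a
 * separate ProductInfo.java in the same package). Only the deliveryStatus and
 * date properties are confirmed by the filters in this job; the remaining
 * fields are illustrative assumptions. Note that Encoders.bean() requires a
 * public no-arg constructor plus getters and setters for each field.
 *
 * package com.sparkpoc.caseone;
 *
 * import java.io.Serializable;
 *
 * public class ProductInfo implements Serializable {
 *     private String productId;      // assumed field, not confirmed by this job
 *     private String productName;    // assumed field, not confirmed by this job
 *     private String deliveryStatus; // filtered on "Shipped" above
 *     private String date;           // filtered on "10-07-2023" above
 *
 *     public ProductInfo() {}
 *
 *     public String getProductId() { return productId; }
 *     public void setProductId(String productId) { this.productId = productId; }
 *     public String getProductName() { return productName; }
 *     public void setProductName(String productName) { this.productName = productName; }
 *     public String getDeliveryStatus() { return deliveryStatus; }
 *     public void setDeliveryStatus(String deliveryStatus) { this.deliveryStatus = deliveryStatus; }
 *     public String getDate() { return date; }
 *     public void setDate(String date) { this.date = date; }
 * }
 *
 * Under these assumptions, a Kafka message that satisfies the from_json schema
 * and both filters would look like:
 *
 * {"productId":"P-1","productName":"Widget","deliveryStatus":"Shipped","date":"10-07-2023"}
 */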