package com.sparkpoc.caseone;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

import java.util.concurrent.TimeoutException;

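/**
 * POC streaming job: reads product JSON messages from the Kafka topic
 * kafkaTopic333, keeps records shipped on 10-07-2023, and appends each
 * micro-batch to a local CSV directory.
 */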
public class SparkReadFrmKafkaServiceProcessor {

    public static void main(String[] args) throws TimeoutException {
        // Local session for the POC; replace local[*] with a real master for cluster runs.
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Java Spark Example")
                .getOrCreate();
        // Streaming read from Kafka; start from the earliest offsets and cap
        // each trigger at 100 offsets in total, split across partitions.
        Dataset<Row> kafkaStreamDS = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "kafkaTopic333")
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", "false")
                .option("maxOffsetsPerTrigger", "100")
                .load();

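        // ProductInfo is assumed to be a plain Java bean in this package; the
        // filters below rely on at least a deliveryStatus and a date field,
        // with any remaining fields coming from the JSON payload on the topic.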
        StructType productStructSchema = Encoders.bean(ProductInfo.class).schema();

        // Cast the Kafka value bytes to a string and parse the JSON into
        // columns matching the bean schema.
        Dataset<Row> productRowsDS = kafkaStreamDS.selectExpr("CAST(value AS STRING) AS message")
                .select(functions.from_json(functions.col("message"), productStructSchema).as("data"))
                .select("data.*");

        // Keep only shipped products for a single day; the date is compared as
        // a plain string in the dd-MM-yyyy form used by the payload.
        Dataset<Row> filteredDS = productRowsDS.filter(functions.col("deliveryStatus").equalTo("Shipped"))
                .filter(functions.col("date").equalTo("10-07-2023"));

        Dataset<ProductInfo> productDS = filteredDS.as(Encoders.bean(ProductInfo.class));
        StreamingQuery streamQuery = writeFetchedDataToLocal(productDS);
        try {
            streamQuery.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
            streamQuery.stop();
        } finally {
            spark.stop();
        }
    }
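
    // A message on kafkaTopic333 is expected to look roughly like the JSON
    // below; field names other than deliveryStatus and date are illustrative:
    //   {"productId":"P-100","deliveryStatus":"Shipped","date":"10-07-2023"}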
    /**
     * Starts the streaming query; foreachBatch supplies the sink, so no
     * format() is set on the stream writer itself.
     */
    private static StreamingQuery writeFetchedDataToLocal(Dataset<ProductInfo> dataProduct) throws TimeoutException {
        return dataProduct.writeStream()
                .outputMode("append")
                .trigger(Trigger.ProcessingTime("10 seconds"))
                // Checkpoint directory for tracking Kafka offsets across restarts;
                // the path below is only an example location.
                .option("checkpointLocation", "src/main/resources/productCheckpoint")
                .foreachBatch((batchDf, batchId) ->
                        batchDf.write().format("csv").mode("append").save("src/main/resources/productOutPutCsv"))
                .start();
    }
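
    // For a quick local test, messages can be fed to the topic with Kafka's
    // standard console producer (broker address matches the option above):
    //   kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafkaTopic333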
}