package com.example.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
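import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;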

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * Reads product records from a CSV file and publishes each row to a Kafka
 * topic as a JSON message through Spark's Kafka sink. {@code main} runs the
 * batch flow; an illustrative Structured Streaming variant is sketched in
 * {@link #publishCsvRecordsAsStream(SparkSession)}.
 */
public class SparkKafka {

    public static void main(String[] args) {
        // local[*] runs Spark in-process with one worker thread per core,
        // which is convenient for local testing.
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Kafka Example")
                .getOrCreate();

        // Read the CSV file, treating its first line as the column header.
        Dataset<Row> dataset = sparkSession.read()
                .option("header", true)
                .csv("src/main/resources/productData.csv");

        // Preview the parsed records before publishing them.
        dataset.show();

        publishCsvRecordsToKafkaTopic(dataset);

        // Release local Spark resources once the batch write has completed.
        sparkSession.stop();
    }
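
    /**
     * Illustrative sketch (not called from {@code main}): the Structured
     * Streaming equivalent of the batch publish above. The watched input
     * directory and the checkpoint path are assumptions for illustration;
     * streaming file sources require an explicit schema, and Spark's Kafka
     * sink requires a checkpoint location.
     */
    private static void publishCsvRecordsAsStream(SparkSession sparkSession)
            throws TimeoutException, StreamingQueryException {
        // A streaming CSV source cannot infer its schema, so declare it up front.
        StructType schema = new StructType()
                .add("productId", DataTypes.StringType)
                .add("productName", DataTypes.StringType)
                .add("productPrice", DataTypes.StringType)
                .add("deliveryStatus", DataTypes.StringType)
                .add("date", DataTypes.StringType);

        Dataset<Row> stream = sparkSession.readStream()
                .option("header", true)
                .schema(schema)
                .csv("src/main/resources/stream-input/"); // assumed watch directory

        stream.selectExpr("to_json(named_struct('productId', productId, " +
                        "'productName', productName, 'productPrice', productPrice, " +
                        "'deliveryStatus', deliveryStatus, 'date', date)) AS value")
                .writeStream()
                .format("kafka")
                .options(getKafkaCsvConfig())
                .option("checkpointLocation", "/tmp/spark-kafka-checkpoint") // assumed path
                .start()
                .awaitTermination();
    }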

    private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) {
        if (rowDataset != null) {
            // The Kafka sink expects the message payload in a column named
            // "value" (an optional "key" column may also be provided), so pack
            // each row into one JSON string of the shape
            // {"productId":"...","productName":"...","productPrice":"...","deliveryStatus":"...","date":"..."}.
            Dataset<Row> kafkaPublishJson = rowDataset
                    .selectExpr("to_json(named_struct('productId', productId, " +
                            "'productName', productName, 'productPrice', productPrice, " +
                            "'deliveryStatus', deliveryStatus, 'date', date)) AS value")
                    .persist(); // caching only pays off if this dataset is reused

            kafkaPublishJson
                    .write()
                    .format("kafka")
                    .options(getKafkaCsvConfig())
                    .save();
        }
    }

    private static Map<String, String> getKafkaCsvConfig() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        // Options prefixed with "kafka." are passed through to the underlying
        // Kafka producer; "topic" is interpreted by the Spark sink itself.
        kafkaConfigMap.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        kafkaConfigMap.put("topic", "SparkOutput");
        return kafkaConfigMap;
    }
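
    // To inspect what was published (assumes a local broker and the standard
    // Kafka CLI tools on the PATH):
    //   kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
    //       --topic SparkOutput --from-beginning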

}