package com.example.demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * Batch job that reads product records from a local CSV file and publishes
 * each row, serialized as a JSON string, to the Kafka topic {@code SparkOutput}.
 *
 * <p>Expected CSV columns (with a header row): {@code productId},
 * {@code productName}, {@code productPrice}, {@code deliveryStatus},
 * {@code date}.
 */
public class SparkKafka {

    /**
     * Entry point: builds a local SparkSession, loads the CSV, prints it,
     * and publishes the rows to Kafka.
     *
     * <p>NOTE(review): {@code StreamingQueryException} is declared for
     * signature compatibility, but this job uses only the batch API
     * ({@code read()}/{@code write()}), so it is never actually thrown here.
     *
     * @param args unused
     * @throws StreamingQueryException never thrown by the batch path; retained
     *         so existing callers/launch configs remain compatible
     * @throws TimeoutException propagated from the Kafka publish step
     */
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Streaming Example")
                .getOrCreate();

        try {
            Dataset<Row> dataset = sparkSession.read()
                    .option("header", true) // first CSV line is the column names
                    .csv("src/main/resources/productData.csv");

            dataset.show();

            publishCsvRecordsToKafkaTopic(dataset);
        } finally {
            // Fix: the session was previously never stopped, leaking the local
            // Spark context (threads, UI port, temp dirs) after main returned.
            sparkSession.stop();
        }
    }

    /**
     * Serializes each row of the given dataset to a JSON string column named
     * {@code value} and writes it to Kafka using the batch Kafka sink.
     *
     * <p>No-op when {@code rowDataset} is {@code null}.
     *
     * @param rowDataset dataset with the product columns listed on the class doc
     * @throws TimeoutException if the Kafka write times out
     */
    private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) throws TimeoutException {
        if (rowDataset == null) {
            return;
        }

        // The Kafka sink requires a single string column named "value";
        // named_struct + to_json packs the product fields into one JSON object.
        Dataset<Row> kafkaPublishJson = rowDataset
                .selectExpr("to_json(named_struct('productId', productId, "
                        + "'productName', productName, 'productPrice', productPrice, "
                        + "'deliveryStatus', deliveryStatus, 'date', date)) AS value")
                .persist();

        try {
            kafkaPublishJson
                    .write()
                    .format("kafka")
                    .options(getKafkaCsvConfig())
                    .save();
        } finally {
            // Fix: persist() was previously never paired with unpersist(),
            // leaving cached blocks pinned for the session's lifetime.
            kafkaPublishJson.unpersist();
        }
    }

    /**
     * Builds the configuration for the Kafka batch sink.
     *
     * @return map with the broker address and the destination topic
     */
    private static Map<String, String> getKafkaCsvConfig() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        kafkaConfigMap.put("topic", "SparkOutput");
        return kafkaConfigMap;
    }
}