Commit c0c7bc49 authored by Sarika Sama's avatar Sarika Sama

added KafkaToCsvProcess

parent 7b86a9a5
......@@ -53,11 +53,6 @@ public class JavaSparkExampleApplication {
.option("header",true)
.save(outputPath);
// Dataset<Row> df2 = sparkSession.read()
// .format("csv")
// .option("header", true)
// .load(outputPath);
//create two collections and save into another collection
Dataset<Customers> customersData = sparkSession.read()
......@@ -97,7 +92,6 @@ public class JavaSparkExampleApplication {
functions.col("cust.transaction_amount"),functions.col("ord.orderId"),functions.col("ord.orderName"),
functions.col("ord.status"));
userData.write()
.format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.users")
......@@ -105,14 +99,7 @@ public class JavaSparkExampleApplication {
.save();
userData.show(false);
// Dataset<Row> updatedData = userData.withColumn("customerId", functions.col("_id")).drop("_id");
//
// updatedData.write().format("mongo").mode("overwrite").save();
// sparkSession.stop();
sparkSession.stop();
}
......
......@@ -19,6 +19,7 @@ public class KafkaToCsvProcess {
Dataset<Customers> kafkaDataFrame = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
......@@ -26,6 +27,7 @@ public class KafkaToCsvProcess {
.load().as(Encoders.bean(Customers.class));
kafkaDataFrame .write()
.option("header", "true")
.format("csv")
.option("path", "/path/to/error/csv/files");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment