Commit 7b86a9a5 authored by Sarika Sama

added KafkaToCsvProcess

parent 8bcb2c3c
@@ -18,10 +18,14 @@
     <spark.version>3.2.1</spark.version>
 </properties>
 <dependencies>
-    <!--<dependency>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-starter-web</artifactId>
-    </dependency>-->
+    <dependency>
+        <groupId>org.springframework.data</groupId>
+        <artifactId>spring-data-commons</artifactId>
+    </dependency>
+    <dependency>
+        <groupId>org.springframework.data</groupId>
+        <artifactId>spring-data-mongodb</artifactId>
+    </dependency>
     <!--<dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-devtools</artifactId>
@@ -73,6 +77,11 @@
         <artifactId>mongo-spark-connector_2.12</artifactId>
         <version>3.0.1</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>3.1.2</version>
+    </dependency>
 </dependencies>
...
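Note: kafka-clients on its own only provides the consumer/producer API. The format("kafka") source used by the new KafkaToCsvProcess below is resolved from the separate spark-sql-kafka-0-10_2.12 connector artifact, which would also need to be on the classpath at the matching Spark version (3.2.1 here).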
@@ -35,7 +35,9 @@ public class JavaSparkExampleApplication {
             .load().as(Encoders.bean(Customers.class));
         // Show data from MongoDB
         System.out.println(" Customers Data from MongoDB:");
-        customermongoData.show(false);
+        //customermongoData.show(false);
+        customermongoData.select("customerId","name","gender","transaction_amount").show(false);
         //applied transformation to dataset and read the data in console
@@ -64,7 +66,8 @@ public class JavaSparkExampleApplication {
             .load().as(Encoders.bean(Customers.class));
         // Show data from MongoDB
         System.out.println(" Customers Data from MongoDB:");
-        customersData.show(false);
+        //customersData.show(false);
+        customersData.select("customerId","name","gender","transaction_amount").show(false);
         Dataset<Orders> ordersmongoData = sparkSession.read()
                 .format("mongo")
@@ -72,7 +75,8 @@ public class JavaSparkExampleApplication {
             .load().as(Encoders.bean(Orders.class));
         // Show data from MongoDB
         System.out.println(" Orders Data from MongoDB :");
-        ordersmongoData.show(false);
+        //ordersmongoData.show(false);
+        ordersmongoData.select("orderId","customerId","orderName","status").show(false);
         //read the data from mongodb
         Dataset<Users> usersmongoData = sparkSession.read()
@@ -81,12 +85,17 @@ public class JavaSparkExampleApplication {
             .load().as(Encoders.bean(Users.class));
         // Show data from MongoDB
         System.out.println(" users Data from MongoDB :");
-        usersmongoData.show(false);
+        //usersmongoData.show(false);
+        usersmongoData.select("userId","userName","email","address").show(false);
-        Dataset<Row> userData = customersData.join(ordersmongoData, customersData.col("customerId")
-                .equalTo(ordersmongoData.col("customerId")), "inner");
+        // Dataset<Row> userData = customersData.join(ordersmongoData, customersData.col("customerId")
+        //         .equalTo(ordersmongoData.col("customerId")), "inner");
+        Dataset<Row> userData = customersData.as("cust").join(ordersmongoData.as("ord"), ordersmongoData.col("customerId").equalTo(customersData.col("customerId")))
+                .select(functions.col("cust.customerId"), functions.col("cust.name"), functions.col("cust.gender"),
+                        functions.col("cust.transaction_amount"), functions.col("ord.orderId"), functions.col("ord.orderName"),
+                        functions.col("ord.status"));
         userData.write()
@@ -94,10 +103,15 @@ public class JavaSparkExampleApplication {
                 .option("uri", "mongodb://localhost:27017/mydb.users")
                 .mode(SaveMode.Append)
                 .save();
         userData.show(false);
+        sparkSession.stop();
+        // Dataset<Row> updatedData = userData.withColumn("customerId", functions.col("_id")).drop("_id");
+        //
+        // updatedData.write().format("mongo").mode("overwrite").save();
+        // sparkSession.stop();
...
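The rewritten join aliases both Datasets before selecting: each side carries a customerId column, so unqualified references would be ambiguous after the join. A minimal standalone sketch of the same pattern, using hypothetical in-memory rows in place of the MongoDB collections (class and data names are illustrative only):

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

public class AliasedJoinSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("aliasedJoinSketch").getOrCreate();
        // Hypothetical stand-ins for the customers and orders collections.
        Dataset<Row> customers = spark.createDataFrame(
                Arrays.asList(RowFactory.create(1, "Asha")),
                new StructType().add("customerId", "int").add("name", "string"));
        Dataset<Row> orders = spark.createDataFrame(
                Arrays.asList(RowFactory.create(101, 1, "SHIPPED")),
                new StructType().add("orderId", "int").add("customerId", "int").add("status", "string"));
        // Alias both sides so each customerId reference is unambiguous after the join.
        Dataset<Row> joined = customers.as("cust")
                .join(orders.as("ord"), functions.col("ord.customerId").equalTo(functions.col("cust.customerId")))
                .select(functions.col("cust.customerId"), functions.col("cust.name"),
                        functions.col("ord.orderId"), functions.col("ord.status"));
        joined.show(false);
        spark.stop();
    }
}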
package com.nisum.javaSparkExample.kafka;

import com.nisum.javaSparkExample.model.Customers;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaToCsvProcess {
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("kafkaToCsvProcess")
                //.config("spark.ui.enabled", true)
                .config("spark.sql.shuffle.partitions", 2)
                .getOrCreate();

        // The Kafka source exposes key/value as binary columns, so the payload has to be
        // cast to a string and parsed as JSON before the Customers bean encoder can apply.
        Dataset<Customers> kafkaDataFrame = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "customer-topic")
                .load()
                .selectExpr("CAST(value AS STRING) AS json")
                .select(functions.from_json(functions.col("json"),
                        Encoders.bean(Customers.class).schema()).as("customer"))
                .select("customer.*")
                .as(Encoders.bean(Customers.class));

        // A streaming Dataset is written with writeStream(), not write(); the file sink
        // also needs a checkpoint location, and the query must be started and awaited.
        StreamingQuery query = kafkaDataFrame.writeStream()
                .format("csv")
                .option("header", "true")
                .option("path", "/path/to/error/csv/files")
                .option("checkpointLocation", "/path/to/checkpoint")
                .start();
        query.awaitTermination();
    }
}
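Since the commit adds kafka-clients to the pom, a small producer can feed customer-topic for a local test. This is a hypothetical harness, not part of the commit: the class name and sample values are assumptions, and the JSON field names mirror the Customers bean so from_json can parse the payload.

package com.nisum.javaSparkExample.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomerTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Field names match the Customers bean used by the streaming job.
        String json = "{\"customerId\":1,\"name\":\"Asha\",\"gender\":\"F\",\"transaction_amount\":250}";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("customer-topic", "1", json));
        }
    }
}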
@@ -4,13 +4,16 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 @ToString
+@Document(collection = "customers")
 public class Customers {
+    @Id
     private int customerId;
     private String name;
     private String gender;
...
@@ -4,13 +4,16 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 @ToString
+@Document(collection = "orders")
 public class Orders {
+    @Id
     private int orderId;
     private int customerId;
     private String orderName;
...
@@ -4,12 +4,17 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 @ToString
+@Document(collection = "users")
 public class Users {
+    @Id
     private int userId;
     private String userName;
     private String email;
...