Commit 8bcb2c3c authored by Sarika Sama's avatar Sarika Sama

added KafkaToCsvProcess

parent 56894188
package com.nisum.javaSparkExample; package com.nisum.javaSparkExample;
import com.nisum.javaSparkExample.model.Customers; import com.nisum.javaSparkExample.model.Customers;
import com.nisum.javaSparkExample.model.Orders;
import com.nisum.javaSparkExample.model.Users;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import java.util.Arrays; import java.util.Arrays;
...@@ -16,6 +18,8 @@ public class JavaSparkExampleApplication { ...@@ -16,6 +18,8 @@ public class JavaSparkExampleApplication {
//.config("spark.ui.enabled",true) //.config("spark.ui.enabled",true)
.config("spark.sql.shuffle.partitions",2) .config("spark.sql.shuffle.partitions",2)
.getOrCreate(); .getOrCreate();
// read the data from the list
List<Customers> customers = Arrays.asList( List<Customers> customers = Arrays.asList(
new Customers(01,"sarika","female",1000), new Customers(01,"sarika","female",1000),
new Customers(02,"sony","female",2000) new Customers(02,"sony","female",2000)
...@@ -25,25 +29,76 @@ public class JavaSparkExampleApplication { ...@@ -25,25 +29,76 @@ public class JavaSparkExampleApplication {
Dataset<Row> df = sparkSession.createDataFrame(customers,Customers.class); Dataset<Row> df = sparkSession.createDataFrame(customers,Customers.class);
df.show(false); df.show(false);
Dataset<Customers> mongoData = sparkSession.read() Dataset<Customers> customermongoData = sparkSession.read()
.format("mongo") .format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.customers") .option("uri", "mongodb://localhost:27017/mydb.customers")
.load().as(Encoders.bean(Customers.class)); .load().as(Encoders.bean(Customers.class));
// Show data from MongoDB
// Show data from MongoDB System.out.println(" Customers Data from MongoDB:");
System.out.println("Data from MongoDB:"); customermongoData.show(false);
mongoData.show(false);
Dataset df1 = df.filter(functions.col("id") //applied transformation to dataset and read the data in console
Dataset df1 = df.filter(functions.col("customerId")
.isNotNull()) .isNotNull())
.select("name","gender"); .select("name","gender");
df1.show(false); df1.show(false);
// write the data with write() and read in console
String outputPath = "D:/spark/customers1.csv";
df1.write().mode(SaveMode.Overwrite) df1.write().mode(SaveMode.Overwrite)
.format("csv") .format("csv")
.option("header",true) .option("header",true)
.save("D:/spark/customers.csv"); .save(outputPath);
// Dataset<Row> df2 = sparkSession.read()
// .format("csv")
// .option("header", true)
// .load(outputPath);
//create two collections and save into another collection
Dataset<Customers> customersData = sparkSession.read()
.format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.customers")
.load().as(Encoders.bean(Customers.class));
// Show data from MongoDB
System.out.println(" Customers Data from MongoDB:");
customersData.show(false);
Dataset<Orders> ordersmongoData = sparkSession.read()
.format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.orders")
.load().as(Encoders.bean(Orders.class));
// Show data from MongoDB
System.out.println(" Orders Data from MongoDB :");
ordersmongoData.show(false);
//read the data from mongodb
Dataset<Users> usersmongoData = sparkSession.read()
.format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.users")
.load().as(Encoders.bean(Users.class));
// Show data from MongoDB
System.out.println(" users Data from MongoDB :");
usersmongoData.show(false);
Dataset<Row> userData = customersData.join(ordersmongoData,customersData.col("customerId")
.equalTo(ordersmongoData.col("customerId")), "inner");
userData.write()
.format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.users")
.mode(SaveMode.Append)
.save();
userData.show(false);
sparkSession.stop();
} }
......
...@@ -11,7 +11,7 @@ import lombok.ToString; ...@@ -11,7 +11,7 @@ import lombok.ToString;
@ToString @ToString
public class Customers { public class Customers {
private int id; private int customerId;
private String name; private String name;
private String gender; private String gender;
private int transaction_amount; private int transaction_amount;
......
package com.nisum.javaSparkExample.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Bean representing an order document from the MongoDB {@code mydb.orders}
 * collection; used as the element type of a Spark {@code Dataset<Orders>}.
 *
 * <p>NOTE: {@code @Data} already generates {@code toString()} (along with
 * getters, setters and {@code equals}/{@code hashCode}), so the previous
 * standalone {@code @ToString} annotation was redundant and has been removed —
 * generated code is identical. The no-arg constructor is kept because Spark's
 * bean encoder ({@code Encoders.bean}) requires it.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Orders {
    /** Unique identifier of the order. */
    private int orderId;
    /** Customer the order belongs to; used as the join key against the customers data. */
    private int customerId;
    /** Human-readable order name. */
    private String orderName;
    /** Order status as stored in the source collection (free-form string). */
    private String status;
}
package com.nisum.javaSparkExample.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Bean representing a user document from the MongoDB {@code mydb.users}
 * collection; used as the element type of a Spark {@code Dataset<Users>}.
 *
 * <p>NOTE: {@code @Data} already generates {@code toString()} (along with
 * getters, setters and {@code equals}/{@code hashCode}), so the previous
 * standalone {@code @ToString} annotation was redundant and has been removed —
 * generated code is identical. The no-arg constructor is kept because Spark's
 * bean encoder ({@code Encoders.bean}) requires it.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Users {
    /** Unique identifier of the user. */
    private int userId;
    /** Display name of the user. */
    private String userName;
    /** Contact e-mail address. */
    private String email;
    /** Postal address (free-form string in source data). */
    private String address;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment