Commit d097359a authored by Sarika Sama

Added reading the data from Kafka and checking whether the ID exists: if it does, update the record, otherwise save it to a CSV file

parent cef8ca43
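The change below boils down to the check described in the commit message: look the incoming customerId up in MongoDB and only export the record when it is missing. A minimal, self-contained sketch of that check, assuming a local MongoDB with database "mydb" and collection "customers" (adjust the names to the real deployment):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import static com.mongodb.client.model.Filters.eq;

public class CustomerExistenceCheck {
    // Returns true when a document with this customerId is already stored in MongoDB.
    public static boolean exists(String customerId) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> customers =
                    client.getDatabase("mydb").getCollection("customers");
            // find(...).first() yields null when nothing matches the filter
            return customers.find(eq("customerId", customerId)).first() != null;
        }
    }
}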
......@@ -21,8 +21,8 @@ public class JavaSparkExampleApplication {
//read the data from a list
List<Customers> customers = Arrays.asList(
new Customers(01,"sarika","female",1000),
new Customers(02,"sony","female",2000)
//new Customers(01,"sarika","female",1000),
//new Customers(02,"sony","female",2000)
);
......@@ -41,7 +41,7 @@ public class JavaSparkExampleApplication {
//apply a filter transformation to the dataset and print the result to the console
Dataset df1 = df.filter(functions.col("customerId")
Dataset<Row> df1 = df.filter(functions.col("customerId")
.isNotNull())
.select("name","gender");
df1.show(false);
......
package com.nisum.javaSparkExample.kafka;
import com.google.gson.Gson;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.*;
import com.nisum.javaSparkExample.model.Customers;
import lombok.val;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
......@@ -14,20 +17,12 @@ import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.TimeoutException;
import static com.mongodb.client.model.Filters.eq;
public class KafkaToCsvProcess {
@Autowired
static
SparkSession sparkSession;
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession sparkSession = SparkSession.builder()
......@@ -50,32 +45,48 @@ public class KafkaToCsvProcess {
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "customer-topic")
.option("group.id","customer-group")
.option("auto.offset.reset", "latest")
// .option("failOnDataLoss", "false")
.load();
//kafkaDataFrame.show(false);
Dataset<Row> kafkaData = kafkaDataFrame.selectExpr("CAST(value as String) as message");
StructType customer = Encoders.bean(Customers.class).schema();
// Dataset<Row> matchingRecords = kafkaDataFrame
// .filter(kafkaDataFrame.col("customerId").isNotNull())
// .select("customerId", "", "name", "gender", "transaction_amount");
//
// Dataset<Row> nonMatchingRecords = kafkaDataFrame
// .filter(kafkaDataFrame.col("customerId").isNull())
// .select("D:/spark1/customers.csv");
Dataset<Customers> dataset = kafkaData.select(functions.from_json(functions.col("message"), customer).as("data"))
.select(
"data.customerId","data.name","data.gender","data.transaction_amount"
)
.as(Encoders.bean(Customers.class));
// Check in MongoDB (inside the ForeachWriter below) whether the incoming customerId already exists
//
StreamingQuery strQuery =dataset.writeStream()
StreamingQuery strQuery =kafkaData.writeStream()
.outputMode(OutputMode.Append())
.foreach(new ForeachWriter<Customers>() {
//.option("checkpointLocation","/customerLocation")
.foreach(new ForeachWriter<Row>() {
private MongoCollection customerX;
private MongoClient client;
@Override
public void process(Customers value) {
public void process(Row value) {
Gson gson = new Gson();
Customers customers = gson.fromJson(value.getString(0),Customers.class);
System.out.println(customers);
try {
Bson runtimeProductsFilterQuery = (eq("customerId", value.getCustomerId()));
Document cust = (Document) customerX.find(runtimeProductsFilterQuery);
Bson runtimeProductsFilterQuery = (eq("customerId", customers.getCustomerId()));
Document cust = (Document) customerX.find(runtimeProductsFilterQuery).first();
if (cust == null) {
......@@ -83,6 +94,7 @@ public class KafkaToCsvProcess {
.format("csv")
.option("header", true)
.save("D:/spark/customers1.csv");
} else {
System.out.println("ID exist in the collection.");
}
......@@ -91,8 +103,8 @@ public class KafkaToCsvProcess {
System.out.println("error msg");
}
}
//
}
@Override
public void close(Throwable errorOrNull) {
......
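The hunks above only show process(); open() and close() are cut off. A hedged sketch of the whole ForeachWriter lifecycle follows, with the Spark CSV save replaced by a plain file append for brevity. The connection string, database/collection names and the CSV path are assumptions.

package com.nisum.javaSparkExample.kafka;

import com.google.gson.Gson;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.nisum.javaSparkExample.model.Customers;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.bson.Document;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import static com.mongodb.client.model.Filters.eq;

public class CustomerForeachWriter extends ForeachWriter<Row> {

    private transient MongoClient client;
    private transient MongoCollection<Document> customerX;

    @Override
    public boolean open(long partitionId, long epochId) {
        // one Mongo client per partition/epoch; released again in close()
        client = MongoClients.create("mongodb://localhost:27017");
        customerX = client.getDatabase("mydb").getCollection("customers");
        return true;
    }

    @Override
    public void process(Row value) {
        Customers customers = new Gson().fromJson(value.getString(0), Customers.class);
        Document existing = customerX.find(eq("customerId", customers.getCustomerId())).first();
        if (existing == null) {
            appendToCsv(customers);   // not in Mongo yet: keep it in the CSV export
        } else {
            System.out.println("ID exists in the collection: " + customers.getCustomerId());
        }
    }

    private void appendToCsv(Customers c) {
        String line = String.join(",", String.valueOf(c.getCustomerId()), c.getName(),
                c.getGender(), String.valueOf(c.getTransaction_amount())) + System.lineSeparator();
        try {
            Files.write(Paths.get("D:/spark/customers1.csv"), line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            System.out.println("Failed to append to CSV: " + e.getMessage());
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (client != null) {
            client.close();
        }
    }
}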
package com.nisum.javaSparkExample.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nisum.javaSparkExample.model.Customers;
import org.apache.spark.sql.Row;
public class ObjectMapperTest {
public static void main(String[] args) throws JsonProcessingException {
ObjectMapper objectMapper= new ObjectMapper();
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
Customers customers = objectMapper.readValue("{\"customerId\":1,\"name\":\"sarika\",\"gender\":\"female\",\"transaction_amount\":20000}", Customers.class);
System.out.println(customers);
}
}
package com.nisum.javaSparkExample.kafka;
import com.nisum.javaSparkExample.model.Customers;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;
public class ReadKafkaToCsv {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.master("local[*]")
.appName("kafkaToCsvProcess")
.config("spark.ui.enabled", true)
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate();
Dataset<Customers> customermongoData = sparkSession.read()
.format("mongo")
.option("uri", "mongodb://localhost:27017/mydb.customers")
.load().as(Encoders.bean(Customers.class));
System.out.println(" Customers Data from MongoDB:");
customermongoData.select("customerId", "name", "gender", "transaction_amount").show(false);
Dataset<Row> kafkaDataFrame =sparkSession.read()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "customer-topic")
.option("group.id", "customer-group")
.option("auto.offset.reset", "latest")
.load();
Dataset<Row> kafkaData = kafkaDataFrame.selectExpr("CAST(value as String) as message");
StructType customer = Encoders.bean(Customers.class).schema();
Dataset<Customers> dataset = kafkaData.select(functions.from_json(functions.col("message"), customer).as("data"))
.select(
"data.customerId","data.name","data.gender","data.transaction_amount"
)
.as(Encoders.bean(Customers.class));
dataset.show(false);
// Dataset<Customers> kafkaDataFrame1 =dataset.as(Encoders.bean(Customers.class));
//
//
//// Dataset<Row> matchingRecords = customermongoData.join(kafkaDataFrame,customermongoData.col("customerId")
//// .equalTo(kafkaDataFrame.col("customerId")), "inner");
// kafkaDataFrame1.show(false);
Dataset<Row> joinedData = dataset.as("k")
.join(customermongoData.as("m"), expr("k.customerId = m.customerId"), "left_outer")
.select(
col("m.customerId"),
col("k.name").alias("name"),
col("m.gender"),
col("k.transaction_amount").alias("transaction_amount")
);
// Check if any records exist after joining
if (joinedData.count() > 0) {
// Records with matching "customerId" found, perform update operation
// For example, if you want to update the "name" and "transaction_amount" in customermongoData with values from kafkaDataFrame:
customermongoData = customermongoData.as("m")
.join(dataset.as("k"), col("m.customerId").equalTo(col("k.customerId")), "left_outer")
.select(
col("m.customerId"),
col("k.name").alias("name"),
col("m.gender"),
col("k.transaction_amount").alias("transaction_amount")
).as(Encoders.bean(Customers.class));
//.drop("customerId").as(Encoders.bean(Customers.class)); // Drop the duplicate customerId column from kafkaDataFrame
} else {
// Records with matching "customerId" not found, save to CSV file
customermongoData.write().csv("D:/kafkaspark/customers.csv");
}
// Show the updated customermongoData
System.out.println("Customers Data after update:");
customermongoData.show(false);
}
}
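The count() branch above rewrites every row even when only some customerIds matched. A hedged alternative for the same merge uses functions.coalesce so that each row takes the Kafka value when one arrived and keeps the Mongo value otherwise. This fragment reuses dataset and customermongoData from the class above; the output path is an assumption.

// prefer the Kafka column where a matching record arrived, otherwise keep Mongo's value
Dataset<Row> merged = customermongoData.as("m")
        .join(dataset.as("k"), col("m.customerId").equalTo(col("k.customerId")), "left_outer")
        .select(
                col("m.customerId"),
                functions.coalesce(col("k.name"), col("m.name")).alias("name"),
                col("m.gender"),
                functions.coalesce(col("k.transaction_amount"), col("m.transaction_amount"))
                        .alias("transaction_amount"));

// one consolidated CSV export instead of the per-branch writes
merged.write().mode("overwrite").option("header", true).csv("D:/kafkaspark/customers.csv");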
package com.nisum.javaSparkExample.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.*;
import org.apache.hadoop.shaded.org.codehaus.jackson.annotate.JsonProperty;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document(collection = "customers")
//@ToString
//@AllArgsConstructor
//@NoArgsConstructor
@Getter
@Setter
//@Builder
//@Document(collection = "customers")
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customers {
@Id
private Integer customerId;
@JsonProperty("customerId")
private String customerId;
@JsonProperty("name")
private String name;
@JsonProperty("gender")
private String gender;
private Integer transaction_amount;
@JsonProperty("transaction_amount")
private String transaction_amount;
/*
public Customers(Integer customerId, String name, String gender, Integer transaction_amount) {
this.customerId = customerId;
this.name = name;
this.gender = gender;
this.transaction_amount = transaction_amount;
}
public Integer getCustomerId() {
return customerId;
}
public void setCustomerId(Integer customerId) {
this.customerId = customerId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public Integer getTransaction_amount() {
return transaction_amount;
}
public void setTransaction_amount(Integer transaction_amount) {
this.transaction_amount = transaction_amount;
}
public Customers() {
}
@Override
public String toString() {
return "Customers{" +
"customerId=" + customerId +
", name='" + name + '\'' +
", gender='" + gender + '\'' +
", transaction_amount=" + transaction_amount +
'}';
}*/
}
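A quick way to confirm what Encoders.bean derives from this model for the from_json calls above; a small sketch, placed in any class that has Spark and the Customers bean on the classpath.

import com.nisum.javaSparkExample.model.Customers;
import org.apache.spark.sql.Encoders;

public class PrintCustomerSchema {
    public static void main(String[] args) {
        // Prints the StructType that from_json will use for Customers, so the field
        // names and types can be compared against the JSON actually on customer-topic.
        System.out.println(Encoders.bean(Customers.class).schema().treeString());
    }
}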