initial commit

parents
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="SparkStreamingPOC" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"68449a32-d8aa-4e1e-850f-f85f3b6f6e9b"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685079604942,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":30}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685082497616,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":40}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685099275385,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":220}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685099530009,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":240}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685103227472,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":260}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685104243503,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":300}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685084864674,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":60}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685092846391,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":80}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685093160018,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":100}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685093650004,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":120}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685096736482,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":140}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685096808089,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":160}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685096920004,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":180}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685096990006,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":200}}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"3db90caf-5993-4588-a744-256ae2f6eae7"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1685097607478,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"MyStreamingTopic":{"0":200}}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of the Spark Structured Streaming proof of concept. -->
    <groupId>org.example</groupId>
    <artifactId>SparkStreamingPOC</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to bump every Spark artifact below; all of them use the Scala 2.12 build. -->
        <spark.version>3.2.1</spark.version>
    </properties>

    <dependencies>
        <!-- Spark SQL / Dataset API used by both the publisher and the consumer. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Kafka source/sink for Structured Streaming (format "kafka"). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
            Lombok was previously declared twice, once without a version. A dependency
            without a version is invalid here (no parent or dependencyManagement supplies
            one) and duplicate declarations are flagged by Maven, so it is declared once.
            Lombok is only needed at compile time, hence the "provided" scope.
        -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.5.2</version>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
package com.nisum.spark.consumer;
import com.nisum.spark.dto.Product;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static com.nisum.spark.util.Constants.*;
/**
 * Structured Streaming job that subscribes to the Kafka topic named by
 * {@code Constants.TOPIC}, parses each message value as a JSON-encoded
 * {@link Product}, keeps only the shipped products of one fixed date and
 * appends every micro-batch as CSV files under {@code src/main/resources}.
 */
public class Consumer {

    /** Delivery status a record must have to be written out. */
    private static final String SHIPPED_STATUS = "shipped";

    /** Only records carrying exactly this date string are written out. */
    private static final String TARGET_DATE = "25-05-2023";

    /** Directory that receives the CSV part files of every micro-batch. */
    private static final String OUTPUT_PATH = "src/main/resources/productDataOutPutCSV";

    /**
     * Builds the streaming pipeline and blocks until the query terminates.
     *
     * @param args unused
     * @throws TimeoutException if starting or stopping the streaming query times out
     */
    public static void main(String[] args) throws TimeoutException {
        SparkSession sparkSession = SparkSession
                .builder().master("local[*]").appName("Spark Streaming Example 2").getOrCreate();
        try {
            Dataset<Row> kafkaFrame = sparkSession
                    .readStream()
                    .format("kafka")
                    .options(kafkaOptions())
                    .load();

            // The JSON schema is derived from the bean so it always matches Product's fields.
            StructType productSchema = Encoders.bean(Product.class).schema();
            Dataset<Row> products = kafkaFrame
                    .selectExpr("CAST(value as String) as message")
                    .select(functions.from_json(functions.col("message"), productSchema).as("data"))
                    .select("data.*");

            Dataset<Product> shippedProducts = products
                    .filter(functions.col("deliveryStatus").equalTo(SHIPPED_STATUS))
                    .filter(functions.col("date").equalTo(TARGET_DATE))
                    .as(Encoders.bean(Product.class));

            StreamingQuery streamingQuery = writeStreamingData(shippedProducts);
            try {
                streamingQuery.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
                streamingQuery.stop();
            }
        } finally {
            // Release the session on every exit path, not only after a failed query.
            sparkSession.stop();
        }
    }

    /**
     * Options for the Kafka source: read the topic from the earliest offset,
     * tolerate lost offsets and cap each trigger at 100 offsets.
     *
     * @return a fresh, mutable option map
     */
    private static Map<String, String> kafkaOptions() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", HOST);
        kafkaConfigMap.put("subscribe", TOPIC);
        kafkaConfigMap.put("startingOffsets", "earliest");
        kafkaConfigMap.put("failOnDataLoss", "false");
        kafkaConfigMap.put("maxOffsetsPerTrigger", "100");
        return kafkaConfigMap;
    }

    /**
     * Starts a query that appends every micro-batch to {@link #OUTPUT_PATH} as CSV.
     *
     * @param ds the filtered product stream
     * @return the running streaming query
     * @throws TimeoutException if the query cannot be started in time
     */
    private static StreamingQuery writeStreamingData(Dataset<Product> ds) throws TimeoutException {
        return ds
                .writeStream()
                .outputMode("append")
                // No format(...) here: foreachBatch installs its own sink, so a format set
                // on the stream writer is ignored. The CSV format is chosen on the batch writer.
                // NOTE(review): no checkpoint location is configured, so a restart presumably
                // re-reads the topic from "earliest" and appends the same rows again - confirm.
                //.option("checkpointLocation","checkpointLocation/jobs")
                .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 seconds"))
                .foreachBatch((batchDF, batchId) -> {
                    batchDF.write().format("csv").mode("append").save(OUTPUT_PATH);
                })
                .start();
    }
}
package com.nisum.spark.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Bean describing one product record.
 *
 * The field names match the columns the Publisher packs into the JSON value
 * (productId, productName, productPrice, deliveryStatus, date), and the
 * Consumer derives its parsing schema from this class via Encoders.bean.
 * Every field is kept as a String; no numeric or date parsing happens here.
 * Accessors and constructors come from the Lombok annotations below.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Product {
// Identifier of the product (e.g. "103" in the sample data).
private String productId;
// Display name of the product.
private String productName;
// Price kept as text, exactly as read from the source data.
private String productPrice;
// Status text; the sample data uses "ordered", "shipped" and "delivered".
private String deliveryStatus;
// Date kept as text; the sample data looks like dd-MM-yyyy (e.g. "25-05-2023").
private String date;
}
package com.nisum.spark.publisher;
import org.apache.spark.sql.*;
import org.apache.spark.storage.StorageLevel;
import java.util.HashMap;
import java.util.Map;
import static com.nisum.spark.util.Constants.*;
/**
 * Batch job that reads {@code src/main/resources/productData.csv} and publishes
 * every row as one JSON message to the Kafka topic named by {@code Constants.TOPIC}.
 */
public class Publisher {

    /**
     * Loads the CSV file (first line is the header), prints a preview and
     * publishes the rows to Kafka.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder().master("local[*]").appName("Spark Streaming Example").getOrCreate();
        try {
            Dataset<Row> dataset = sparkSession.read()
                    .format("csv")
                    .option("header", true)
                    .load("src/main/resources/productData.csv");
            dataset.show();
            publishCsvRecordsToKafkaTopic(dataset);
        } finally {
            // Always release the local Spark context, even when publishing fails.
            sparkSession.stop();
        }
    }

    /**
     * Serialises the five product columns of each row into a JSON string stored
     * in a single {@code value} column and writes that column to Kafka.
     *
     * @param rowDataset rows read from the CSV file; {@code null} is ignored
     */
    private static void publishCsvRecordsToKafkaTopic(Dataset<Row> rowDataset) {
        if (rowDataset == null) {
            return;
        }
        // The Kafka sink only needs the "value" column, so it is the only one selected.
        // The result is written exactly once, therefore it is not cached.
        Dataset<Row> kafkaPublishJson = rowDataset.select(
                functions.to_json(functions.struct(
                        functions.col("productId"),
                        functions.col("productName"),
                        functions.col("productPrice"),
                        functions.col("deliveryStatus"),
                        functions.col("date"))).as("value"));
        kafkaPublishJson
                .write().format("kafka").options(kafkaCsvConfig()).save();
    }

    /**
     * Options for the Kafka sink: broker address and target topic.
     *
     * @return a fresh, mutable option map
     */
    private static Map<String, String> kafkaCsvConfig() {
        Map<String, String> kafkaConfigMap = new HashMap<>();
        kafkaConfigMap.put("kafka.bootstrap.servers", HOST);
        kafkaConfigMap.put("topic", TOPIC);
        return kafkaConfigMap;
    }
}
package com.nisum.spark.util;
public class Constants {
public static final String TOPIC = "CSVFileTopic";
public static final String HOST = "localhost:9092";
}
productId,productName,productPrice,deliveryStatus,date
101,mobile,15000,ordered,10-05-2023
102,charger,25000,delivered,15-05-2023
103,cooker,35000,shipped,25-05-2023
104,tv,45000,shipped,25-05-2023
105,laptop,55000,delivered,25-05-2023
106,bottle,1500,ordered,12-05-2023
107,mouse,1000,delivered,15-05-2023
108,keyboard,500,shipped,25-05-2023
109,table,10000,shipped,22-05-2023
110,phone,12000,delivered,23-05-2023
111,shop,150000,ordered,16-05-2023
112,byke,75000,delivered,18-05-2023
113,car,20000,shipped,27-05-2023
114,cycle,33000,shipped,28-05-2023
115,bed,15000,delivered,26-05-2023
116,chairs,12000,ordered,15-05-2023
117,server,19555,delivered,18-05-2023
118,van,89200,shipped,25-05-2023
119,designing,14000,shipped,25-05-2023
120,interier,13000,delivered,25-05-2023
productId,productName,productPrice,deliveryStatus,date
101,mobile,15000,ordered,10-05-2023
102,charger,25000,delivered,15-05-2023
103,cooker,35000,shipped,25-05-2023
104,tv,45000,shipped,25-05-2023
105,laptop,55000,delivered,25-05-2023
106,bottle,1500,ordered,12-05-2023
107,mouse,1000,delivered,15-05-2023
108,keyboard,500,shipped,25-05-2023
109,table,10000,shipped,22-05-2023
110,phone,12000,delivered,23-05-2023
111,shop,150000,ordered,16-05-2023
112,byke,75000,delivered,18-05-2023
113,car,20000,shipped,27-05-2023
114,cycle,33000,shipped,28-05-2023
115,bed,15000,delivered,26-05-2023
116,chairs,12000,ordered,15-05-2023
117,server,19555,delivered,18-05-2023
118,van,89200,shipped,25-05-2023
119,designing,14000,shipped,25-05-2023
120,interier,13000,delivered,25-05-2023
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"103","productName":"cooker","productPrice":"35000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"104","productName":"tv","productPrice":"45000"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"108","productName":"keyboard","productPrice":"500"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"118","productName":"van","productPrice":"89200"}
{"date":"25-05-2023","deliveryStatus":"shipped","productId":"119","productName":"designing","productPrice":"14000"}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment