Configure Spark Streaming with Kafka to store orders in MongoDB

parent 72b92fd7
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.nisum.omd</groupId>
    <artifactId>omd-stream-processor</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>omd-stream-processor</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
package com.nisum.omd.kafka.connection;

import com.nisum.omd.kafka.utility.OmdKafkaUtility;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

public class OmdKafkaConnection {

    /**
     * Builds the Kafka consumer properties used by the Spark direct stream.
     * Auto-commit is disabled so offsets can be committed manually after each
     * batch has been written to MongoDB.
     */
    public static Map<String, Object> kafkaConfig() {
        Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, OmdKafkaUtility.KAFKA_BROKERS);
        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, OmdKafkaUtility.KAFKA_GROUP);
        kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OmdKafkaUtility.KAFKA_OFFSET_RESET_TYPE);
        kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return kafkaProperties;
    }
}
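As a quick sanity check outside Spark, the same property map can drive a plain KafkaConsumer against the local broker. The class below is a minimal sketch and not part of the commit; the poll timeout and print loop are illustrative only.

package com.nisum.omd.kafka.connection;

import com.nisum.omd.kafka.utility.OmdKafkaUtility;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;

public class OmdKafkaConnectionSmokeTest {
    public static void main(String[] args) {
        // Reuse the exact properties the Spark job uses.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(OmdKafkaConnection.kafkaConfig())) {
            consumer.subscribe(Collections.singletonList(OmdKafkaUtility.KAFKA_TOPIC));
            // poll(long) is used here because the 0.10.x client pulled in by Spark does not have poll(Duration).
            ConsumerRecords<String, String> records = consumer.poll(5000);
            records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
        }
    }
}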
package com.nisum.omd.kafka.utility;

public class OmdKafkaUtility {

    public static final String APPLICATION_NAME = "Streaming Order DStream";
    public static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
    public static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
    public static final int BATCH_DURATION_INTERVAL_MS = 50000;

    public static final String KAFKA_BROKERS = "localhost:9092";
    public static final String KAFKA_OFFSET_RESET_TYPE = "latest";
    public static final String KAFKA_GROUP = "omdgroup";
    public static final String KAFKA_TOPIC = "TOPIC_OMD_ORDER_DATA";

    public static final String MONGODB_OUTPUT_URI = "mongodb://localhost/Orders.orders";

    private OmdKafkaUtility() {
    }
}
package com.nisum.omd.mongo.repository;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;

public interface OmdSparkStreamRepository {
    void saveMongoData(JavaInputDStream<ConsumerRecord<String, String>> orderStream);
}
package com.nisum.omd.mongo.repository;

import com.mongodb.spark.MongoSpark;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.bson.Document;

public class OmdSparkStreamRepositoryImpl implements OmdSparkStreamRepository {

    @Override
    public void saveMongoData(JavaInputDStream<ConsumerRecord<String, String>> orderStream) {
        // Drop orders that have no guests before persisting.
        JavaDStream<ConsumerRecord<String, String>> recordJavaDStream =
                orderStream.filter(f -> !f.value().contains("\"guests\":0"));

        // Parse each Kafka record value as a BSON document and write the batch to MongoDB.
        recordJavaDStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> r) ->
                MongoSpark.save(r.map(e -> Document.parse(e.value()))));

        // Commit the consumed offsets back to Kafka once each batch has been processed.
        orderStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> javaRDD) -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) javaRDD.rdd()).offsetRanges();
            ((CanCommitOffsets) orderStream.inputDStream())
                    .commitAsync(offsetRanges, new OrderOffsetCommitCallback());
        });
    }
}
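The filter above keys on a literal "guests":0 substring, so its behaviour is easiest to see against a concrete payload. The snippet below is a standalone sketch; the order fields other than guests are hypothetical, since the commit does not define the order schema.

import org.bson.Document;

public class OrderPayloadExample {
    public static void main(String[] args) {
        // Hypothetical order messages; only the "guests" field matters to the filter.
        String kept = "{\"orderId\":\"OMD-1001\",\"guests\":2,\"total\":49.90}";
        String dropped = "{\"orderId\":\"OMD-1002\",\"guests\":0,\"total\":0.0}";

        // Mirrors the stream's filter predicate: records containing "guests":0 are skipped.
        System.out.println(!kept.contains("\"guests\":0"));    // true  -> saved
        System.out.println(!dropped.contains("\"guests\":0")); // false -> filtered out

        // A surviving value is parsed into the BSON document that MongoSpark.save() writes.
        Document doc = Document.parse(kept);
        System.out.println(doc.toJson());
    }
}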
package com.nisum.omd.mongo.repository;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class OrderOffsetCommitCallback implements OffsetCommitCallback {

    private static final Logger log = Logger.getLogger(OrderOffsetCommitCallback.class.getName());

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        log.info("---------------------------------------------------");
        log.log(Level.INFO, "{0} | {1}", new Object[]{offsets, exception});
        log.info("---------------------------------------------------");
    }
}
package com.nisum.omd.spark.service;

import com.nisum.omd.kafka.connection.OmdKafkaConnection;
import com.nisum.omd.kafka.utility.OmdKafkaUtility;
import com.nisum.omd.mongo.repository.OmdSparkStreamRepositoryImpl;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

public class OmdSparkStreamService {

    private static final Map<String, Object> KAFKA_CONSUMER_PROPERTIES =
            Collections.unmodifiableMap(OmdKafkaConnection.kafkaConfig());

    private static final Collection<String> TOPICS =
            Collections.unmodifiableList(Arrays.asList(OmdKafkaUtility.KAFKA_TOPIC));

    public static void main(String[] args) throws InterruptedException {
        System.setProperty("hadoop.home.dir", OmdKafkaUtility.HADOOP_HOME_DIR_VALUE);

        SparkConf conf = new SparkConf()
                .setMaster(OmdKafkaUtility.RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(OmdKafkaUtility.APPLICATION_NAME)
                .set("spark.mongodb.output.uri", OmdKafkaUtility.MONGODB_OUTPUT_URI);

        JavaStreamingContext streamingContext =
                new JavaStreamingContext(conf, new Duration(OmdKafkaUtility.BATCH_DURATION_INTERVAL_MS));

        // Direct stream over the order topic using the manual-commit consumer properties.
        JavaInputDStream<ConsumerRecord<String, String>> orderStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
                );

        new OmdSparkStreamRepositoryImpl().saveMongoData(orderStream);

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
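To exercise the pipeline end to end, an order document has to land on TOPIC_OMD_ORDER_DATA. Below is a minimal sketch of a test producer against the same local broker; the class and the payload fields are hypothetical and not part of the commit.

package com.nisum.omd.spark.service;

import com.nisum.omd.kafka.utility.OmdKafkaUtility;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class OmdTestOrderProducer {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, OmdKafkaUtility.KAFKA_BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Hypothetical order payload; a "guests" value of 0 would be filtered out by the stream.
        String order = "{\"orderId\":\"OMD-1001\",\"guests\":2,\"total\":49.90}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(OmdKafkaUtility.KAFKA_TOPIC, "OMD-1001", order));
            producer.flush();
        }
    }
}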
log4j.rootLogger=WARN, console
log4j.logger.com.nisum.omd=INFO
log4j.logger.org.apache=WARN

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d{dd-MM-yyyy hh:mm:ss.SSS a} [%t] %c{1} - %m%n