Commit 0a74b7ae authored by Ramakanth Dhane's avatar Ramakanth Dhane

Added server ports and removed spark main method

parent aa7f2c24
server.port=8081
app.currency=INR app.currency=INR
app.shipMethod=1 app.shipMethod=1
app.customerType=01 app.customerType=01
......
server.port = 8089 server.port = 8083
bootstrap-server = localhost:9092 bootstrap-server = localhost:9092
group-id = my-order-json-data-group group-id = my-order-json-data-group
auto-offset-reset-config = earliest auto-offset-reset-config = earliest
......
package com.nisum.omd; package com.nisum.omd;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import com.nisum.omd.spark.service.OmdSparkStreamService;
@SpringBootApplication @SpringBootApplication
public class OmdStreamProcessorApplication { public class OmdStreamProcessorApplication {
...@@ -9,5 +14,17 @@ public class OmdStreamProcessorApplication { ...@@ -9,5 +14,17 @@ public class OmdStreamProcessorApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(OmdStreamProcessorApplication.class, args); SpringApplication.run(OmdStreamProcessorApplication.class, args);
} }
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
System.out.println("Let's inspect the beans provided by Spring Boot:");
OmdSparkStreamService sparkService = ctx.getBean(OmdSparkStreamService.class);
sparkService.sparkInvokation();
};
}
} }
...@@ -3,7 +3,8 @@ package com.nisum.omd.kafka.utility; ...@@ -3,7 +3,8 @@ package com.nisum.omd.kafka.utility;
public class OmdKafkaUtility { public class OmdKafkaUtility {
public static final String APPLICATION_NAME = "Streaming Order DStream"; public static final String APPLICATION_NAME = "Streaming Order DStream";
public static final String HADOOP_HOME_DIR_VALUE = "C:/winutils"; //public static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
public static final String HADOOP_HOME_DIR_VALUE = "/usr/rdhane/server/spark-3.0.0-preview2-bin-hadoop2.7/bin";
public static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]"; public static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
public static final int BATCH_DURATION_INTERVAL_MS = 50000; public static final int BATCH_DURATION_INTERVAL_MS = 50000;
public static final String KAFKA_BROKERS = "localhost:9092"; public static final String KAFKA_BROKERS = "localhost:9092";
......
...@@ -11,12 +11,14 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; ...@@ -11,12 +11,14 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.springframework.stereotype.Service;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@Service
public class OmdSparkStreamService { public class OmdSparkStreamService {
private static final Map<String, Object> KAFKA_CONSUMER_PROPERTIES; private static final Map<String, Object> KAFKA_CONSUMER_PROPERTIES;
static { static {
...@@ -27,13 +29,12 @@ public class OmdSparkStreamService { ...@@ -27,13 +29,12 @@ public class OmdSparkStreamService {
private static final Collection<String> TOPICS = private static final Collection<String> TOPICS =
Collections.unmodifiableList(Arrays.asList(OmdKafkaUtility.KAFKA_TOPIC)); Collections.unmodifiableList(Arrays.asList(OmdKafkaUtility.KAFKA_TOPIC));
public static void main(String[] args)throws InterruptedException { public void sparkInvokation()throws InterruptedException {
JavaInputDStream<ConsumerRecord<String, String>> orderStream = null; JavaInputDStream<ConsumerRecord<String, String>> orderStream = null;
// System.setProperty("hadoop.home.dir", OmdKafkaUtility.HADOOP_HOME_DIR_VALUE);
System.setProperty("hadoop.home.dir", OmdKafkaUtility.HADOOP_HOME_DIR_VALUE);
SparkConf conf = new SparkConf() SparkConf conf = new SparkConf()
.setMaster(OmdKafkaUtility.RUN_LOCAL_WITH_AVAILABLE_CORES) .setMaster(OmdKafkaUtility.RUN_LOCAL_WITH_AVAILABLE_CORES)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment