Commit eb2d0384 authored by uday

spark

parent 90b86d1f
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
v1
{"nextBatchWatermarkMs":0}
\ No newline at end of file
{"id":"98848c01-91ad-46c0-9740-43c0bb6decde"}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1591351583078,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"promotion_spark":{"0":70}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1591351640056,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"promotion_spark":{"0":80}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1591352520691,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"promotion_spark":{"0":90}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1591352687685,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"promotion_spark":{"0":100}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1591352709061,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"promotion_spark":{"0":110}}
\ No newline at end of file
v1
{"batchWatermarkMs":0,"batchTimestampMs":1591353237533,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"promotion_spark":{"0":120}}
\ No newline at end of file
import com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask

plugins {
    id 'java'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
    maven {
        url = 'https://packages.confluent.io/maven/'
    }
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.projectlombok/lombok
    compile group: 'org.projectlombok', name: 'lombok', version: '1.18.12'
    implementation 'org.apache.spark:spark-avro_2.11:2.4.5'
    implementation('io.confluent:kafka-avro-serializer:5.4.0')
    // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
    compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10
    compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
    compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: '2.4.5'
    implementation('com.fasterxml.jackson.core:jackson-databind:2.6.7.1') {
        force = true
    }
}

avro {
    createSetters = false
    fieldVisibility = "PRIVATE"
    stringType = "String"
    outputCharacterEncoding = "UTF-8"
}

task generateAvro(type: GenerateAvroJavaTask) {
    source("src/main/resources/avro/")
    outputDir = file("build/generated/java/main")
}

sourceSets {
    main {
        java.srcDirs += generateAvro.outputs
    }
}

jar {
    enabled = true
}
#Wed Jun 03 13:33:22 IST 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.1-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
rootProject.name = 'spark-example'
package com.safeway.epe;

import com.google.common.collect.ImmutableList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class CustomObjectsRDD {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Print Elements of RDD")
                .setMaster("local[*]").set("spark.executor.memory", "2g");
        // start a spark context
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        // prepare list of objects
        List<Promotion> promotionList = ImmutableList.of(
                new Promotion(123, "big billion"),
                new Promotion(124, "Special Offers"),
                new Promotion(125, "bank Offers"),
                new Promotion(126, "Extra discount"),
                new Promotion(127, "BOGO")
        );
        List<Offers> offersList =
                ImmutableList.of(new Offers(123456, "Diwali Offers", promotionList));
        // parallelize the lists using SparkContext
        JavaRDD<Promotion> promotionJavaRDD = sc.parallelize(promotionList);
        JavaRDD<Offers> offersJavaRDD = sc.parallelize(offersList);
        JavaRDD<String> mapJavaRDD = promotionJavaRDD.map(promo -> promo.getPromoType().toUpperCase());
        JavaRDD<String> flatMapJavaRDD = offersJavaRDD.flatMap(offers -> Arrays.asList(offers.getPromotion().get(0).getPromoType()).iterator());
        for (Promotion promotion : promotionJavaRDD.collect()) {
            System.out.println(promotion.getPromoId() + " \t " + promotion.getPromoType());
        }
        for (String promo : mapJavaRDD.collect()) {
            System.out.println("toUpperCase :" + promo);
        }
        for (String promoType : flatMapJavaRDD.collect()) {
            System.out.println("flatMap :" + promoType);
        }
        sc.close();
    }
}
package com.safeway.epe;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class FirstSparkExample {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Print Elements of RDD")
                .setMaster("local[*]").set("spark.executor.memory", "2g");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        List<String> data = Arrays.asList("Learn", "Apache", "Spark", "with", "Tutorial Kart");
        JavaRDD<String> items = sc.parallelize(data, 1);
        // apply a function for each element of RDD
        items.foreach(item -> {
            System.out.println("* " + item);
        });
    }
}
package com.safeway.epe;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;
import java.util.Scanner;

public class FlatMapExample {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<String> stringList = Arrays.asList(
                "WARN: TUESDAY 4 September 0540",
                "WARN: Wednesday 5 September 0541",
                "ERROR: Thursday 6 September 0542",
                "INFO: Friday 7 September 0543",
                "FATAL: Saturday 8 September 0544");
        SparkConf sparkConf = new SparkConf().setAppName("FlatMapExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        javaSparkContext.parallelize(stringList)
                .flatMap(value -> Arrays.asList(value.split(" ")).iterator())
                .collect()
                .forEach(System.out::println);
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
        javaSparkContext.close();
    }
}
package com.safeway.epe;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class FluentAPIExample {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<String> stringList = Arrays.asList(
                "WARN: TUESDAY 4 September 0540",
                "WARN: Wednesday 5 September 0541",
                "ERROR: Thursday 6 September 0542",
                "INFO: Friday 7 September 0543",
                "FATAL: Saturday 8 September 0544");
        SparkConf sparkConf = new SparkConf().setAppName("RDDExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        javaSparkContext.parallelize(stringList)
                .mapToPair(value -> new Tuple2<>(value.split(":")[0], 1L))
                .reduceByKey((value1, value2) -> value1 + value2)
                .collect()
                .forEach(System.out::println);
        javaSparkContext.close();
    }
}
package com.safeway.epe;

import com.google.common.collect.Iterables;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class GroupByKeyExample {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<String> stringList = Arrays.asList(
                "WARN: TUESDAY 4 September 0540",
                "WARN: Wednesday 5 September 0541",
                "ERROR: Thursday 6 September 0542",
                "INFO: Friday 7 September 0543",
                "FATAL: Saturday 8 September 0544");
        SparkConf sparkConf = new SparkConf().setAppName("RDDExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        javaSparkContext.parallelize(stringList)
                .mapToPair(value -> new Tuple2<>(value.split(":")[0], 1L))
                .groupByKey()
                .collect()
                .forEach(val -> System.out.println(val._1 + "\t" + Iterables.size(val._2)));
        javaSparkContext.close();
    }
}
package com.safeway.epe;

import java.io.Serializable;
import java.util.List;

// Serializable is required because Offers objects are distributed in RDDs (see CustomObjectsRDD).
public class Offers implements Serializable {
    private int offerId;
    private String offerType;
    private List<Promotion> promotion;

    public Offers(int offerId, String offerType, List<Promotion> promotion) {
        this.offerId = offerId;
        this.offerType = offerType;
        this.promotion = promotion;
    }

    public int getOfferId() {
        return offerId;
    }

    public void setOfferId(int offerId) {
        this.offerId = offerId;
    }

    public String getOfferType() {
        return offerType;
    }

    public void setOfferType(String offerType) {
        this.offerType = offerType;
    }

    public List<Promotion> getPromotion() {
        return promotion;
    }

    public void setPromotion(List<Promotion> promotion) {
        this.promotion = promotion;
    }

    @Override
    public String toString() {
        return "Offers{" +
                "offerId=" + offerId +
                ", offerType='" + offerType + '\'' +
                ", promotion=" + promotion +
                '}';
    }
}
package com.safeway.epe;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Scanner;

public class PairRDDExample {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<String> stringList = Arrays.asList(
                "WARN: TUESDAY 4 September 0540",
                "WARN: Wednesday 5 September 0541",
                "ERROR: Thursday 6 September 0542",
                "INFO: Friday 7 September 0543",
                "FATAL: Saturday 8 September 0544");
        SparkConf sparkConf = new SparkConf().setAppName("RDDExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<String> stringJavaRDD = javaSparkContext.parallelize(stringList);
        JavaPairRDD<String, String> javaPairRDD = stringJavaRDD.mapToPair(value -> {
            String[] col = value.split(":");
            String level = col[0];
            String value1 = col[1];
            return new Tuple2<String, String>(level, value1);
        });
        javaPairRDD.collect().forEach(System.out::println);
        JavaPairRDD<String, Long> javaPairRDD1 = stringJavaRDD.mapToPair(value -> {
            String[] col = value.split(":");
            String level = col[0];
            return new Tuple2<String, Long>(level, 1L);
        });
        JavaPairRDD<String, Long> countWithKey = javaPairRDD1.reduceByKey((value1, value2) -> value1 + value2);
        countWithKey.collect().forEach(System.out::println);
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
        javaSparkContext.close();
    }
}
package com.safeway.epe;

import java.io.Serializable;

public class Promotion implements Serializable {
    private int promoId;
    private String promoType;

    public Promotion(int promoId, String promoType) {
        this.promoId = promoId;
        this.promoType = promoType;
    }

    public int getPromoId() {
        return promoId;
    }

    public void setPromoId(int promoId) {
        this.promoId = promoId;
    }

    public String getPromoType() {
        return promoType;
    }

    public void setPromoType(String promoType) {
        this.promoType = promoType;
    }
}
package com.safeway.epe;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class RDDSetup implements Serializable {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> integerList = Arrays.asList(2, 4, 3, 5, 11, 44, 22);
        SparkConf sparkConf = new SparkConf().setAppName("RDDExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<Integer> integerJavaRDD = javaSparkContext.parallelize(integerList);
        Integer value = integerJavaRDD.reduce((Integer i, Integer j) -> i + j);
        JavaRDD<Double> doubleList = integerJavaRDD.map(value1 -> Math.sqrt(value1));
        doubleList.foreach(value1 -> System.out.println(value1));
        doubleList.collect().forEach(System.out::println);
        System.out.println(value);
        javaSparkContext.close();
    }
}
package com.safeway.epe;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadJsonfile {
    public static void main(String[] args) {
        // configure spark
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark Example - Read JSON to RDD")
                .master("local[*]")
                .getOrCreate();
        // read the JSON file into an RDD of Rows
        String jsonPath = "/Users/nisum/Desktop/EPE/ignit2/spark-example/src/main/resources/file/employee.json";
        JavaRDD<Row> items = spark.read().json(jsonPath).toJavaRDD();
        items.foreach(item -> {
            System.out.println(item);
        });
    }
}
package com.safeway.epe;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class TestFileExample {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("TestFileExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // read the sample text file bundled with the project under src/main/resources/file
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("src/main/resources/file/promotion.text");
        stringJavaRDD.flatMap(value -> Arrays.asList(value.split(" ")).iterator()).collect()
                .forEach(System.out::println);
        javaSparkContext.close();
    }
}
package com.safeway.epe;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class TuppleExample {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> integerList = Arrays.asList(2, 4, 3, 5, 11, 44, 22);
        List<String> stringList = Arrays.asList(
                "WARN: TUESDAY 4 September 0540",
                "WARN: Wednesday 5 September 0541",
                "ERROR: Thursday 6 September 0542",
                "INFO: Friday 7 September 0543",
                "FATAL: Saturday 8 September 0544");
        SparkConf sparkConf = new SparkConf().setAppName("RDDExample").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<Integer> integerJavaRDD = javaSparkContext.parallelize(integerList);
        JavaRDD<Double> doubleList = integerJavaRDD.map(value1 -> Math.sqrt(value1));
        JavaRDD<Tuple2<Integer, Double>> tuple2JavaRDD = integerJavaRDD.map(val -> new Tuple2<>(val, Math.sqrt(val)));
        // JavaRDD<Tuple2<Integer, Double>> myValue = new Tuple2(integerJavaRDD, doubleList);
        tuple2JavaRDD.collect().forEach(System.out::println);
        JavaRDD<String> stringJavaRDD = javaSparkContext.parallelize(stringList);
    }
}
package com.safeway.epe.kafkastreaming;

import com.safeway.epe.model.Promotion;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.codehaus.jackson.map.ObjectMapper;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KafkaStreamingExample {
    static ObjectMapper mapper = new ObjectMapper();

    public static void main(String[] args) throws InterruptedException, IOException {
        System.setProperty("hadoop.home.dir", "c:/hadoop");
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        Logger.getLogger("org.apache.spark.storage").setLevel(Level.ERROR);

        Map<String, Object> params = new HashMap<>();
        params.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        params.put(ConsumerConfig.GROUP_ID_CONFIG, "schema");

        SparkConf sparkConf = new SparkConf().setAppName("Read file from Kafka").setMaster("local[*]");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        // Values arrive as JSON strings (StringDeserializer), so the stream is typed <String, String>
        // and each value is mapped to a Promotion with Jackson.
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                .createDirectStream(javaStreamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("promotion"), params));
        JavaDStream<Promotion> result = stream.map(record -> mapper.readValue(record.value(), Promotion.class));
        stream.mapToPair(record -> new Tuple2<>(record.key(), record.value())).print();
        result.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}
package com.safeway.epe.kafkastreaming;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;

public class StreamingMongoDBExample {
    public static void main(String[] args) throws IOException, RestClientException {
        SparkConf sparkConf = new SparkConf().setAppName("sending data to mongodb")
                .setMaster("local[*]")
                .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/promo");
        SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
        SQLContext sqlContext = session.sqlContext();
        // NOTE: the schema-registry URL and subject name are left blank in this commit.
        CachedSchemaRegistryClient cachedSchemaRegistryClient = new CachedSchemaRegistryClient("", 128);
        SchemaMetadata latestSchemaMetadata = cachedSchemaRegistryClient.getLatestSchemaMetadata("" + "-value");
        Schema schemaRegistryClientById = cachedSchemaRegistryClient.getById(latestSchemaMetadata.getId());
        StructType dataType = (StructType) SchemaConverters.toSqlType(schemaRegistryClientById).dataType();
        session.udf().register("deserialize", (byte[] data) -> {
            CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("", 128);
            KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
            GenericRecord record = (GenericRecord) kafkaAvroDeserializer.deserialize("", data);
            return RowFactory.create(Long.parseLong("" + record.get("promotionId")), record.get("promotionType").toString(),
                    record.get("discount"));
        }, dataType);
    }
}
package com.safeway.epe.kafkastreaming;

import com.safeway.epe.model.Promotion;
import com.safeway.epe.producer.SparkKafkaProducer;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.*;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;

import static org.apache.spark.sql.Encoders.bean;

public class StructuredStreamingExample {
    public static void main(String[] args) throws StreamingQueryException, IOException, RestClientException {
        System.setProperty("hadoop.home.dir", "c:/hadoop");
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        Logger.getLogger("org.apache.spark.storage").setLevel(Level.ERROR);

        SparkSession session = SparkSession.builder()
                .master("local[*]")
                .appName("Structured Streaming with Kafka")
                .getOrCreate();
        SQLContext sqlContext = session.sqlContext();

        // Fetch the latest Avro schema for the topic's value subject and convert it to a Spark SQL type.
        CachedSchemaRegistryClient cachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:8081", 128);
        SchemaMetadata latestSchemaMetadata = cachedSchemaRegistryClient.getLatestSchemaMetadata("promotion_spark" + "-value");
        Schema schemaRegistryClientById = cachedSchemaRegistryClient.getById(latestSchemaMetadata.getId());
        StructType dataType = (StructType) SchemaConverters.toSqlType(schemaRegistryClientById).dataType();

        // UDF that decodes Confluent-Avro payloads from Kafka into Rows matching the schema above.
        session.udf().register("deserialize", (byte[] data) -> {
            KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(new CachedSchemaRegistryClient("http://localhost:8081", 128));
            GenericRecord promotion = (GenericRecord) kafkaAvroDeserializer.deserialize("", data);
            return RowFactory.create(promotion.get("promotionId"), promotion.get("promotionType"), promotion.get("discount"));
        }, dataType);

        Dataset<Promotion> dataset = session.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "promotion_spark")
                .load()
                .selectExpr("deserialize(value) as value")
                .select("value.*")
                .as(bean(Promotion.class));
        dataset.createOrReplaceTempView("viewresult");
        Dataset<Promotion> promotionDataset = sqlContext.sql("select * from viewresult").as(bean(Promotion.class));

        StreamingQuery streamingQuery = promotionDataset.writeStream()
                .format("console")
                .outputMode(OutputMode.Append())
                .option("checkpointLocation", "Desktop/temp/OfferDataProcessor")
                .foreach(new SparkKafkaProducer("promo"))
                .start();

        // Dataset<Row> result = session.sql("select * from viewresult");
        // StreamingQuery query = result.writeStream()
        //         .format("console")
        //         .outputMode(OutputMode.Append())
        //         .start();

        // Dataset result = session.sql("select * from viewresult");
        // StreamingQuery ds = result
        //         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        //         .writeStream()
        //         .format("kafka")
        //         .option("kafka.bootstrap.servers", "localhost:9092")
        //         .option("topic", "promotion")
        //         .start();

        streamingQuery.awaitTermination(120000);
    }
}
package com.safeway.epe.model;

import java.io.Serializable;

public class Promotion implements Serializable {
    private int promotionId;
    private String promotionType;
    private int discount;

    public Promotion(int promotionId, String promotionType, int discount) {
        this.promotionId = promotionId;
        this.promotionType = promotionType;
        this.discount = discount;
    }

    public Promotion() {
    }

    public int getPromotionId() {
        return promotionId;
    }

    public void setPromotionId(int promotionId) {
        this.promotionId = promotionId;
    }

    public String getPromotionType() {
        return promotionType;
    }

    public void setPromotionType(String promotionType) {
        this.promotionType = promotionType;
    }

    public int getDiscount() {
        return discount;
    }

    public void setDiscount(int discount) {
        this.discount = discount;
    }

    @Override
    public String toString() {
        return "Promotion{" +
                "promotionId=" + promotionId +
                ", promotionType='" + promotionType + '\'' +
                ", discount=" + discount +
                '}';
    }
}
package com.safeway.epe.producer;

import com.safeway.epe.model.Promotion;
import com.safeway.epe.model.PromotionData;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.spark.sql.ForeachWriter;

import java.util.Properties;
import java.util.concurrent.Future;

public class SparkKafkaProducer extends ForeachWriter<Promotion> {
    private KafkaProducer<String, PromotionData> producer;
    String topic;

    public SparkKafkaProducer(String topic) {
        this.topic = topic;
    }

    @Override
    public boolean open(long l, long l1) {
        producer = kafkaProducer(PromotionData.class);
        return true;
    }

    @Override
    public void process(Promotion promotion) {
        // Convert the Promotion bean into the Avro-generated PromotionData record.
        PromotionData.Builder promotionBuilder = PromotionData.newBuilder();
        promotionBuilder.setPromotionId(promotion.getPromotionId());
        promotionBuilder.setPromotionType(promotion.getPromotionType());
        promotionBuilder.setDiscount(promotion.getDiscount());
        PromotionData promotionData = promotionBuilder.build();
        sendPromotion(promotionData);
    }

    private void sendPromotion(PromotionData promotion) {
        try {
            ProducerRecord<String, PromotionData> producerRecord = new ProducerRecord<>(topic, promotion);
            Future<RecordMetadata> metadataFuture = producer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close(Throwable throwable) {
        producer.close();
    }

    private <T> KafkaProducer<String, T> kafkaProducer(Class<T> clazz) {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://127.0.0.1:8081");
        return new KafkaProducer<>(properties);
    }
}
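For reference, here is a minimal consumer-side sketch; it is not part of this commit. It reads back the Avro PromotionData records written by SparkKafkaProducer using the plain Kafka consumer API and Confluent's KafkaAvroDeserializer. The class name, consumer group id, and poll loop are illustrative; the topic name "promo" and the localhost broker and schema-registry endpoints mirror values used elsewhere in the commit.

package com.safeway.epe.consumer;

import com.safeway.epe.model.PromotionData;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PromotionAvroConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "promotion-avro-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://127.0.0.1:8081");
        // Return the generated SpecificRecord class (PromotionData) instead of GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        try (KafkaConsumer<String, PromotionData> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("promo"));
            while (true) {
                ConsumerRecords<String, PromotionData> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, PromotionData> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}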
package com.safeway.epe.streaming;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.spark.Accumulator;
import org.apache.spark.AccumulatorParam;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

/**
 * Collects exceptions thrown inside Spark transformations into an accumulator
 * instead of failing the whole job.
 */
public class SparkHandle {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Test Exceptions");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Accumulator<HashSet<Throwable>> exceptions = sc.accumulator(new HashSet<>(), new HashSetAccumulatorParam<>());

        // // Original (fails on the first exception)
        // long result = sc.parallelize(IntStream.range(1, 100).boxed().collect(Collectors.toList()))
        //         .map(SparkHandle::myConversion)
        //         .count();

        // With try
        long result = sc.parallelize(IntStream.range(1, 100).boxed().collect(Collectors.toList()))
                .flatMap(TryWrapper.of(exceptions, SparkHandle::myConversion))
                .count();

        // Printing the result
        System.out.println(result);
        int i = 1;
        for (Throwable t : exceptions.value()) {
            System.out.println("Exception " + (i++) + ": " + t.getMessage());
        }
    }

    public static String myConversion(int num) {
        if (num % 5 == 0) {
            throw new RuntimeException("A forced exception...");
        }
        return ">> " + num;
    }

    // Classes used to implement the functionality
    public static class TryWrapper<S, R> implements FlatMapFunction<S, R>, Serializable {
        private Accumulator<HashSet<Throwable>> exceptions;
        private Function<S, R> function;

        private TryWrapper(Accumulator<HashSet<Throwable>> exceptions, Function<S, R> function) {
            this.exceptions = exceptions;
            this.function = function;
        }

        public static <S, R> TryWrapper<S, R> of(Accumulator<HashSet<Throwable>> exceptions, Function<S, R> function) {
            return new TryWrapper<S, R>(exceptions, function);
        }

        @Override
        public Iterator<R> call(S s) throws Exception {
            List<R> resultIterable = new LinkedList<>();
            try {
                R result = function.call(s);
                resultIterable.add(result);
            } catch (Throwable t) {
                HashSet<Throwable> exc = new HashSet<>();
                exc.add(t);
                exceptions.add(exc);
            }
            return resultIterable.iterator();
        }
    }

    public static class HashSetAccumulatorParam<T> implements AccumulatorParam<HashSet<T>> {
        @Override
        public HashSet<T> addAccumulator(HashSet<T> t1, HashSet<T> t2) {
            t1.addAll(t2);
            return t1;
        }

        @Override
        public HashSet<T> addInPlace(HashSet<T> r1, HashSet<T> r2) {
            return addAccumulator(r1, r2);
        }

        @Override
        public HashSet<T> zero(HashSet<T> initialValue) {
            return new HashSet<>();
        }
    }
}
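SparkHandle relies on the Accumulator/AccumulatorParam API, which is deprecated in Spark 2.x. Below is a minimal sketch, not part of this commit, of the same try-and-collect idea using the newer CollectionAccumulator (AccumulatorV2); the class name and the choice to accumulate only exception messages are assumptions of the sketch.

package com.safeway.epe.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.util.CollectionAccumulator;

import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SparkHandleV2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Test Exceptions V2");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // CollectionAccumulator replaces the deprecated Accumulator/AccumulatorParam pair;
        // this sketch collects only the exception messages.
        CollectionAccumulator<String> exceptions = sc.sc().collectionAccumulator("exceptions");

        // Wrap the conversion so failures are recorded and skipped instead of failing the job.
        FlatMapFunction<Integer, String> safeConversion = num -> {
            try {
                return Collections.singletonList(SparkHandle.myConversion(num)).iterator();
            } catch (Throwable t) {
                exceptions.add(t.getMessage());
                return Collections.<String>emptyIterator();
            }
        };

        long result = sc.parallelize(IntStream.range(1, 100).boxed().collect(Collectors.toList()))
                .flatMap(safeConversion)
                .count();

        System.out.println(result);
        exceptions.value().forEach(message -> System.out.println("Exception: " + message));
        sc.close();
    }
}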
{
    "namespace": "com.safeway.epe.model",
    "type": "record",
    "name": "PromotionData",
    "fields": [
        {"name": "promotionId", "type": ["null", "int"]},
        {"name": "promotionType", "type": ["null", "string"]},
        {"name": "discount", "type": ["null", "int"]}
    ]
}
\ No newline at end of file
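As a small standalone illustration (not part of the commit), the sketch below parses this PromotionData schema and converts it to a Spark SQL StructType with the same SchemaConverters.toSqlType call that StructuredStreamingExample uses; the class name and the inlined schema string are assumptions.

package com.safeway.epe.model;

import org.apache.avro.Schema;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;

public class PromotionSchemaCheck {
    public static void main(String[] args) {
        // The inline string mirrors the PromotionData schema above (kept under src/main/resources/avro/ per build.gradle).
        String avsc = "{\"namespace\":\"com.safeway.epe.model\",\"type\":\"record\",\"name\":\"PromotionData\","
                + "\"fields\":[{\"name\":\"promotionId\",\"type\":[\"null\",\"int\"]},"
                + "{\"name\":\"promotionType\",\"type\":[\"null\",\"string\"]},"
                + "{\"name\":\"discount\",\"type\":[\"null\",\"int\"]}]}";
        Schema schema = new Schema.Parser().parse(avsc);
        // toSqlType returns a SchemaType whose dataType() is the StructType used as the UDF return type.
        StructType sqlType = (StructType) SchemaConverters.toSqlType(schema).dataType();
        sqlType.printTreeString();
    }
}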
{"name":"Michael", "salary":3000}
heloo
hi
uday
kumar
singh
\ No newline at end of file