Syed Javed Ali / Spark-App / Commits / 3e7c82d8

Commit 3e7c82d8, authored Apr 20, 2021 by Syed Javed Ali
parent 454ea7d1

    Added examples on Tuple,PairRDD

Showing 2 changed files with 67 additions and 8 deletions:
    src/main/java/com/virtualpairprogrammers/Main.java    +37 −8
    src/main/java/com/virtualpairprogrammers/Main1.java   +30 −0
src/main/java/com/virtualpairprogrammers/Main.java @ 3e7c82d8
 package com.virtualpairprogrammers;

 import lombok.extern.slf4j.Slf4j;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import scala.Tuple2;

 import java.util.ArrayList;
 import java.util.List;

 @Slf4j
 public class Main {

     public static void main(String[] args) {
-        List<Double> inputData = new ArrayList<>();
-        inputData.add(55.5);
-        inputData.add(43.2);
-        inputData.add(23.21);
-        inputData.add(67.2);
+        List<Integer> inputData = new ArrayList<>();
+        inputData.add(55);
+        inputData.add(43);
+        inputData.add(23);
+        inputData.add(67);

         // to avoid unnecessary INFO logs and have WARN logs only
         Logger.getLogger("org.apache").setLevel(Level.WARN);
...
@@ -24,11 +27,37 @@ public class Main {
         SparkConf conf = new SparkConf().setAppName("startingSpark")
                 .setMaster("local[*]");
-        // represents spark cluster connection
+        // represents a connection with the spark cluster
         JavaSparkContext sc = new JavaSparkContext(conf);

-        // to load java data into rdd
-        JavaRDD<Double> myRDD = sc.parallelize(inputData);
+        // to load java data into the rdd (spark)
+        JavaRDD<Integer> myRDD = sc.parallelize(inputData);

+        // reduce method to perform operations on the rdd
+        Integer result = myRDD.reduce((value1, value2) -> value1 + value2);
+        log.info(" Result: " + result);

+        // map method to perform operations on the rdd
+        JavaRDD<Double> sqrtRDD = myRDD.map(value -> Math.sqrt(value));
+        sqrtRDD.foreach(res -> log.info("Result :" + res));

+        // how many elements in sqrtRDD
+        long count = sqrtRDD.count();
+        log.info("Elements in rdd: " + count);

+        // how many elements in sqrtRDD, using map and reduce
+        JavaRDD<Long> singleRdd = sqrtRDD.map(value -> 1L);
+        Long count1 = singleRdd.reduce((value1, value2) -> value1 + value2);
+        log.info("Elements in rdd :" + count1);

+        // collect() returns a plain List on the driver, so the forEach lambda
+        // is not shipped to executors (avoids NotSerializableException)
+        sqrtRDD.collect().forEach(System.out::println);

+        // tuples in spark
+        JavaRDD<Tuple2<Integer, Double>> newSqrtRdd =
+                myRDD.map(value -> new Tuple2<>(value, Math.sqrt(value)));
+        newSqrtRdd.collect().forEach(System.out::println);

         // close the spark connection
         sc.close();
...
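For reference, since the commit message highlights Tuple: Scala's Tuple2 exposes its two fields to Java as `_1` and `_2` (tuple fields are 1-indexed and immutable). The diff above prints the whole pairs but never unpacks them; a minimal sketch of doing so, not part of this commit and reusing the `newSqrtRdd` variable from the diff:

    // Hypothetical snippet (not in the diff): Tuple2 can also be used outside RDDs
    Tuple2<Integer, Double> sample = new Tuple2<>(9, 3.0);
    System.out.println(sample._1 + " -> " + sample._2);

    // unpacking the (value, sqrt) pairs built in newSqrtRdd above
    newSqrtRdd.collect().forEach(tuple ->
            System.out.println(tuple._1 + " -> " + tuple._2));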
src/main/java/com/virtualpairprogrammers/Main1.java @ 3e7c82d8 (new file, 0 → 100644)
package com.virtualpairprogrammers;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

public class Main1 {

    public static void main(String[] args) {
        List<String> inputData = new ArrayList<>();
        inputData.add("WARN: Tuesday 4 September 0405");
        inputData.add("ERROR: Tuesday 4 September 0408");
        inputData.add("FATAL: Wednesday 5 September 1632");
        inputData.add("ERROR: Friday 7 September 1854");
        inputData.add("WARN: Saturday 8 September 1942");

        // to avoid unnecessary INFO logs and have WARN logs only
        Logger.getLogger("org.apache").setLevel(Level.WARN);

        SparkConf conf = new SparkConf().setAppName("startingSpark")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> originaLogMessages = sc.parallelize(inputData);

        // pairRDD is similar to a Map collection, except it can store duplicate keys
    }
}
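Main1.java stops right after the pairRDD comment; the PairRDD code itself is not in this commit. Purely as a hedged sketch of where that comment is heading, counting messages per log level might look like the following. The `pairRdd` and `sumRdd` names and the split on ":" are illustrative assumptions, not part of the diff, and two extra imports would be needed: org.apache.spark.api.java.JavaPairRDD and scala.Tuple2.

    // Hypothetical continuation of Main1.main (not in this commit)

    // mapToPair turns each log line into a (level, 1L) pair; duplicate keys
    // such as "WARN" and "ERROR" are allowed, unlike in a java.util.Map
    JavaPairRDD<String, Long> pairRdd = originaLogMessages.mapToPair(rawValue -> {
        String[] columns = rawValue.split(":");
        return new Tuple2<>(columns[0], 1L);
    });

    // reduceByKey merges the values of duplicate keys, here summing the 1Ls
    // to count how many messages each level produced
    JavaPairRDD<String, Long> sumRdd = pairRdd.reduceByKey((v1, v2) -> v1 + v2);
    sumRdd.foreach(tuple ->
            System.out.println(tuple._1 + " has " + tuple._2 + " instances"));

    sc.close();

One note on the design: reduceByKey combines values per key on each partition before shuffling, so it is generally preferred over grouping all values for a key and counting them afterwards.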