Parallel Programming With Spark
Matei Zaharia, UC Berkeley
www.spark-project.org
What is Spark?
Fast and expressive cluster computing system compatible with Apache Hadoop
- Works with any Hadoop-supported storage system and data format (HDFS, S3, SequenceFile, ...)
Improves efficiency through:
- In-memory computing primitives
- General computation graphs
As much as 30x faster
Improves usability through:
- Rich Scala and Java APIs
- Interactive shell
Often 2-10x less code
How to Run It
- Local multicore: just a library in your program
- EC2: scripts for launching a Spark cluster
- Private cluster: Mesos, YARN*, standalone*
*Coming soon in Spark 0.6
Scala vs Java APIs
- Spark originally written in Scala, which allows concise function syntax and interactive use
- Recently added Java API for standalone apps (dev branch on GitHub)
- Interactive shell still in Scala
- This course: mostly Scala, with translations to Java
Outline
- Introduction to Scala & functional programming
- Spark concepts
- Tour of Spark operations
- Job execution
About Scala
High-level language for the Java VM
- Object-oriented + functional programming
Statically typed
- Comparable in speed to Java
- But often no need to write types due to type inference
Interoperates with Java
- Can use any Java class, inherit from it, etc.; can also call Scala code from Java
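For example, a quick sketch (not from the original slides) of using plain JDK classes directly from Scala:

// java.util.Random and java.util.ArrayList are ordinary Java classes,
// instantiated and called from Scala with no wrapper code
val rng = new java.util.Random(42)
val nums = new java.util.ArrayList[Int]()
nums.add(rng.nextInt(100))
println(nums)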
Best Way to Learn Scala
- Interactive shell: just type scala
- Supports importing libraries, tab completion, and all constructs in the language
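For instance, a shell session looks roughly like this (exact output formatting varies by Scala version):

scala> import scala.math._
import scala.math._
scala> sqrt(2)
res0: Double = 1.4142135623730951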
Quick Tour
Declaring variables:
var x: Int = 7
var x = 7                 // type inferred
val y = "hi"              // read-only
Java equivalent:
int x = 7;
final String y = "hi";

Functions:
def square(x: Int): Int = x*x
def square(x: Int): Int = {
  x*x                     // last expression in block is returned
}
def announce(text: String) {
  println(text)
}
Java equivalent:
int square(int x) {
  return x*x;
}
void announce(String text) {
  System.out.println(text);
}
Quick Tour
Generic types:
var arr = new Array[Int](8)
var lst = List(1, 2, 3)   // factory method; type of lst is List[Int]
Java equivalent:
int[] arr = new int[8];
List<Integer> lst = new ArrayList<Integer>();   // can't hold primitive types
lst.add(...)

Indexing:
arr(5) = 7
println(lst(5))
Java equivalent:
arr[5] = 7;
System.out.println(lst.get(5));
Quick Tour
Processing collections with functional programming:
val list = List(1, 2, 3)
list.foreach(x => println(x))   // prints 1, 2, 3; x => println(x) is a function expression (closure)
list.foreach(println)           // same
list.map(x => x + 2)            // => List(3, 4, 5)
list.map(_ + 2)                 // same, with placeholder notation
list.filter(x => x % 2 == 1)    // => List(1, 3)
list.filter(_ % 2 == 1)         // => List(1, 3)
list.reduce((x, y) => x + y)    // => 6
list.reduce(_ + _)              // => 6
All of these leave the list unchanged (List is immutable)
Scala Closure Syntax
(x: Int) => x + 2   // full version
x => x + 2          // type inferred
_ + 2               // when each argument is used exactly once
x => {              // when body is a block of code
  val numberToAdd = 2
  x + numberToAdd
}
// If closure is too long, can always pass a function
def addTwo(x: Int): Int = x + 2
list.map(addTwo)
Scala allows defining a local function inside another function, as sketched below.
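A minimal sketch (not from the original slides), using a hypothetical helper name:

def addAllTwo(nums: List[Int]): List[Int] = {
  def addTwo(x: Int): Int = x + 2   // local function, visible only inside addAllTwo
  nums.map(addTwo)
}
// addAllTwo(List(1, 2, 3)) => List(3, 4, 5)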
Other Collection Methods
Scala collections provide many other functional methods; for example, Google for "Scala Seq"

Method on Seq[T]                        Explanation
map(f: T => U): Seq[U]                  Pass each element through f
flatMap(f: T => Seq[U]): Seq[U]         One-to-many map
filter(f: T => Boolean): Seq[T]         Keep elements passing f
exists(f: T => Boolean): Boolean        True if one element passes f
forall(f: T => Boolean): Boolean        True if all elements pass f
reduce(f: (T, T) => T): T               Merge elements using f
groupBy(f: T => K): Map[K, List[T]]     Group elements by f(element)
sortBy(f: T => K): Seq[T]               Sort elements by f(element)
...
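A quick sketch (not from the original slides) exercising a few of these on a plain List:

val nums = List(3, 1, 4, 1, 5)
nums.flatMap(x => List(x, x))   // => List(3, 3, 1, 1, 4, 4, 1, 1, 5, 5)
nums.exists(_ > 4)              // => true
nums.forall(_ > 0)              // => true
nums.groupBy(_ % 2)             // => Map(1 -> List(3, 1, 1, 5), 0 -> List(4)) (key order may vary)
nums.sortBy(x => -x)            // => List(5, 4, 3, 1, 1)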
Outline
- Introduction to Scala & functional programming
- Spark concepts
- Tour of Spark operations
- Job execution
Spark Overview
Goal: work with distributed collections as you would with local ones
Concept: resilient distributed datasets (RDDs)
- Immutable collections of objects spread across a cluster
- Built through parallel transformations (map, filter, etc.)
- Automatically rebuilt on failure
- Controllable persistence (e.g. caching in RAM) for reuse
Main Primitives
Resilient distributed datasets (RDDs)
- Immutable, partitioned collections of objects
Transformations (e.g. map, filter, groupBy, join)
- Lazy operations to build RDDs from other RDDs
Actions (e.g. count, collect, save)
- Return a result or write it to storage
Example: Log Mining
Load error messages from a log into memory, then interactively search for various patterns:

val lines = spark.textFile("hdfs://...")           // base RDD
val errors = lines.filter(_.startsWith("ERROR"))   // transformed RDD
val messages = errors.map(_.split("\t")(2))
messages.cache()

messages.filter(_.contains("foo")).count   // action
messages.filter(_.contains("bar")).count

[Diagram: the driver ships tasks to workers; each worker reads its block (Block 1-3) from storage and keeps the filtered results in its cache (Cache 1-3) for later queries]

Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB data in 5-7 sec (vs 170 sec for on-disk data)
RDD Fault Tolerance
RDDs track the series of transformations used to build them (their lineage) to recompute lost data. E.g.:

messages = textFile(...).filter(_.contains("error"))
                        .map(_.split("\t")(2))

Lineage: HadoopRDD (path = hdfs://...) -> FilteredRDD (func = _.contains(...)) -> MappedRDD (func = _.split(...))
Fault Recovery Test
[Bar chart: iteration time (s) over 10 iterations of a job; the first iteration takes 119 s, later ones ~56-59 s, and the iteration during which a failure happens rises to 81 s before times return to normal]
Behavior with Less RAM
[Bar chart: iteration time (s) vs % of working set in cache — cache disabled: ~69 s, 25%: ~58 s, 50%: ~41 s, 75%: ~30 s, fully cached: ~12 s]
How it Looks in Java
Scala:
lines.filter(_.contains("error")).count()

Java:
JavaRDD<String> lines = ...;
lines.filter(new Function<String, Boolean>() {
  Boolean call(String s) {
    return s.contains("error");
  }
}).count();

More examples in the next talk
Outline
- Introduction to Scala & functional programming
- Spark concepts
- Tour of Spark operations
- Job execution
Learning Spark
Easiest way: the Spark interpreter (spark-shell)
- Modified version of the Scala interpreter for cluster use
Runs in local mode on 1 thread by default, but can be controlled through the MASTER environment variable:

MASTER=local       ./spark-shell   # local, 1 thread
MASTER=local[2]    ./spark-shell   # local, 2 threads
MASTER=host:port   ./spark-shell   # run on Mesos
First Stop: SparkContext
- Main entry point to Spark functionality
- Created for you in spark-shell as the variable sc
- In standalone programs, you'd make your own (see later for details)
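A minimal sketch of what that looks like, assuming the Spark 0.5/0.6-era constructor taking a master URL and a job name (check the Programming Guide for the exact signature):

import spark.SparkContext
import spark.SparkContext._

object MyApp {
  def main(args: Array[String]) {
    // "local[2]" and "MyApp" are placeholders; a Mesos master URL also works
    val sc = new SparkContext("local[2]", "MyApp")
    println(sc.parallelize(1 to 10).reduce(_ + _))   // => 55
  }
}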
Creating RDDs

// Turn a Scala collection into an RDD
sc.parallelize(List(1, 2, 3))

// Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
sc.textFile("directory/*.txt")
sc.textFile("hdfs://namenode:9000/path/file")

// Use any existing Hadoop InputFormat
sc.hadoopFile(keyClass, valClass, inputFmt, conf)
Basic Transformations

val nums = sc.parallelize(List(1, 2, 3))

// Pass each element through a function
val squares = nums.map(x => x*x)   // {1, 4, 9}

// Keep elements passing a predicate
val even = squares.filter(_ % 2 == 0)   // {4}

// Map each element to zero or more others
nums.flatMap(x => 1 to x)   // => {1, 1, 2, 1, 2, 3}
// (1 to x is a Range object: the sequence of numbers 1, 2, ..., x)
Basic Actions

val nums = sc.parallelize(List(1, 2, 3))

// Retrieve RDD contents as a local collection
nums.collect()   // => Array(1, 2, 3)

// Return first K elements
nums.take(2)   // => Array(1, 2)

// Count number of elements
nums.count()   // => 3

// Merge elements with an associative function
nums.reduce(_ + _)   // => 6

// Write elements to a text file
nums.saveAsTextFile("hdfs://file.txt")
Working with Key-Value Pairs
Spark's "distributed reduce" transformations operate on RDDs of key-value pairs

Scala pair syntax:
val pair = (a, b)   // sugar for new Tuple2(a, b)

Accessing pair elements:
pair._1   // => a
pair._2   // => b
Some Key-Value Operations

val pets = sc.parallelize(
  List(("cat", 1), ("dog", 1), ("cat", 2)))

pets.reduceByKey(_ + _)   // => {(cat, 3), (dog, 1)}
pets.groupByKey()         // => {(cat, Seq(1, 2)), (dog, Seq(1))}
pets.sortByKey()          // => {(cat, 1), (cat, 2), (dog, 1)}

reduceByKey also automatically implements combiners on the map side
Example: Word Count

val lines = sc.textFile("hamlet.txt")
val counts = lines.flatMap(line => line.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)

[Diagram: "to be or" and "not to be" are split into words, mapped to pairs such as (to, 1), (be, 1), (or, 1), (not, 1), and reduced by key to (be, 2), (not, 1), (or, 1), (to, 2)]
Other Key-Value Operations

val visits = sc.parallelize(List(
  ("index.html", "1.2.3.4"),
  ("about.html", "3.4.5.6"),
  ("index.html", "1.3.3.1")))

val pageNames = sc.parallelize(List(
  ("index.html", "Home"), ("about.html", "About")))

visits.join(pageNames)
// ("index.html", ("1.2.3.4", "Home"))
// ("index.html", ("1.3.3.1", "Home"))
// ("about.html", ("3.4.5.6", "About"))

visits.cogroup(pageNames)
// ("index.html", (Seq("1.2.3.4", "1.3.3.1"), Seq("Home")))
// ("about.html", (Seq("3.4.5.6"), Seq("About")))
Controlling the Number of Reduce Tasks
All the pair RDD operations take an optional second parameter for the number of tasks:

words.reduceByKey(_ + _, 5)
words.groupByKey(5)
visits.join(pageViews, 5)

Can also set the spark.default.parallelism property (sketched below)
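A minimal sketch of setting that property, assuming the Spark 0.5/0.6 convention of configuring Spark through Java system properties before the SparkContext is created:

// Assumption: configuration is read from Java system properties in this era,
// so the default parallelism must be set before the context exists
System.setProperty("spark.default.parallelism", "10")
val sc = new spark.SparkContext("local[4]", "ParallelismDemo")   // placeholder names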
Using Local Variables
Any external variables you use in a closure will automatically be shipped to the cluster:

val query = Console.readLine()
pages.filter(_.contains(query)).count()

Some caveats:
- Each task gets a new copy (updates aren't sent back)
- Variable must be Serializable
- Don't use fields of an outer object (ships all of it!)
Closure Mishap Example

class MyCoolRddApp {
  val param = 3.14
  val log = new Log(...)
  ...
  def work(rdd: RDD[Int]) {
    rdd.map(x => x + param)   // NotSerializableException: MyCoolRddApp (or Log)
       .reduce(...)
  }
}

How to get around it:

class MyCoolRddApp {
  ...
  def work(rdd: RDD[Int]) {
    val param_ = param        // references only the local variable instead of this.param
    rdd.map(x => x + param_)
       .reduce(...)
  }
}
Other RDD Operations
- sample(): deterministically sample a subset (quick sketch below)
- union(): merge two RDDs
- cartesian(): cross product
- pipe(): pass through external program

See the Programming Guide for more: www.spark-project.org/documentation.html
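A brief sketch (not from the original slides) of the first three on a small RDD, assuming the 0.5/0.6-era sample(withReplacement, fraction, seed) signature:

val nums = sc.parallelize(1 to 10)
val letters = sc.parallelize(List("a", "b"))
nums.sample(false, 0.5, 42).collect()       // roughly half the elements, without replacement
nums.union(letters.map(_.length)).count()   // => 12
nums.cartesian(letters).count()             // => 20 pairs such as (1, "a")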
Outline
- Introduction to Scala & functional programming
- Spark concepts
- Tour of Spark operations
- Job execution
Software Components
- Spark runs as a library in your program (1 instance per app)
- Runs tasks locally or on Mesos
  - dev branch also supports YARN, standalone deployment
- Accesses storage systems via the Hadoop InputFormat API
  - Can use HBase, HDFS, S3, ...

[Diagram: your application creates a SparkContext, which schedules work through a Mesos master (or local threads); Spark workers on each slave run the tasks and read HDFS or other storage]
Task Scheduler
- Runs general task graphs
- Pipelines functions where possible
- Cache-aware data reuse & locality
- Partitioning-aware to avoid shuffles

[Diagram: an example job DAG over RDDs A-F, cut into three stages around a groupBy and a join, with map and filter pipelined within a stage; shaded boxes mark already-cached partitions]
Data Storage
Cached RDDs normally stored as Java objects
- Fastest access on the JVM, but can be larger than ideal
Can also store in serialized format
- Spark 0.5: spark.cache.class=spark.SerializingCache
Default serialization library is Java serialization
- Very slow for large data!
- Can customize through spark.serializer (see later; a sketch follows)
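A minimal sketch of what that configuration might look like, assuming the 0.5/0.6-era system-property convention and the spark.KryoSerializer class name (treat both as assumptions and check the documentation covered later):

// Assumptions: property-based config and these exact property/class names;
// set before the SparkContext is created
System.setProperty("spark.cache.class", "spark.SerializingCache")
System.setProperty("spark.serializer", "spark.KryoSerializer")
val sc = new spark.SparkContext("local", "SerializationDemo")   // placeholder names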
How to Get Started
git clone git://github.com/mesos/spark
cd spark
sbt/sbt compile
./spark-shell
More Information
Scala resources:
- www.artima.com/scalazine/articles/steps.html (First Steps to Scala)
- www.artima.com/pins1ed (free book)
Spark documentation: www.spark-project.org/documentation.html