What is Spark?
Fast, expressive cluster computing system compatible with Apache Hadoop
- Works with any Hadoop-supported storage system (HDFS, S3, Avro, …)
Improves efficiency through:
- In-memory computing primitives (up to 100× faster)
- General computation graphs
Improves usability through:
- Rich APIs in Java, Scala, Python (often 2-10× less code)
- Interactive shell
How to Run It
Local multicore: just a library in your program
EC2: scripts for launching a Spark cluster
Private cluster: Mesos, YARN, Standalone Mode
Languages
APIs in Java, Scala and Python
Interactive shells in Scala and Python
Outline
Introduction to Spark
Tour of Spark operations
Job execution
Standalone programs
Deployment options
Key Idea
Work with distributed collections as you would with local ones
Concept: resilient distributed datasets (RDDs)
- Immutable collections of objects spread across a cluster
- Built through parallel transformations (map, filter, etc)
- Automatically rebuilt on failure
- Controllable persistence (e.g. caching in RAM)
Operations
Transformations (e.g. map, filter, groupBy, join)
- Lazy operations to build RDDs from other RDDs
Actions (e.g. count, collect, save)
- Return a result or write it to storage
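For example, in the Python shell (where the SparkContext variable sc already exists), a minimal sketch of this laziness:

nums = sc.parallelize([1, 2, 3, 4])
squares = nums.map(lambda x: x * x)           # transformation: nothing runs yet
evens = squares.filter(lambda x: x % 2 == 0)  # transformation: still just a recipe
evens.collect()                               # action: tasks actually run; => [4, 16]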
Example: Mining Console Logs
Load error messages from a log into memory, then interactively search for patterns
lines = sc.textFile("hdfs://...")                        # Base RDD
errors = lines.filter(lambda s: s.startswith("ERROR"))   # Transformed RDD
messages = errors.map(lambda s: s.split('\t')[2])
messages.cache()

messages.filter(lambda s: "foo" in s).count()            # Action
messages.filter(lambda s: "bar" in s).count()
. . .

[Diagram: the driver sends tasks to workers; each worker reads its block of the log (Block 1-3), builds and caches its partition of messages (Cache 1-3), and returns results to the driver.]
Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB data in 5-7 sec (vs 170 sec for on-disk data)
RDD Fault Tolerance
RDDs track the transformations used to build them (their lineage) to recompute lost data
E.g:
messages = textFile(...).filter(lambda s: s.startswith("ERROR"))
                        .map(lambda s: s.split('\t')[2])

Lineage: HadoopRDD (path = hdfs://…) → FilteredRDD (func = contains(...)) → MappedRDD (func = split(…))
Fault Recovery Test
[Chart: iteration time (s) for iterations 1-10: the first iteration takes 119 s, normal iterations take 56-59 s, and the iteration during which a failure happens takes 81 s while lost partitions are recomputed.]
Behavior with Less RAM
[Chart: iteration time (s) vs. % of working set in cache: cache disabled 69 s, 25% cached 58 s, 50% 41 s, 75% 30 s, fully cached 12 s.]
Spark in Java and Scala
Java API:

JavaRDD<String> lines = sc.textFile(...);
errors = lines.filter(
  new Function<String, Boolean>() {
    public Boolean call(String s) {
      return s.contains("ERROR");
    }
  });
errors.count()

Scala API:

val lines = sc.textFile(...)
errors = lines.filter(s => s.contains("ERROR"))
// can also write filter(_.contains("ERROR"))
errors.count
Which Language Should I Use?
Standalone programs can be written in any of the three, but the interactive console is only available in Python & Scala
Python developers: can stay with Python for both
Java developers: consider using Scala for console (to learn the API)
Performance: Java / Scala will be faster (statically typed), but Python can do well for
numerical work with NumPy
Scala Cheat Sheet
Variables:

var x: Int = 7
var x = 7       // type inferred
val y = "hi"    // read-only

Functions:

def square(x: Int): Int = x*x

def square(x: Int): Int = {
  x*x   // last line returned
}

Collections and closures:

val nums = Array(1, 2, 3)

nums.map((x: Int) => x + 2)    // => Array(3, 4, 5)
nums.map(x => x + 2)           // => same
nums.map(_ + 2)                // => same

nums.reduce((x, y) => x + y)   // => 6
nums.reduce(_ + _)             // => 6

Java interop:

import java.net.URL
new URL("http://...").openStream()

More details: scala-lang.org
Outline
Introduction to Spark
Tour of Spark operations
Job execution
Standalone programs
Deployment options
Learning Spark
Easiest way: Spark interpreter (spark-shell or pyspark)
- Special Scala and Python consoles for cluster use
Runs in local mode on 1 thread by default, but you can control this with the MASTER environment variable:
MASTER=local ./spark-shell # local, 1 thread
MASTER=local[2] ./spark-shell # local, 2 threads
MASTER=spark://host:port ./spark-shell # Spark standalone cluster
First Stop: SparkContext
Main entry point to Spark functionality
Created for you in Spark shells as variable sc
In standalone programs, you’d make your own (see later for details)
Creating RDDs
# Turn a local collection into an RDD
sc.parallelize([1, 2, 3])

# Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
sc.textFile("directory/*.txt")
sc.textFile("hdfs://namenode:9000/path/file")

# Use any existing Hadoop InputFormat
sc.hadoopFile(keyClass, valClass, inputFmt, conf)
Basic Transformations
nums = sc.parallelize([1, 2, 3])

# Pass each element through a function
squares = nums.map(lambda x: x*x)             # => {1, 4, 9}

# Keep elements passing a predicate
even = squares.filter(lambda x: x % 2 == 0)   # => {4}

# Map each element to zero or more others
nums.flatMap(lambda x: range(0, x))           # => {0, 0, 1, 0, 1, 2}
# (range(0, x) is the sequence of numbers 0, 1, …, x-1)
Basic Actions
nums = sc.parallelize([1, 2, 3])

# Retrieve RDD contents as a local collection
nums.collect()    # => [1, 2, 3]

# Return first K elements
nums.take(2)      # => [1, 2]

# Count number of elements
nums.count()      # => 3

# Merge elements with an associative function
nums.reduce(lambda x, y: x + y)   # => 6

# Write elements to a text file
nums.saveAsTextFile("hdfs://file.txt")
Working with Key-Value Pairs
Spark’s “distributed reduce” transformations act on RDDs of key-value pairs
Python: pair = (a, b)
pair[0] # => a
pair[1] # => b
Scala: val pair = (a, b)
pair._1 // => a
pair._2 // => b
Java: Tuple2 pair = new Tuple2(a, b); // class scala.Tuple2
pair._1 // => a
pair._2 // => b
Some Key-Value Operations
pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

pets.reduceByKey(lambda x, y: x + y)
# => {(cat, 3), (dog, 1)}

pets.groupByKey()
# => {(cat, Seq(1, 2)), (dog, Seq(1))}

pets.sortByKey()
# => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey also automatically implements combiners on the map side
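As a rough sketch of why that matters (reusing the pets RDD above; not the only way to write it):

sums = pets.reduceByKey(lambda x, y: x + y)          # partial sums are combined map-side,
                                                     # so less data crosses the network
grouped = pets.groupByKey() \
              .map(lambda kv: (kv[0], sum(kv[1])))   # ships every value, then sums
# Both yield {(cat, 3), (dog, 1)}, but reduceByKey combines before the shuffle.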
Example: Word Count
lines = sc.textFile("hamlet.txt")

counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)
[Dataflow: "to be or" → "to", "be", "or" → (to, 1), (be, 1), (or, 1);
           "not to be" → "not", "to", "be" → (not, 1), (to, 1), (be, 1);
           after reduceByKey → (be, 2), (not, 1), (or, 1), (to, 2)]
Multiple Datasets
visits = sc.parallelize([("index.html", "1.2.3.4"),
                         ("about.html", "3.4.5.6"),
                         ("index.html", "1.3.3.1")])

pageNames = sc.parallelize([("index.html", "Home"), ("about.html", "About")])

visits.join(pageNames)
# ("index.html", ("1.2.3.4", "Home"))
# ("index.html", ("1.3.3.1", "Home"))
# ("about.html", ("3.4.5.6", "About"))

visits.cogroup(pageNames)
# ("index.html", (Seq("1.2.3.4", "1.3.3.1"), Seq("Home")))
# ("about.html", (Seq("3.4.5.6"), Seq("About")))
Controlling the Level of Parallelism
All the pair RDD operations take an optional second parameter for number of tasks
words.reduceByKey(lambda x, y: x + y, 5)
words.groupByKey(5)
visits.join(pageViews, 5)
Using Local Variables
External variables you use in a closure will automatically be shipped to the cluster:
query = raw_input("Enter a query:")
pages.filter(lambda x: x.startswith(query)).count()
Some caveats:
- Each task gets a new copy (updates aren’t sent back)
- Variable must be Serializable (Java/Scala) or Pickle-able (Python)
- Don’t use fields of an outer object (ships all of it!)
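A quick sketch of the first caveat (assuming a working sc; the counter here is purely illustrative):

counter = 0

def incr(x):
    global counter
    counter += 1          # mutates the copy of counter shipped with the task
    return x

sc.parallelize([1, 2, 3]).map(incr).count()
print(counter)            # still 0 on the driver: the workers' updates never come back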
Closure Mishap Example
class MyCoolRddApp {
  val param = 3.14
  val log = new Log(...)
  ...
  def work(rdd: RDD[Int]) {
    rdd.map(x => x + param)
       .reduce(...)
  }
}
// Throws NotSerializableException: MyCoolRddApp (or Log)

How to get around it:

class MyCoolRddApp {
  ...
  def work(rdd: RDD[Int]) {
    val param_ = param
    rdd.map(x => x + param_)   // references only the local variable
       .reduce(...)            // instead of this.param
  }
}
More Details
Spark supports lots of other operations!
Full programming guide: spark-project.org/documentation
Outline
Introduction to Spark
Tour of Spark operations
Job execution
Standalone programs
Deployment options
Software Components
Spark runs as a library in your program (one instance per app)

Runs tasks locally or on a cluster
- Standalone deploy cluster, Mesos or YARN

Accesses storage via Hadoop InputFormat API
- Can use HBase, HDFS, S3, …

[Diagram: your application creates a SparkContext, which submits tasks to a cluster manager (or to local threads); each worker node runs a Spark executor, and executors read from HDFS or other storage.]
Task Scheduler
Supports general task graphs

Pipelines functions where possible

Cache-aware data reuse & locality

Partitioning-aware to avoid shuffles

[Diagram: an example job's RDDs (A-F) split into Stage 1 (ending in a groupBy), Stage 2 (map and filter feeding a join), and Stage 3 (after the join); already-cached partitions are marked.]
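A small sketch of how this plays out (toy data; sc assumed from the shell):

pairs = sc.parallelize(["a", "b", "a", "c"]) \
          .map(lambda w: (w, 1)) \
          .filter(lambda kv: kv[0] != "c")     # map + filter are narrow: pipelined in one stage
grouped = pairs.groupByKey()                   # groupByKey needs a shuffle: new stage boundary
grouped.collect()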
Hadoop Compatibility
Spark can read/write to any storage system / format that has a plugin for Hadoop!
- Examples: HDFS, S3, HBase, Cassandra, Avro, SequenceFile
- Reuses Hadoop’s InputFormat and OutputFormat APIs
APIs like SparkContext.textFile support filesystems, while SparkContext.hadoopRDD allows passing any Hadoop JobConf to configure an input source
Outline
Introduction to Spark
Tour of Spark operations
Job execution
Standalone programs
Deployment options
Build Spark
Requires Java 6+, Scala 2.9.2
git clone git://github.com/mesos/spark
cd spark
sbt/sbt package
# Optional: publish to local Maven cache
sbt/sbt publish-local
Add Spark to Your Project
Scala and Java: add a Maven dependency on
groupId: org.spark-project
artifactId: spark-core_2.9.1
version: 0.7.0-SNAPSHOT
Python: run program with our pyspark script
Create a SparkContext
Scala:

import spark.SparkContext
import spark.SparkContext._

val sc = new SparkContext("masterUrl", "name", "sparkHome", Seq("app.jar"))
// args: cluster URL (or local / local[N]), app name,
//       Spark install path on cluster, list of JARs with app code (to ship)

Java:

import spark.api.java.JavaSparkContext;

JavaSparkContext sc = new JavaSparkContext(
  "masterUrl", "name", "sparkHome", new String[] {"app.jar"});

Python:

from pyspark import SparkContext

sc = SparkContext("masterUrl", "name", "sparkHome", ["library.zip"])
Complete App: Scala
import spark.SparkContext
import spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "WordCount", args(0), Seq(args(1)))
    val lines = sc.textFile(args(2))
    lines.flatMap(_.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)
         .saveAsTextFile(args(3))
  }
}
Complete App: Python
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "WordCount", sys.argv[0], None)
    lines = sc.textFile(sys.argv[1])
    lines.flatMap(lambda s: s.split(" ")) \
         .map(lambda word: (word, 1)) \
         .reduceByKey(lambda x, y: x + y) \
         .saveAsTextFile(sys.argv[2])
Example: PageRank
Why PageRank?
Good example of a more complex algorithm
- Multiple stages of map & reduce
Benefits from Spark’s in-memory caching
- Multiple iterations over the same data
Basic Idea
Give pages ranks (scores) based on links to them
- Links from many pages → high rank
- Link from a high-rank page → high rank
Algorithm
1. Start each page at a rank of 1
2. On each iteration, have page p contribute rank_p / |neighbors_p| to its neighbors
3. Set each page's rank to 0.15 + 0.85 × contribs
[Animation: on an example graph of four pages, all ranks start at 1.0; after the first iteration the ranks are 1.85, 1.0, 0.58 and 0.58, after a later iteration 1.72, 1.31, 0.58 and 0.39, and the final state is 1.44, 1.37, 0.73 and 0.46.]
Scala Implementation
val links = // RDD of (url, neighbors) pairs
var ranks = // RDD of (url, rank) pairs

for (i <- 1 to ITERATIONS) {
  val contribs = links.join(ranks).flatMap {
    case (url, (links, rank)) =>
      links.map(dest => (dest, rank/links.size))
  }
  ranks = contribs.reduceByKey(_ + _)
                  .mapValues(0.15 + 0.85 * _)
}
ranks.saveAsTextFile(...)
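For comparison, a rough PySpark version of the same loop (toy graph; the data and iteration count here are illustrative, and sc is assumed from the shell):

links = sc.parallelize([("a", ["b", "c"]), ("b", ["a"]), ("c", ["a", "b"])]).cache()
ranks = links.mapValues(lambda neighbors: 1.0)

for i in range(10):   # number of iterations
    contribs = links.join(ranks).flatMap(
        lambda kv: [(dest, kv[1][1] / len(kv[1][0])) for dest in kv[1][0]])
    ranks = contribs.reduceByKey(lambda x, y: x + y) \
                    .mapValues(lambda s: 0.15 + 0.85 * s)

print(ranks.collect())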
PageRank Performance
[Chart: PageRank iteration time (s) vs. number of machines: Hadoop 171 s on 30 machines and 80 s on 60; Spark 23 s on 30 machines and 14 s on 60.]
Other Iterative Algorithms
[Chart: time per iteration (s): K-Means Clustering 155 s on Hadoop vs. 4.1 s on Spark; Logistic Regression 110 s on Hadoop vs. 0.96 s on Spark.]
Outline
Introduction to Spark
Tour of Spark operations
Job execution
Standalone programs
Deployment options
Local Mode
Just pass local or local[k] as master URL
Still serializes tasks to catch marshaling errors
Debug using local debuggers
- For Java and Scala, just run your main program in a debugger
- For Python, use an attachable debugger (e.g. PyDev, winpdb)
Great for unit testing
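For instance, a minimal unit-test sketch in local mode (class and test names here are made up for illustration):

import unittest
from pyspark import SparkContext

class SimpleSparkTest(unittest.TestCase):
    def setUp(self):
        self.sc = SparkContext("local", "test")   # one local thread, no cluster needed

    def tearDown(self):
        self.sc.stop()

    def test_sum(self):
        total = self.sc.parallelize([1, 2, 3]).reduce(lambda x, y: x + y)
        self.assertEqual(total, 6)

if __name__ == "__main__":
    unittest.main()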
Private Cluster
Can run with one of:
- Standalone deploy mode (similar to Hadoop cluster scripts)
- Apache Mesos: spark-project.org/docs/latest/running-on-mesos.html
- Hadoop YARN: spark-project.org/docs/0.6.0/running-on-yarn.html
Basically requires configuring a list of workers, running launch scripts, and passing a
special cluster URL to SparkContext
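For example, once the cluster is up, pointing an application at it is just a different master URL (the host, port and app name below are placeholders):

from pyspark import SparkContext

sc = SparkContext("spark://masterhost:7077", "MyApp")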
Amazon EC2
Easiest way to launch a Spark cluster
git clone git://github.com/mesos/spark.git
cd spark/ec2
./spark-ec2 -k keypair -i id_rsa.pem -s slaves \
[launch|stop|start|destroy] clusterName
Details: spark-project.org/docs/latest/ec2-scripts.html
New: run Spark on Elastic MapReduce – tinyurl.com/spark-emr
Viewing Logs
Click through the web UI at master:8080
Or, look at the stdout and stderr files in the Spark or Mesos “work” directory for your app:
work/<ApplicationID>/<ExecutorID>/stdout
Application ID (Framework ID in Mesos) is printed when Spark connects
Community
Join the Spark Users mailing list:
groups.google.com/group/spark-users
Come to the Bay Area meetup:
www.meetup.com/spark-users
Conclusion
Spark offers a rich API to make data analytics fast: both fast to write and fast to run
Achieves 100x speedups in real applications
Growing community with 14 companies contributing
Details, tutorials, videos: www.spark-project.org