Map Reduce Architecture
Adapted from Lectures by Anand Rajaraman (Stanford Univ.) and Dan Weld (Univ. of Washington)
Prasad
L06MapReduce
Single-node architecture
CPU
Machine Learning, Statistics
Memory Classical Data Mining Disk
Prasad
L06MapReduce
Commodity Clusters
Web data sets can be very large
Tens to hundreds of terabytes
Cannot mine on a single server (why?) Standard architecture emerging:
Cluster of commodity Linux nodes Gigabit ethernet interconnect
How to organize computations on this architecture?
Mask issues such as hardware failure
Prasad L06MapReduce 3
Cluster Architecture
2-10 Gbps backbone between racks 1 Gbps between any pair of nodes in a rack Switch Switch
Switch
CPU
Mem Disk
CPU
CPU
Mem Disk
CPU
Mem Disk
Mem Disk
Each rack contains 16-64 nodes
Prasad L06MapReduce 4
Stable storage
First order problem: if nodes can fail, how can we store data persistently? Answer: Distributed File System
Provides global file namespace Google GFS; Hadoop HDFS; Kosmix KFS
Typical usage pattern
Huge files (100s of GB to TB) Data is rarely updated in place Reads and appends are common
Prasad L06MapReduce 5
Distributed File System
Chunk Servers
File is split into contiguous chunks Typically each chunk is 16-64MB Each chunk replicated (usually 2x or 3x) Try to keep replicas in different racks
Master node
a.k.a. Name Nodes in HDFS Stores metadata Might be replicated
Client library for file access
Talks to master to find chunk servers Connects directly to chunkservers to access data
Prasad L06MapReduce 6
Motivation for MapReduce (why)
Large-Scale Data Processing
Want to use 1000s of CPUs
But dont want hassle of managing things
MapReduce Architecture provides
Automatic parallelization & distribution Fault tolerance I/O scheduling Monitoring & status updates
L06MapReduce 7
Prasad
What is Map/Reduce
Map/Reduce
Programming model from LISP (and other functional languages)
Many problems can be phrased this way Easy to distribute across nodes Nice retry/failure semantics
Prasad L06MapReduce 8
Map in LISP (Scheme)
(map f list [list2 list3 ])
(map square (1 2 3 4))
(1 4 9 16)
Prasad
L06MapReduce
Reduce in LISP (Scheme)
(reduce f id list) (reduce + 0 (1 4 9 16)) (+ 16 (+ 9 (+ 4 (+ 1 0)) ) ) 30
(reduce + 0 (map square (map l1 l2))))
Prasad L06MapReduce 10
Warm up: Word Count
We have a large file of words, one word to a line Count the number of times each distinct word appears in the file Sample application: analyze web server logs to find popular URLs
Prasad
L06MapReduce
11
Word Count (2)
Case 1: Entire file fits in memory Case 2: File too large for mem, but all <word, count> pairs fit in mem Case 3: File on disk, too many distinct words to fit in memory sort datafile | uniq c
Prasad
L06MapReduce
12
Word Count (3)
To make it slightly harder, suppose we have a large corpus of documents Count the number of times each distinct word occurs in the corpus
words(docs/*) | sort | uniq -c where words takes a file and outputs the words in it, one to a line
The above captures the essence of MapReduce
Great thing is it is naturally parallelizable
Prasad L06MapReduce 13
MapReduce
Input: a set of key/value pairs User supplies two functions:
map(k,v) list(k1,v1) reduce(k1, list(v1)) v2
(k1,v1) is an intermediate key/value pair Output is the set of (k1,v2) pairs
Prasad
L06MapReduce
14
Word Count using MapReduce
map(key, value): // key: document name; value: text of document for each word w in value: emit(w, 1)
reduce(key, values): // key: a word; values: an iterator over counts result = 0 for each count v in values: result += v emit(key,result)
Prasad L06MapReduce 15
Count, Illustrated
map(key=url, val=contents):
For each word w in contents, emit (w, 1)
reduce(key=word, values=uniq_counts):
Sum all 1s in values list Emit result (word, sum)
see bob run see spot throw
see bob run see spot throw
L06MapReduce
1 1 1 1 1 1
bob run see spot throw
1 1 2 1 1
Prasad
16
Model is Widely Applicable
MapReduce Programs In Google Source Tree
Example uses:
distributed grep term-vector / host document clustering
Prasad
distributed sort web access log stats machine learning
L06MapReduce
web link-graph reversal inverted index construction statistical machine translation
17
...
...
...
Implementation Overview Typical cluster:
100s/1000s of 2-CPU x86 machines, 2-4 GB of memory Limited bisection bandwidth Storage is on local IDE disks GFS: distributed file system manages data (SOSP'03) Job scheduling system: jobs made up of tasks, scheduler assigns tasks to machines Implementation is a C++ library linked into user programs
Prasad L06MapReduce 18
Distributed Execution Overview
User Program fork assign map Input Data Split 0 read Split 1 Split 2 Worker Worker Worker
Prasad
fork
Master
fork
assign reduce Worker write Output File 0
local write
Worker remote read, sort
Output File 1
L06MapReduce
19
Data flow
Input, final output are stored on a distributed file system
Scheduler tries to schedule map tasks close to physical storage location of input data
Intermediate results are stored on local FS of map and reduce workers Output is often input to another map reduce task
Prasad L06MapReduce 20
Coordination
Master data structures
Task status: (idle, in-progress, completed) Idle tasks get scheduled as workers become available When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer Master pushes this info to reducers
Master pings workers periodically to detect failures
Prasad L06MapReduce 21
Failures
Map worker failure
Map tasks completed or in-progress at worker are reset to idle Reduce workers are notified when task is rescheduled on another worker
Reduce worker failure
Only in-progress tasks are reset to idle
Master failure
MapReduce task is aborted and client is notified
Prasad L06MapReduce 22
Execution
Prasad
L06MapReduce
23
Parallel Execution
Prasad
L06MapReduce
24
How many Map and Reduce jobs?
M map tasks, R reduce tasks Rule of thumb:
Make M and R much larger than the number of nodes in cluster One DFS chunk per map is common Improves dynamic load balancing and speeds recovery from worker failure
Usually R is smaller than M, because output is spread across R files
Prasad L06MapReduce 25
Combiners
Often a map task will produce many pairs of the form (k,v1), (k,v2), for the same key k
E.g., popular words in Word Count
Can save network time by preaggregating at mapper
combine(k1, list(v1)) v2 Usually same as reduce function
Works only if reduce function is commutative and associative
Prasad L06MapReduce 26
Partition Function
Inputs to map tasks are created by contiguous splits of input file For reduce, we need to ensure that records with the same intermediate key end up at the same worker System uses a default partition function e.g., hash(key) mod R Sometimes useful to override
E.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file
Prasad L06MapReduce 27
Execution Summary
How is this distributed?
1. Partition input key/value pairs into chunks, run map() tasks in parallel 2. After all map()s are complete, consolidate all emitted values for each unique emitted key 3. Now partition space of output map keys, and run reduce() in parallel
If map() or reduce() fails, reexecute!
Prasad L06MapReduce 28
Exercise 1: Host size
Suppose we have a large web corpus Lets look at the metadata file
Lines of the form (URL, size, date, )
For each host, find the total number of bytes
i.e., the sum of the page sizes for all URLs from that host
Prasad
L06MapReduce
29
Exercise 2: Distributed Grep
Find all occurrences of the given pattern in a very large set of files
Prasad
L06MapReduce
30
Grep
Input consists of (url+offset, single line) map(key=url+offset, val=line):
If contents matches regexp, emit (line, 1)
reduce(key=line, values=uniq_counts):
Dont do anything; just emit line
Prasad
L06MapReduce
31
Exercise 3: Graph reversal
Given a directed graph as an adjacency list: src1: dest11, dest12, src2: dest21, dest22, Construct the graph in which all the links are reversed
Prasad
L06MapReduce
32
Reverse Web-Link Graph
Map
For each URL linking to target, Output <target, source> pairs
Reduce
Concatenate list of all source URLs Outputs: <target, list (source)> pairs
Prasad
L06MapReduce
33
Exercise 4: Frequent Pairs
Given a large set of market baskets, find all frequent pairs
Remember definitions from Association Rules lectures
Prasad
L06MapReduce
34
Hadoop
An open-source implementation of Map Reduce in Java
Uses HDFS for stable storage
Download from: http://lucene.apache.org/hadoop/
Prasad
L06MapReduce
35
Reading
Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters http://labs.google.com/papers/mapreduce.html Sanjay Ghemawat, Howard Gobioff, and ShunTak Leung, The Google File System http://labs.google.com/papers/gfs.html
Prasad
L06MapReduce
36
Conclusions
MapReduce proven to be useful abstraction Greatly simplifies large-scale computations Fun to use:
focus on problem,
let library deal w/ messy details
Prasad L06MapReduce 37