Mansoura University
Faculty of Computers and Information
Department of Information System
Second Semester
[AI3502] BIG DATA ANALYTICS
Grade: 3rd AI
Dr. Amira Rezk
Map-Reduce
Much of the course will be devoted to large-scale computing for data mining
Challenges:
How to distribute computation?
Distributed/parallel programming is hard
Map-reduce addresses all of the above
Google’s computational/data manipulation model
Elegant way to work with big data
SINGLE NODE ARCHITECTURE
[Diagram: a single machine with CPU, memory, and disk; machine learning, statistics, and "classical" data mining all run on this one node]
MOTIVATION: GOOGLE EXAMPLE
20+ billion web pages x 20KB = 400+ TB
1 computer reads 30-35 MB/sec from disk
~4 months to read the web
~1,000 hard drives to store the web
Takes even more to do something useful with the data!
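A back-of-envelope check of these figures, in Python (the numbers are the slide's rough assumptions, not measurements):

pages = 20e9                      # 20+ billion web pages
bytes_per_page = 20e3             # 20 KB each
total = pages * bytes_per_page    # 4e14 bytes = 400 TB
read_rate = 35e6                  # one disk reads ~30-35 MB/sec
seconds = total / read_rate
print(total / 1e12, "TB")                 # 400.0 TB
print(seconds / (86400 * 30), "months")   # ~4.4 months just to read the data once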
Today, a standard architecture for such problems is emerging:
Cluster of commodity Linux nodes
Commodity network (ethernet) to connect them
CLUSTER ARCHITECTURE
[Diagram: two racks of nodes connected by switches; each node has a CPU, memory, and disk; ~1 Gbps between any pair of nodes in a rack, 2-10 Gbps backbone between racks]
Each rack contains 16-64 nodes
In 2011 it was estimated that Google had 1M machines, http://bit.ly/Shh0RO
LARGE-SCALE COMPUTING
Large-scale computing for data mining problems on commodity hardware
Challenges:
How do you distribute computation?
How can we make it easy to write distributed programs?
Machines fail:
One server may stay up 3 years (1,000 days)
If you have 1,000 servers, expect to lose one per day
People estimated Google had ~1M machines in 2011
1,000 machines fail every day!
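A quick sanity check of these failure rates (a sketch assuming failures are spread evenly over a 1,000-day lifetime):

mean_lifetime_days = 1000            # one server stays up ~3 years
for n_servers in (1_000, 1_000_000):
    failures_per_day = n_servers / mean_lifetime_days
    print(n_servers, "servers ->", failures_per_day, "failures/day")
# 1000 servers -> 1.0 failures/day; 1000000 servers -> 1000.0 failures/day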
IDEA AND SOLUTION
Issue: Copying data over a network takes time
Idea:
Bring computation close to the data
Store files multiple times for reliability
Map-reduce addresses these problems
Google’s computational/data manipulation model
Elegant way to work with big data
Storage Infrastructure – File system
Google: GFS. Hadoop: HDFS
Programming model: Map-Reduce
STORAGE INFRASTRUCTURE
Problem:
If nodes fail, how to store data persistently?
Answer:
Distributed File System:
Provides global file namespace
Google GFS; Hadoop HDFS;
Typical usage pattern
Huge files (100s of GB to TB)
Data is rarely updated in place
Reads and appends are common
DISTRIBUTED FILE SYSTEM
Chunk servers
File is split into contiguous chunks
Typically, each chunk is 16-64MB
Each chunk replicated (usually 2x or 3x)
Try to keep replicas in different racks
Master node
a.k.a. Name Node in Hadoop’s HDFS
Stores metadata about where files are stored
Might be replicated
Client library for file access
Talks to master to find chunk servers
Connects directly to chunk servers to access data
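A minimal sketch of that read path (hypothetical Master and ChunkServer classes, not the real GFS/HDFS API): the client asks the master only for chunk locations, then pulls the bytes directly from a chunk server.

class ChunkServer:
    def __init__(self):
        self.chunks = {}                       # chunk_id -> bytes

class Master:
    def __init__(self):
        self.files = {}                        # filename -> [(chunk_id, [replica ChunkServers])]
    def lookup(self, filename):
        return self.files[filename]            # metadata only; no file data flows through the master

def read_file(master, filename):
    data = b""
    for chunk_id, replicas in master.lookup(filename):
        data += replicas[0].chunks[chunk_id]   # bulk data comes straight from a chunk server
    return data

# Usage sketch: one file stored as two chunks on a single chunk server
cs = ChunkServer()
cs.chunks = {0: b"hello ", 1: b"world"}
m = Master()
m.files = {"doc.txt": [(0, [cs]), (1, [cs])]}
print(read_file(m, "doc.txt"))                 # b'hello world'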
DISTRIBUTED FILE SYSTEM
Reliable distributed file system
Data kept in “chunks” spread across machines
Each chunk replicated on different machines
Seamless recovery from disk or machine failure
[Diagram: chunks C0-C5 and D0-D1 replicated across Chunk server 1, Chunk server 2, Chunk server 3, ..., Chunk server N]
Bring computation directly to the data!
Chunk servers also serve as compute servers
PROGRAMMING MODEL: MapReduce
Warm-up task:
We have a huge text document
Count the number of times each distinct word appears in the file
Sample application:
Analyze web server logs to find popular URLs
TASK: WORD COUNT
Case 1:
File too large for memory, but all <word, count> pairs fit in memory
Case 2:
Count occurrences of words:
words(doc.txt) | sort | uniq -c
where words takes a file and outputs the words in it, one per line (a Python sketch of this pipeline appears below)
Case 2 captures the essence of MapReduce
Great thing is that it is naturally parallelizable
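A rough Python equivalent of this pipeline (a sketch; words here is just a whitespace tokenizer):

from collections import Counter

def words(path):
    # The 'words' step: output the words of the file, one at a time
    with open(path) as f:
        for line in f:
            yield from line.split()

# 'sort | uniq -c' amounts to grouping equal words and counting each group:
for word, count in Counter(words("doc.txt")).items():
    print(count, word)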
MapReduce: OVERVIEW
Sequentially read a lot of data
Map: Extract something you care about
Group by key: Sort and shuffle
Reduce: Aggregate, summarize, filter or transform
Write the result
Outline stays the same; Map and Reduce change to fit the problem
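The fixed outline above can be written down directly. Below is a minimal in-memory sketch (not a distributed implementation), where only map_fn and reduce_fn change from problem to problem; both are assumed to return lists of (key, value) pairs:

from collections import defaultdict

def run_mapreduce(inputs, map_fn, reduce_fn):
    # Map: apply map_fn to every input (key, value) pair
    intermediate = []
    for k, v in inputs:
        intermediate.extend(map_fn(k, v))
    # Group by key: the sort/shuffle step
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)
    # Reduce: aggregate each key's list of values, then "write" the result
    output = []
    for k2, vs in sorted(groups.items()):
        output.extend(reduce_fn(k2, vs))
    return output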
MapReduce: THE MAP STEP
[Diagram: each map call takes one input key-value pair and produces a set of intermediate key-value pairs]
MapReduce: THE REDUCE STEP
[Diagram: intermediate key-value pairs are grouped by key into key-value groups, and each reduce call turns one key and its group of values into output key-value pairs]
MORE SPECIFICALLY
Input: a set of key-value pairs
Programmer specifies two methods:
Map(k, v) → <k’, v’>*
Takes a key-value pair and outputs a set of key-value pairs
E.g., key is the filename, value is a single line in the file
There is one Map call for every (k,v) pair
Reduce(k’, <v’>*) → <k’, v’’>*
All values v’ with same key k’ are reduced together
and processed in v’ order
There is one Reduce function call per unique key k'
MapReduce: WORD COUNTING
[Diagram: word counting on a big document; the Map and Reduce steps are provided by the programmer, and the input is only read sequentially]
Big document: "The crew of the space shuttle Endeavor recently returned to Earth as ambassadors, harbingers of a new era of space exploration. Scientists at NASA are saying that the recent assembly of the Dextre bot is the first step in a long-term space-based man/machine partnership. 'The work we're doing now -- the robotics we're doing -- is what we're going to need...'"
MAP: Read input and produce a set of key-value pairs: (The, 1), (crew, 1), (of, 1), (the, 1), (space, 1), (shuttle, 1), (Endeavor, 1), (recently, 1), ...
Group by key: Collect all pairs with the same key: (crew, 1) (crew, 1); (space, 1); (the, 1) (the, 1) (the, 1); (shuttle, 1); (recently, 1); ...
Reduce: Collect all values belonging to the key and output: (crew, 2), (space, 1), (the, 3), (shuttle, 1), (recently, 1), ...
WORD COUNT USING MapReduce
map(key, value):
    // key: document name; value: text of the document
    for each word w in value:
        emit(w, 1)

reduce(key, values):
    // key: a word; values: an iterator over counts
    result = 0
    for each count v in values:
        result += v
    emit(key, result)
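As a sketch, the same logic can be run through the in-memory run_mapreduce driver shown earlier, with the framework-provided emit replaced by returning a list of pairs:

def wc_map(doc_name, text):
    return [(w, 1) for w in text.split()]          # emit(w, 1) for each word

def wc_reduce(word, counts):
    return [(word, sum(counts))]                   # emit(key, result)

docs = [("d1", "the crew of the space shuttle"), ("d2", "the space program")]
print(run_mapreduce(docs, wc_map, wc_reduce))
# [('crew', 1), ('of', 1), ('program', 1), ('shuttle', 1), ('space', 2), ('the', 3)]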
MAP-REDUCE: ENVIRONMENT
Map-Reduce environment takes care of:
Partitioning the input data
Scheduling the program’s execution across a
set of machines
Performing the group by key step
Handling machine failures
Managing required inter-machine communication
MAP-REDUCE: A DIAGRAM
[Diagram: Big document → MAP → Group by key → Reduce]
MAP: Read input and produce a set of key-value pairs
Group by key: Collect all pairs with the same key (hash merge, shuffle, sort, partition)
Reduce: Collect all values belonging to the key and output
MAP-REDUCE: IN PARALLEL
All phases are distributed with many tasks doing the work
MAP-REDUCE
Programmer specifies:
Map and Reduce and input files
Workflow:
Read inputs as a set of key-value pairs
Map transforms input kv-pairs into a new set of k'v'-pairs
Sorts & shuffles the k'v'-pairs to output nodes
All k'v'-pairs with a given k' are sent to the same reduce
Reduce processes all k'v'-pairs grouped by key into new k''v''-pairs
Write the resulting pairs to files
[Diagram: Input 0, Input 1, Input 2 → Map 0, Map 1, Map 2 → Shuffle → Reduce 0, Reduce 1 → Out 0, Out 1]
All phases are distributed with many tasks doing the work
DATA FLOW
Input and final output are stored on a distributed file system (FS):
Scheduler tries to schedule map tasks “close” to physical storage location of input data
Intermediate results are stored on local FS of Map and Reduce workers
Output is often input to another MapReduce task
COORDINATION: MASTER
Master node takes care of coordination:
Task status: (idle, in-progress, completed)
Idle tasks get scheduled as workers become available
When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer
Master pushes this info to reducers
Master pings workers periodically to detect failures
DEALING WITH FAILURES
Map worker failure
Map tasks completed or in-progress at the worker are reset to idle
Reduce workers are notified when task is rescheduled on another worker
Reduce worker failure
Only in-progress tasks are reset to idle
Reduce task is restarted
Master failure
MapReduce task is aborted, and client is notified
HOW MANY MAP AND REDUCE JOBS?
M map tasks, R reduce tasks
Rule of thumb:
Make M much larger than the number of nodes in the cluster
One DFS chunk per map is common
Improves dynamic load balancing and speeds up recovery from worker failures
Usually, R is smaller than M
Because output is spread across R files
TASK GRANULARITY & PIPELINING
Fine granularity tasks: map tasks >> machines
Minimizes time for fault recovery
Can do pipeline shuffling with map execution
Better dynamic load balancing
REFINEMENTS: BACKUP TASKS
Problem
Slow workers significantly lengthen the job completion time:
Other jobs on the machine
Bad disks
Weird things
Solution
Near end of phase, spawn backup copies of tasks
Whichever one finishes first “wins”
Effect
Dramatically shortens job completion time
REFINEMENT: COMBINERS
Often a Map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k
E.g., popular words in the word count example
Can save network time by pre-aggregating values in the mapper:
combine(k, list(v1)) → v2
Combiner is usually the same as the reduce function
Works only if the reduce function is commutative and associative
REFINEMENT: COMBINERS
Back to our word counting example:
Combiner combines the values of all keys of a single mapper (single machine):
Much less data needs to be copied and shuffled!
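A sketch of a word-count combiner (same signature and logic as the reducer; it pre-sums the 1s produced on a single mapper before the shuffle):

from collections import defaultdict

def combine(key, values):
    # Same logic as reduce: sum the partial counts for one key on one mapper
    return [(key, sum(values))]

def run_combiner(mapper_output):
    # Local pre-aggregation of a single mapper's output before the shuffle
    groups = defaultdict(list)
    for k, v in mapper_output:
        groups[k].append(v)
    out = []
    for k, vs in groups.items():
        out.extend(combine(k, vs))
    return out

print(run_combiner([("the", 1), ("crew", 1), ("the", 1), ("the", 1)]))
# [('the', 3), ('crew', 1)]  -- only two pairs cross the network instead of four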
REFINEMENT: PARTITION FUNCTION
Want to control how keys get partitioned
Inputs to map tasks are created by contiguous splits of input file
Reduce needs to ensure that records with the same intermediate key end up at the same worker
System uses a default partition function:
hash(key) mod R
Sometimes useful to override the hash function:
E.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file
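A minimal sketch of the default and custom partition functions just described (using Python's built-in hash for illustration; a real system would use a stable hash):

from urllib.parse import urlparse

R = 4  # number of reduce tasks

def default_partition(key):
    # Default: spread keys across the R reducers
    return hash(key) % R

def host_partition(url):
    # Custom: all URLs from the same host go to the same reducer/output file
    return hash(urlparse(url).hostname) % R

print(host_partition("http://example.com/a") == host_partition("http://example.com/b"))  # True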
PROBLEMS SUITED FOR MAP-REDUCE
EXAMPLE: HOST SIZE
Suppose we have a large web corpus
Look at the metadata file
Lines of the form: (URL, size, date, …)
For each host, find the total number of bytes
That is, the sum of the page sizes for all URLs from that particular host
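A sketch of Map and Reduce for this task, assuming each metadata line looks like "URL size date ..." and using urlparse to extract the host:

from urllib.parse import urlparse

def host_size_map(_, line):
    # line: "URL size date ..." from the metadata file
    url, size = line.split()[:2]
    return [(urlparse(url).hostname, int(size))]

def host_size_reduce(host, sizes):
    # Total number of bytes across all URLs of this host
    return [(host, sum(sizes))]

lines = [(None, "http://a.com/x 1000 2024-01-01"),
         (None, "http://a.com/y 2500 2024-01-02"),
         (None, "http://b.com/z 400 2024-01-03")]
# With the run_mapreduce driver sketched earlier:
# run_mapreduce(lines, host_size_map, host_size_reduce) -> [('a.com', 3500), ('b.com', 400)]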
Other examples:
Link analysis and graph processing
Machine Learning algorithms
EXAMPLE: LANGUAGE MODEL
Statistical machine translation:
Need to count number of times every 5-word sequence occurs in a large corpus of documents
Very easy with MapReduce:
Map:
Extract (5-word sequence, count) from document
Reduce:
Combine the counts
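A sketch of the Map side (the Reduce side is the same summing reducer as in word count):

def ngram_map(_, text):
    # Emit every 5-word sequence in the document with count 1
    w = text.split()
    return [(" ".join(w[i:i+5]), 1) for i in range(len(w) - 4)]

def ngram_reduce(seq, counts):
    return [(seq, sum(counts))]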
EXAMPLE: JOIN BY MAP-REDUCE
Compute the natural join R(A,B) ⋈ S(B,C)
R and S are each stored in files
Tuples are pairs (a,b) or (b,c)
R:              S:              R ⋈ S:
A   B           B   C           A   B   C
a1  b1          b2  c1          a3  b2  c1
a2  b1          b2  c2          a3  b2  c2
a3  b2          b3  c3          a4  b3  c3
a4  b3
MAP-REDUCE JOIN
Use a hash function h from B-values to 1...k
A Map process turns:
Each input tuple R(a,b) into key-value pair (b,(a,R))
Each input tuple S(b,c) into (b,(c,S))
Map processes send each key-value pair with key b to Reduce process h(b)
Hadoop does this automatically; just tell it what k is.
Each Reduce process matches all the pairs (b,(a,R)) with all (b,(c,S)) and outputs (a,b,c).
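A local sketch of this join (hash partitioning simulated in-process; tuples are tagged with their relation name as described above):

from collections import defaultdict

k = 3  # number of reduce processes

def map_join(R, S):
    # Tag each tuple with its relation and route it by h(b) = hash(b) % k
    buckets = defaultdict(list)
    for a, b in R:
        buckets[hash(b) % k].append((b, (a, "R")))
    for b, c in S:
        buckets[hash(b) % k].append((b, (c, "S")))
    return buckets

def reduce_join(pairs):
    # Within one reduce process, match R-tuples and S-tuples that share b
    r_side, s_side = defaultdict(list), defaultdict(list)
    for b, (x, tag) in pairs:
        (r_side if tag == "R" else s_side)[b].append(x)
    return [(a, b, c) for b in r_side for a in r_side[b] for c in s_side[b]]

R = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a4", "b3")]
S = [("b2", "c1"), ("b2", "c2"), ("b3", "c3")]
result = [t for bucket in map_join(R, S).values() for t in reduce_join(bucket)]
print(sorted(result))   # [('a3', 'b2', 'c1'), ('a3', 'b2', 'c2'), ('a4', 'b3', 'c3')]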
COST MEASURES FOR ALGORITHMS
In MapReduce we quantify the cost of an algorithm using
1. Communication cost = total I/O of all processes
2. Elapsed communication cost = max of I/O along any path
3. (Elapsed) computation cost analogous, but count only running time of processes
Note that here the big-O notation is not the most useful
(adding more machines is always an option)
EXAMPLE: COST MEASURES
For a map-reduce algorithm:
Communication cost = input file size + 2 × (sum of the sizes of all files passed from Map processes to Reduce processes) + the sum of the output sizes of the Reduce processes
Elapsed communication cost is the sum of the largest input + output for any map process, plus the same for any reduce process
WHAT COST MEASURES MEAN
Either the I/O (communication) or processing (computation) cost dominates
Ignore one or the other
Total cost tells what you pay in rent from your friendly neighborhood cloud
Elapsed cost is wall-clock time using parallelism
COST OF MAP-REDUCE JOIN
Total communication cost
= O(|R|+|S|+|R ⋈ S|)
Elapsed communication cost = O(s)
We’re going to pick k and the number of Map processes so that the I/O limit s is respected
We put a limit s on the amount of input or output that any one process can have. s could be:
What fits in main memory
What fits on local disk
With proper indexes, computation cost is linear in the input + output size
So computation cost is like comm. cost
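As a rough worked check, plugging the small R and S from the join example into the slide-40 cost formula (counting tuples instead of bytes):

# |R| = 4, |S| = 3, |R ⋈ S| = 3 tuples
R_size, S_size, join_size = 4, 3, 3
input_cost = R_size + S_size                 # read R and S once
intermediate = R_size + S_size               # each input tuple becomes one (b, ...) pair
communication_cost = input_cost + 2 * intermediate + join_size
print(communication_cost)                    # 24 -> O(|R| + |S| + |R ⋈ S|)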
QUESTIONS