Cloud-Enabling Technologies: MapReduce
Slides are modified from several sources. Please check the reference page at the back.
Distributed System
⚫ Any such system must deal with two tasks:
– Storage -> GFS
– Computation
⚫ How do we deal with the scalability problem?
⚫ How do we use multiple computers to do what we used to do on one?
How it all got started:
Google MapReduce (2004)
23320 citations and counting …
Key-Value Pairs
(key, value) pairs are used as the format for both data and intermediate results
MapReduce
• Mappers and Reducers are user code (provided as functions)
• They just need to obey the key-value pair interface
• Mappers:
• Consume <key, value> pairs
• Produce <key, value> pairs
• Reducers:
• Consume <key, <list of values>>
• Produce <key, value>
• Shuffling and Sorting:
• Hidden phase between mappers and reducers
• Groups all <key, value> pairs with the same key from all mappers, and passes them to a certain reducer in the form of <key, <list of values>> (see the sketch below)
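Since the shuffle phase is hidden by the framework, the following is a toy, framework-free Java sketch of the whole contract (illustrative only, not Hadoop itself): the mappers emit pairs, a grouping step plays the role of shuffle-and-sort, and the reducers consume one key group at a time.

```java
import java.util.*;

// Toy simulation of the MapReduce key-value contract.
public class MiniMapReduce {
    public static void main(String[] args) {
        List<String> records = List.of("the apple", "is an apple");

        // Map phase: each record may emit zero, one, or many <key, value> pairs.
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String record : records) {
            for (String word : record.split("\\s+")) {
                intermediate.add(Map.entry(word, 1));
            }
        }

        // Shuffle & sort: group all pairs with the same key, ordered by key.
        SortedMap<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : intermediate) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce phase: one call per <key, <list of values>> group.
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int sum = group.getValue().stream().mapToInt(Integer::intValue).sum();
            System.out.println(group.getKey() + "\t" + sum);
        }
    }
}
```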
A Brief View of MapReduce
Processing Granularity
• Mappers
• Run on a record-by-record basis
• Your code processes one record at a time and may produce zero, one, or many outputs
• Reducers
• Run on groups of records that share the same key
• Your code processes one group at a time and may produce zero, one, or many outputs
MapReduce: The Map Step
[Diagram: each input key-value pair (k, v) is fed to a map call, which produces zero or more intermediate key-value pairs.]
MapReduce: The Reduce Step
[Diagram: intermediate key-value pairs are grouped by key into key-value groups <k, <v, v, …>>; each group is fed to a reduce call, which produces output key-value pairs.]
Warm up: Word Count
⚫ We have a large file of words, one word to a line
⚫ Count the number of times each distinct word
appears in the file
⚫ Sample application: analyze web server logs to
find popular URLs
Word Count
⚫ Case 1: Entire file fits in memory
⚫ Load the file into memory and do the counting.
⚫ Case 2: File too large for memory, but all <word, count> pairs fit in memory
⚫ Create a list of <word, count> pairs in memory, and scan the file on disk in a streaming fashion (see the sketch below)
⚫ Case 3: File on disk, too many distinct words to fit in memory
⚫ Sort the file on disk (costly) and then scan the sorted file and count
– sort datafile | uniq -c
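A minimal Java sketch of Case 2 (the file name is illustrative): the file is streamed from disk line by line, while only the <word, count> map lives in memory.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Case 2: stream the file (one word per line) and keep only the counts in memory.
public class StreamingWordCount {
    public static void main(String[] args) throws IOException {
        Map<String, Long> counts = new HashMap<>();
        try (BufferedReader in = Files.newBufferedReader(Path.of("words.txt"))) {
            String word;
            while ((word = in.readLine()) != null) {
                counts.merge(word, 1L, Long::sum);  // increment the word's count
            }
        }
        counts.forEach((w, c) -> System.out.println(w + "\t" + c));
    }
}
```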
Word Count
⚫ To make it slightly harder, suppose we have a large corpus of documents
⚫ Count the number of times each distinct word occurs in the corpus
– words(docs/*) | sort | uniq -c
– where words takes a file and outputs the words in it, one to a line
⚫ The above captures the essence of MapReduce
– The great thing is that it is naturally parallelizable
Word Count
• Job: Count the occurrences of each word in a data set
[Diagram: the job is split into parallel Map tasks whose output feeds parallel Reduce tasks.]
Word Count Example
Provided by the programmer: the MAP step (reads input and produces a set of key-value pairs) and the Reduce step (collects all values belonging to a key and outputs the result). The Group-by-key step in between (collect all pairs with the same key) is done by the framework.
[Figure: a big document ("The crew of the space shuttle Endeavor recently returned to Earth as ambassadors, harbingers of a new era of space exploration. Scientists at NASA are saying that the recent assembly of the Dextre bot is the first step in a long-term space-based man/machine partnership. 'The work we're doing now -- the robotics we're doing -- is what we're going to need …'") is read sequentially. MAP emits (key, value) pairs such as (The, 1), (crew, 1), (of, 1), (the, 1), (space, 1), (shuttle, 1), (Endeavor, 1), (recently, 1), …. Group by key collects (crew, 1), (crew, 1), (space, 1), (the, 1), (the, 1), (the, 1), (shuttle, 1), (recently, 1), …. Reduce outputs (crew, 2), (space, 1), (the, 3), (shuttle, 1), (recently, 1), …. Only sequential reads of the big document are needed.]
Word Count Example
[Figure: each mapper is responsible for a range of input keys, and each reducer for a range of output keys. The input lines are (1, the apple), (2, is an apple), (3, not an orange), (4, because the), (5, orange), (6, unlike the apple), (7, is orange), (8, not green). Mapper (1-2), Mapper (3-4), Mapper (5-6), and Mapper (7-8) each process their two lines. Reducer (A-G) receives (apple, {1, 1, 1}), (an, {1, 1}), (because, 1), (green, 1) and outputs (apple, 3), (an, 2), (because, 1), (green, 1); Reducer (H-N) receives (is, {1, 1}), (not, {1, 1}) and outputs (is, 2), (not, 2); Reducer (O-U) receives (orange, {1, 1, 1}), (the, {1, 1, 1}), (unlike, 1) and outputs (orange, 3), (the, 3), (unlike, 1); Reducer (V-Z) receives nothing.]
1. Each mapper receives some of the KV-pairs as input
2. The mappers process the KV-pairs one by one
3. Each KV-pair output by the mapper is sent to the reducer that is responsible for it
4. The reducers sort their input by key and group it
5. The reducers process their input one group at a time
What it looks like in Java
• Provide an implementation of Hadoop's Mapper class containing the map function
• Provide an implementation of Hadoop's Reducer class containing the reduce function
• Provide the job configuration
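The code itself appears as a screenshot on the original slide; for reference, here is the classic WordCount example from the Hadoop MapReduce tutorial, which contains exactly the three pieces listed above.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Implementation of Hadoop's Mapper: emits (word, 1) for every token.
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Implementation of Hadoop's Reducer: sums the counts for each word.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  // Job configuration: wires the mapper, reducer, and I/O paths together.
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```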
Example 2: Inverted Index
• Search engines use an inverted index to quickly find the webpages containing a given keyword
• MapReduce program for creating an inverted index (see the sketch below):
• Map
• For each (url, doc) pair
• Emit (keyword, url) for each keyword in doc
• Reduce
• For each keyword, output (keyword, list of urls)
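A sketch of the two steps in Hadoop-style Java, assuming the input arrives as (url, document text) pairs (e.g., via KeyValueTextInputFormat); the job configuration is omitted.

```java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndex {

  // Map: for each (url, doc) pair, emit (keyword, url) per keyword in doc.
  public static class IndexMapper extends Mapper<Text, Text, Text, Text> {
    private final Text keyword = new Text();

    public void map(Text url, Text doc, Context context)
        throws IOException, InterruptedException {
      for (String token : doc.toString().split("\\s+")) {
        keyword.set(token);
        context.write(keyword, url);
      }
    }
  }

  // Reduce: for each keyword, output (keyword, list of urls).
  public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text keyword, Iterable<Text> urls, Context context)
        throws IOException, InterruptedException {
      StringBuilder list = new StringBuilder();
      for (Text url : urls) {
        if (list.length() > 0) list.append(", ");
        list.append(url.toString());
      }
      context.write(keyword, new Text(list.toString()));
    }
  }
}
```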
Exercise 1: Find the maximum temperature each year
• Given a large dataset of weather station readings, write down the Map and Reduce steps necessary to find the maximum temperature recorded for each year across all weather stations.
• The dataset contains lines with the following format: `stationID, year, month, day, max temperature (maxTemp), min temperature (minTemp)`
Exercise 2: How to process this SQL query in MapReduce?
SELECT AuthorName FROM Authors, Books WHERE
Authors.AuthorID=Books.AuthorID AND Books.Date>1980
Answer Q1:
• (Map step) For each record:
• Read the line and parse it
• Emit (year, maxTemp), where year is the key and the maximum temperature (maxTemp) is the value
• (Reduce step) For each key:
• Collect all values
• Keep only the max value (see the sketch below)
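A sketch of these steps in Hadoop-style Java (job configuration omitted; the field positions follow the line format given in the exercise).

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperature {

  // Map: parse "stationID, year, month, day, maxTemp, minTemp" and emit (year, maxTemp).
  public static class TempMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split(",");
      String year = fields[1].trim();
      int maxTemp = Integer.parseInt(fields[4].trim());
      context.write(new Text(year), new IntWritable(maxTemp));
    }
  }

  // Reduce: keep only the maximum value seen for each year.
  public static class MaxReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text year, Iterable<IntWritable> temps, Context context)
        throws IOException, InterruptedException {
      int max = Integer.MIN_VALUE;
      for (IntWritable t : temps) {
        max = Math.max(max, t.get());
      }
      context.write(year, new IntWritable(max));
    }
  }
}
```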
Answer Q2:
• For each record in the ‘Authors’ table:
• Map: Emit (AuthorID, AuthorName)
• For each record in the ‘Books’ table:
• Map: Emit (AuthorID, Date)
• Reduce:
• For each AuthorID, if Date>1980, output AuthorName (see the note and sketch below)
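One subtlety: the reducer receives AuthorNames and Dates mixed together under the same AuthorID, so in practice each mapper tags its values with the source table. Below is a sketch of the reduce side under that assumption (the "A:"/"B:" tagging convention is illustrative, not prescribed by the slides).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reduce side of the join: values arrive tagged, e.g. "A:<AuthorName>"
// from the Authors mapper and "B:<Date>" from the Books mapper.
public class JoinReducer extends Reducer<Text, Text, Text, Text> {
  public void reduce(Text authorId, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    String authorName = null;
    List<Integer> dates = new ArrayList<>();
    for (Text v : values) {
      String s = v.toString();
      if (s.startsWith("A:")) authorName = s.substring(2);
      else dates.add(Integer.parseInt(s.substring(2)));
    }
    if (authorName == null) return;  // no matching author record
    for (int date : dates) {
      if (date > 1980) {
        // One output per qualifying book, matching the SQL semantics.
        context.write(authorId, new Text(authorName));
      }
    }
  }
}
```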
Answer Q2 (Optimized)
• For each record in the ‘Authors’ table:
• Map: Emit (AuthorID, AuthorName)
• For each record in the ‘Books’ table:
• Map: If Date>1980, emit (AuthorID, Date)
• Reduce:
• For each AuthorID, output AuthorName
Hadoop
• Hadoop is an open-source implementation of Google’s MapReduce and GFS
• Clean and simple programming abstraction
• Users only provide the two functions “map” and “reduce”
• Automatic parallelization & distribution
• Hidden from the end user
• Fault tolerance and automatic recovery
• Nodes/tasks will fail and will recover automatically
Brief history
• Initially developed by Doug Cutting as a filesystem for Apache Nutch, a web search engine
• Early name: Nutch Distributed FileSystem (NDFS)
• Moved out of Nutch in 2006, when Cutting joined Yahoo!, to become an independent project called Hadoop
The origin of the name
• “Hadoop” is a made-up name, as explained by Doug Cutting:
“The name my kid gave a stuffed yellow elephant.
Short, relatively easy to spell and pronounce,
meaningless, and not used elsewhere: those are my
naming criteria. Kids are good at generating such.”
Hadoop: How it Works
Hadoop Architecture
• The Hadoop framework consists of two main layers:
• Distributed file system (HDFS)
• Execution engine (MapReduce)
[Diagram: a single main (master) node coordinating many worker nodes.]
Hadoop Distributed File System (HDFS)
One NameNode
• Maintains metadata about files:
• Maps a filename to a set of blocks
• Maps a block to the DataNodes where it resides
• Runs the replication engine for blocks
[Diagram: file F is divided into blocks 1-5 of 64 MB each.]
Many DataNodes (1000s)
• Store the actual data
• Files are divided into blocks
• Each block is replicated r times (default r = 3)
• Communicate with the NameNode through periodic heartbeats (once every 3 seconds)
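As a small illustration of the replication factor, a hedged sketch using Hadoop's Java FileSystem API (the path is made up; the cluster defaults normally come from hdfs-site.xml via dfs.replication and dfs.blocksize).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // picks up hdfs-site.xml, etc.
    try (FileSystem fs = FileSystem.get(conf)) {
      Path file = new Path("/user/demo/data.txt"); // illustrative path
      short replication = 3;                       // r = 3, the HDFS default
      fs.create(file, replication).close();        // each block gets r replicas
      System.out.println("Replication: "
          + fs.getFileStatus(file).getReplication());
    }
  }
}
```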
Data flow overview
[Diagram: a client talks to the NameNode (master) for metadata and to the DataNodes for data; a Secondary NameNode supports the NameNode; all nodes share the same ClusterId.]
Data Flow
⚫ Input and final output are stored on a distributed file system
– The scheduler tries to schedule map tasks “close” to the physical storage location of the input data
⚫ Intermediate results are stored on the local FS of map and reduce workers
⚫ Output is often the input to another MapReduce task
Distributed Execution Overview
[Diagram: the user program forks a master and many workers. The master assigns map tasks and reduce tasks. Map workers read input splits (Split 0, Split 1, Split 2) and write intermediate results to local disk; reduce workers remote-read and sort that data, then write the final output files (Output File 0, Output File 1).]
Heartbeats
• DataNodes send heartbeats to the NameNode
• Once every 3 seconds
• The NameNode uses heartbeats to detect DataNode failure
• No response for 10 minutes is considered a failure
Replication engine
• Upon detecting a DataNode failure
• Choose new DataNodes for replicas
• Balance disk usage
• Balance communication traffic to DataNodes
HDFS Erasure Coding
• New feature introduced in Hadoop 3.0
• Problem with the replication mechanism in HDFS
• Each replica adds 100% storage overhead, so the default 3-way replication results in 200% storage overhead
• Cold replicas are rarely accessed yet still consume the same storage
• Simple XOR-based coding requires only 50% storage overhead
• But it can tolerate only 1 failure
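For concreteness, the XOR scheme behind those numbers, written out: one parity cell protects two data cells (50% overhead), and any single lost cell can be rebuilt from the other two.

```latex
% XOR parity: one parity cell p for two data cells d_1, d_2.
% Overhead = 1 parity / 2 data = 50%; tolerates exactly one lost cell.
\[
p = d_1 \oplus d_2, \qquad d_1 = p \oplus d_2, \qquad d_2 = p \oplus d_1 .
\]
```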
Reed-Solomon Algorithm
• RS multiplies m data cells with a generator matrix (GT) to get an extended codeword with m data cells and n parity cells
• Data can be recovered by multiplying the inverse of the generator matrix with the extended codeword, as long as any m out of the m + n cells are available
• XOR is the special case with n = 1
• Can tolerate up to n failures
• But increases CPU load
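A concrete instance (RS(6,3), one of the built-in erasure coding policies in Hadoop 3): m = 6 data cells and n = 3 parity cells give n/m = 50% storage overhead while tolerating any 3 lost cells, versus 200% overhead and 2 tolerated losses for 3-way replication.

```latex
% RS(m=6, n=3): the generator matrix maps 6 data cells to a 9-cell
% extended codeword; any 6 surviving cells suffice to recover the data.
\[
\underbrace{G^{T}}_{9 \times 6}
\begin{pmatrix} d_1 \\ \vdots \\ d_6 \end{pmatrix}
=
\begin{pmatrix} d_1 \\ \vdots \\ d_6 \\ p_1 \\ p_2 \\ p_3 \end{pmatrix},
\qquad
\text{overhead} = \frac{n}{m} = \frac{3}{6} = 50\%.
\]
```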
NameNode failure
• The NameNode is a single point of failure
• The transaction log is stored in multiple directories:
• A directory on the local file system
• A directory on a remote file system (NFS)
• Add a Secondary NameNode
Hadoop MapReduce (Example: Color Count)
[Diagram: input blocks on HDFS feed four Map tasks; each Map produces (k, v) pairs such as (color, 1), which a parse-hash step partitions by k during shuffle & sorting; each of the three Reduce tasks consumes (k, [v]) groups such as (color, [1, 1, 1, 1, 1, 1, …]) and produces (k', v') outputs such as (color, 100).]
Users only provide the “Map” and “Reduce” functions
Hadoop MapReduce
• The Job Tracker is the master node (runs with the NameNode)
• Receives the user’s job
• Decides how many tasks will run (number of mappers)
• Decides where to run each mapper (locality matters)
[Diagram: blocks of a file replicated across Node 1, Node 2, and Node 3.]
• This file has 5 blocks → run 5 map tasks
• Where to run the task reading block “1”? Try to run it on Node 1 or Node 3, where that block is stored
Hadoop MapReduce
• The Task Tracker is the slave node (runs on each DataNode)
• Receives the task from the Job Tracker
• Runs the task until completion (either a map or a reduce task)
• Always in communication with the Job Tracker, reporting progress
[Diagram: in this example, one MapReduce job consists of 4 map tasks and 3 reduce tasks.]
Failures
⚫ Map worker failure
– Map tasks completed or in progress at the worker are reset to idle (their intermediate output on local disk is lost)
– Reduce workers are notified when a task is rescheduled on another worker
⚫ Reduce worker failure
– Only in-progress tasks are reset to idle (completed output already sits in the distributed file system)
⚫ Master failure
– The MapReduce task is aborted and the client is notified
On worker failure
• Detect failure via periodic heartbeats
• Workers send heartbeat messages (pings) periodically to the master node
• Re-execute completed and in-progress map tasks
• Re-execute in-progress reduce tasks
• Task completion is committed through the master
Reference
• Dan C. Marinescu, Cloud Computing: Theory and Practice, Second Edition, Chapter 6
• https://www.ibm.com/docs/en/cics-ts/5.4?topic=processing-acid-properties-transactions
• https://www.mongodb.com/nosql-explained/best-nosql-database
• Slides from M. Silic, Analysis of Massive Dataset, University of Zagreb