Week 7 - Lecture Notes in Cloud Computing
P2P Systems in Cloud Computing
Dr. Rajiv Misra
Associate Professor
Dept. of Computer Science & Engg.
Indian Institute of Technology Patna
[email protected]
Preface
Content of this Lecture:
• We will discuss peer-to-peer (P2P) techniques in cloud computing systems.
• We will study some of the widely-deployed P2P systems such as Napster, Gnutella, Fasttrack and BitTorrent, and P2P systems with provable properties such as Chord, Pastry and Kelips.
P2P techniques are abundant in cloud computing systems:
Key-value stores (e.g., Cassandra) use Chord-style p2p hashing
Widely-deployed P2P Systems:
1. Napster
2. Gnutella
3. Fasttrack
4. BitTorrent
P2P Systems with Provable Properties:
1. Chord
2. Pastry
3. Kelips
Napster Structure
[Diagram: napster.com servers (S) store a directory, i.e., filenames with peer pointers, e.g., <PublicEnemy.mp3, Beatles, @123.34.12.32:1003>; client machines ("peers", P) store their own files]
Napster Operations
Client uploads the list of music files that it wants to share
Server maintains a list of <filename, ip_address, portnum> tuples. Server stores no files.
Client sends keywords to search with
(Server searches its list with the keywords)
Server returns a list of hosts - <ip_address, portnum> tuples - to the client
Client pings each host in the list to find transfer rates
Client fetches file from best host
All communication uses TCP (Transmission Control Protocol)
Reliable and ordered networking protocol
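To make the directory concrete, here is a minimal Python sketch (illustrative only, not Napster's actual code) of a central server that stores <filename, ip_address, portnum> tuples and answers keyword searches:

directory = []   # list of (filename, ip_address, portnum) tuples; the server stores no files

def upload_file_list(ip_address, portnum, filenames):
    for name in filenames:
        directory.append((name, ip_address, portnum))

def search(keywords):
    # return <ip_address, portnum> tuples of hosts whose filenames match all keywords
    return [(ip, port) for (name, ip, port) in directory
            if all(k.lower() in name.lower() for k in keywords)]

upload_file_list("123.34.12.32", 1003, ["PublicEnemy.mp3"])
print(search(["publicenemy"]))   # [('123.34.12.32', 1003)]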
Napster Search
[Diagram: napster.com servers store peer pointers for all files; a client peer sends 1. Query to a server, gets 3. Response (a list of candidate peers), 4. pings the candidates, and then fetches the file from the best host]
Joining a P2P system
Send an HTTP request to a well-known URL for that P2P service.
The message is routed (after lookup in DNS = Domain Name System) to an introducer, a well-known server that keeps track of some recently joined nodes in the p2p system
The introducer initializes the new peer's neighbor table
Problems with Napster
napster.com was declared to be responsible for users' copyright violation
("Indirect infringement")
Next P2P system: Gnutella
Gnutella
Gnutella (possibly named by analogy with the GNU Project) is a large peer-to-peer network. It was the first decentralized peer-to-peer network of its kind.
[Mar 2000] released by AOL, immediately withdrawn, but 88K users by [Mar 2003]
Original design underwent several modifications
[Diagram: Gnutella peers (P) store their own files and also store "peer pointers"; peers are connected in an overlay graph (== each link is an implicit Internet path)]
Gnutella routes different messages within the overlay graph. The protocol has 5 main message types:
1. Query (search)
2. QueryHit (response to query)
3. Ping (to probe network for other peers)
4. Pong (reply to ping, contains address of another peer)
5. Push (used to initiate file transfer)
Next, into the message structure and protocol:
All fields except IP address are in little-endian format
0x12345678 is stored as 0x78 in the lowest address byte, then 0x56 in the next higher address, and so on.
Gnutella Message Header Format (23 bytes, byte offsets 0-22):
- Descriptor ID (bytes 0-15): ID of this search transaction
- Payload descriptor (byte 16): type of payload (0x00 Ping, 0x01 Pong, 0x40 Push, 0x80 Query, 0x81 Queryhit)
- TTL (byte 17): decremented at each hop; message dropped when ttl=0; ttl_initial usually 7 to 10
- Hops (byte 18): incremented at each hop
- Payload length (bytes 19-22): number of bytes of message following this header
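As an illustration of the header layout and the little-endian encoding, here is a minimal Python sketch using the standard struct module; the helper names are assumptions, not part of the Gnutella specification:

import struct

HEADER_FMT = "<16sBBBI"   # descriptor_id, payload_descriptor, ttl, hops, payload_length (little-endian)

def pack_header(descriptor_id, payload_descriptor, ttl, hops, payload_length):
    return struct.pack(HEADER_FMT, descriptor_id, payload_descriptor, ttl, hops, payload_length)

def unpack_header(data):
    return struct.unpack(HEADER_FMT, data[:23])

# Example: a Query (0x80) descriptor with TTL 7, 0 hops, and a 20-byte payload
hdr = pack_header(b"\x01" * 16, 0x80, 7, 0, 20)
assert len(hdr) == 23
print(unpack_header(hdr))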
How do I search for a particular file?
Query (0x80) payload format (Payload Format in Gnutella Query Message):
- Minimum Speed (bytes 0-1)
- Search criteria (keywords) (byte 2 onwards)
[Diagram: the Query "Who has PublicEnemy.mp3?" is flooded from the requesting peer to its overlay neighbors, with the TTL (here TTL=2) limiting how far it spreads]
QueryHit (0x81): successful result to a query. Payload format:
- Num. hits (byte 0)
- port (bytes 1-2)
- ip_address (bytes 3-6)
- speed (bytes 7-10)
- result set of (fileindex, filename, fsize) entries (bytes 11 to n): info about results
- servent_id (bytes n to n+16): unique identifier of the responder; a function of its IP address
[Diagram: peers that have PublicEnemy.mp3 send QueryHit responses, which are routed back along the reverse path of the Query]
Avoiding excessive traffic
To avoid duplicate transmissions, each peer maintains a list of recently received messages
Query forwarded to all neighbors except the peer from which it was received
Each Query (identified by DescriptorID) forwarded only once
QueryHit routed back only to the peer from which the Query with the same DescriptorID was received
Duplicates with same DescriptorID and Payload descriptor (msg type, e.g., Query) are dropped
QueryHit with a DescriptorID for which the Query was not seen is dropped
After receiving QueryHit messages
Requestor chooses “best” QueryHit responder
Initiates HTTP request directly to responder’s ip+port
GET /get/<File Index>/<File Name>/ HTTP/1.0\r\n
Connection: Keep-Alive\r\n
Range: bytes=0-\r\n
User-Agent: Gnutella\r\n
\r\n
Responder then replies with file packets after this message:
HTTP 200 OK\r\n
Server: Gnutella\r\n
Content-type: application/binary\r\n
Content-length: 1024\r\n
\r\n
After receiving QueryHit messages (2)
Why the "Range" field in the GET request?
To support partial file transfers.
What if the responder is behind a firewall that disallows incoming connections?
[Diagram: the requestor sends a Push message, routed through the overlay to the firewalled responder]
Push (0x40) payload: servent_id | fileindex | ip_address | port
(servent_id and fileindex are the same as in the received QueryHit; ip_address and port give the address at which the requestor can accept incoming connections)
Dealing with Firewalls
Responder establishes a TCP connection at ip_address, port
specified. Sends
GIV <File Index>:<Servent Identifier>/<File Name>\n\n
Requestor then sends GET to responder (as before) and the file is transferred as explained earlier
What if requestor is behind firewall too?
Gnutella gives up
Can you think of an alternative solution?
Ping-Pong
Pong (0x01) payload: Port | ip_address | Num. files shared | Num. KB shared
• Peers initiate Pings periodically
• Pings flooded out like Queries, Pongs routed along reverse path like QueryHits
• Pong replies used to update set of neighboring peers
Gnutella Summary
Queries flooded out, ttl restricted
QueryHit (replies) reverse path routed
Supports file transfer through firewalls
Periodic Ping-Pong to continuously refresh neighbor lists
List size specified by user at peer: heterogeneity means some peers may have more neighbors
Gnutella found to follow a power law distribution:
P(#links = L) ~ L^(-k)  (k is a constant)
Problems
Ping/Pong constituted 50% traffic
Solution: Multiplex, cache and reduce frequency of
pings/pongs
Repeated searches with same keywords
Solution: Cache Query, QueryHit messages
Modem-connected hosts do not have enough bandwidth for
passing Gnutella traffic
Solution: use a central server to act as proxy for such peers
Another solution:
FastTrack System
Flooding causes excessive traffic
Is there some way of maintaining meta-information
N
about peers that leads to more intelligent routing?
Structured Peer-to-peer systems
Example: Chord System
FastTrack
Underlying technology in Kazaa, KazaaLite, Grokster
Proprietary protocol, but some details available
Like Gnutella, but with some peers designated as supernodes
[Diagram: a FastTrack overlay where ordinary peers (P) connect to supernodes (S), which form the core of the network]
A supernode stores a directory listing (<filename, peer pointer>), similar to Napster servers
Any peer can become (and stay) a supernode, provided it has earned enough reputation
KazaaLite: participation level (= reputation) of a user is between 0 and 1000, initially 10, then affected by length of periods of connectivity and total number of uploads
More sophisticated reputation schemes invented, especially based on economics
A peer searches by contacting a nearby supernode
BitTorrent
[Diagram: a website links to the .torrent file; a tracker (per file) keeps track of some peers and receives heartbeats, joins and leaves from peers. A new peer (leecher) 1. gets the tracker, 2. gets a list of peers, and 3. gets file blocks from other peers - seeds (which have the full file) and leechers (which have some blocks)]
BitTorrent (2)
File split into blocks (32 KB – 256 KB)
Local Rarest First block download policy: prefer early download of blocks that are least replicated among neighbors
Exception: a new node is allowed to pick one random block first: helps in bootstrapping
Tit-for-tat bandwidth usage: provide blocks to neighbors that provided it the best download rates
Incentive for nodes to provide good download rates
Seeds do the same too
Choking: limit concurrent uploads to a small number of neighbors (e.g., 5), i.e., the "best" neighbors
Everyone else is choked
Periodically re-evaluate this set (e.g., every 10 s)
Optimistic unchoke: periodically (e.g., ~30 s), unchoke a random neighbor – helps keep the unchoked set fresh
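A minimal Python sketch of the Local Rarest First policy described above (illustrative, not the BitTorrent reference client); the helper names are assumptions:

from collections import Counter
import random

def pick_next_block(needed_blocks, neighbor_block_sets, bootstrapping=False):
    # needed_blocks: set of block ids we still need
    # neighbor_block_sets: one set of held blocks per neighbor
    available = [b for blocks in neighbor_block_sets for b in blocks if b in needed_blocks]
    if not available:
        return None
    if bootstrapping:
        # Exception: a brand-new node picks a random block to get started quickly
        return random.choice(available)
    counts = Counter(available)
    return min(counts, key=counts.get)   # rarest first: lowest replication among neighbors

# Example: block 3 is held by only one neighbor, so it is chosen
print(pick_next_block({1, 3}, [{1, 2}, {1, 3}, {1}]))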
DHT (Distributed Hash Table)
A hash table allows you to insert, lookup and delete objects with
keys
A distributed hash table allows you to do the same in a distributed setting (objects = files)
Performance Concerns:
Load balancing
Fault-tolerance
Efficiency of lookups and inserts
Locality
Napster, Gnutella, FastTrack are all DHTs (sort of)
So is Chord, a structured peer-to-peer system
Comparative performance:
System   | Memory             | Lookup latency | #Messages for a lookup
Napster  | O(1) (O(N)@server) | O(1)           | O(1)
Gnutella | O(N)               | O(N)           | O(N)
Chord    | O(log(N))          | O(log(N))      | O(log(N))
Chord
Intelligent choice of neighbors to reduce latency and message cost of routing (lookups/inserts)
Uses Consistent Hashing on the node's (peer's) address:
SHA-1(ip_address, port) → 160-bit string
Truncated to m bits
Called the peer id (a number between 0 and 2^m - 1)
Not unique, but id conflicts are very unlikely
Can then map peers to one of 2^m logical points on a circle
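A minimal sketch of this id assignment in Python, assuming the standard hashlib module; truncation is done here by taking the value mod 2^m:

import hashlib

def peer_id(ip_address, port, m=7):
    digest = hashlib.sha1(f"{ip_address}:{port}".encode()).digest()  # 160-bit string
    return int.from_bytes(digest, "big") % (2 ** m)                  # keep m bits

print(peer_id("123.34.12.32", 1003))   # some id between 0 and 2^m - 1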
[Diagram: Chord ring with m=7; peers N16, N32, N45, N80, N96 and N112 mapped to logical points on the circle, starting from 0]
Peer Pointers (1): successors
[Diagram: on the same ring, each peer maintains a pointer to its successor, the next peer clockwise (similarly predecessors)]
Peer Pointers (2): finger tables
The i-th entry at a peer with id n is the first peer with id >= (n + 2^i) (mod 2^m)
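A minimal sketch (hypothetical helper, not Chord's reference code) of computing a finger table from the rule above, on the example ring used in the following slides:

def finger_table(n, peers, m=7):
    ring = sorted(peers)
    def successor(k):
        for p in ring:
            if p >= k:
                return p
        return ring[0]            # wrap around the ring
    return [successor((n + 2 ** i) % (2 ** m)) for i in range(m)]

peers = [16, 32, 45, 80, 96, 112]
print(finger_table(80, peers))    # N80's fingers: [96, 96, 96, 96, 96, 112, 16]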
What about the files?
Filenames are also mapped using the same consistent hash function: SHA-1(filename) → 160-bit string, truncated to m bits (this is the file's key)
File cnn.com/index.html that maps to key K42 is stored at the first peer with id at or greater than 42 (mod 2^m)
Note that we are considering a different file-sharing application here: cooperative web caching
The same discussion applies to any other file-sharing application, including that of mp3 files.
Consistent Hashing => with K keys and N peers, each peer stores O(K/N) keys (i.e., < c.K/N, for some constant c)
[Diagram: with m=7, the file with key K42 is stored at N45, the first peer clockwise from 42 on the ring 0, N16, N32, N45, N80, N96, N112]
Search
At node n, send the query for key k to the largest successor/finger entry that is at or to the anti-clockwise of k (it wraps around the ring); if none exists, send the query to successor(n)
All "arrows" are RPCs (remote procedure calls)
[Diagram: with m=7, the query "Who has cnn.com/index.html?" (hashes to K42) issued at N80 is forwarded along finger/successor entries, e.g., N80 → N16 → N32 → N45, until it reaches N45, where the file cnn.com/index.html with key K42 is stored]
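A minimal sketch of the forwarding rule above (an illustrative assumption, not the Chord implementation): forward to the largest known entry at or anti-clockwise of the key, else to the immediate successor:

def ring_dist(a, b, m=7):
    return (b - a) % (2 ** m)        # clockwise distance from a to b on the ring

def next_hop(n, key, fingers, successor, m=7):
    # candidates that are at or anti-clockwise of the key (i.e., not past it clockwise)
    candidates = [f for f in fingers + [successor] if ring_dist(n, f, m) <= ring_dist(n, key, m)]
    if not candidates:
        return successor
    return max(candidates, key=lambda f: ring_dist(n, f, m))   # largest such entry

# Example on the m=7 ring: from N80 looking up K42, the first hop is N16
print(next_hop(80, 42, fingers=[96, 96, 96, 96, 96, 112, 16], successor=96))  # -> 16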
Analysis
Search takes O(log(N)) time:
(intuition): at each step, the distance between the query and the peer-with-file reduces by a factor of at least 2
(intuition): after log(N) forwardings, the distance to the key is at most 2^m / 2^(log(N)) = 2^m / N
The number of node identifiers in a range of 2^m / N is O(log(N)) with high probability (why? SHA-1! and "Balls and Bins")
So using successors in that range will be OK, using another O(log(N)) hops
Analysis (Contd.)
O(log(N)) search time holds for file insertions too
(in general for routing to any key)
"Routing" can thus be used as a building block for
• All operations: insert, lookup, delete
O(log(N)) time is true only if finger and successor entries are correct
When might these entries be wrong?
When you have failures
Search under peer failures
[Diagram: some peers on the ring have failed (marked X); the lookup "Who has cnn.com/index.html?" (hashes to K42) from N80 can fail if the finger it would use is dead]
One solution: maintain multiple (r) successor entries; if a finger/successor has failed, route via another entry
[Diagram: the query is routed around the failed peer using an alternative entry and still reaches N45, where the file cnn.com/index.html with key K42 is stored]
Search under peer failures
• Choosing r = 2log(N) suffices to maintain lookup correctness with high probability (i.e., the ring stays connected)
• Say 50% of nodes fail
• Pr(at a given node, at least one successor is alive) = 1 - (1/2)^(2 log N) = 1 - 1/N^2
• Pr(above is true at all alive nodes) = (1 - 1/N^2)^(N/2) ≈ e^(-1/(2N)) ≈ 1
Search under peer failures (2)
[Diagram: with m=7, the lookup "Who has cnn.com/index.html?" (hashes to K42) fails because N45, the peer storing the file with key K42, is dead]
Solution: replicate the file/key at some successors and predecessors
[Diagram: K42 is replicated at neighbors of N45 on the ring, so even though N45 (which stored cnn.com/index.html with key K42) is dead, the lookup "Who has cnn.com/index.html?" still succeeds]
Need to deal with dynamic changes
Peers fail
New peers join
Peers leave
P2P systems have a high rate of churn (node join, leave and failure)
• 25% per hour in Overnet (eDonkey)
• 100% per hour in Gnutella
• Lower in managed clusters
• Common feature in all distributed systems, including wide-area (e.g., PlanetLab), clusters (e.g., Emulab), clouds (e.g., AWS), etc.
New peers joining
[Diagram: with m=7, a new peer N40 joins the ring between N32 and N45; successor/finger pointers are updated by the Stabilization Protocol (followed by all nodes)]
New peers joining (2)
N40 may need to copy some files/keys from N45
(files with file id between 32 and 40)
[Diagram: with m=7, keys K34 and K38, previously stored at N45, are copied over to the new peer N40]
Number of messages per peer join = O(log(N)*log(N))
Similar set of operations for dealing with peers leaving
For dealing with failures, we also need failure detectors.
Stabilization Protocol
Chord peers periodically run a stabilization algorithm that checks and updates pointers and keys
Ensures non-loopiness of fingers, eventual success of lookups and O(log(N)) lookups with high probability
Each stabilization round at a peer involves a constant number of messages
Strong stability takes O(N^2) stabilization rounds
Churn
When nodes are constantly joining, leaving and failing:
Leads to excessive (unnecessary) key copying (remember that keys are replicated)
The stabilization algorithm may need to consume more bandwidth to keep up
The main issue is that files are replicated, while it might be sufficient to replicate only meta-information about files
Alternatives
– Introduce a level of indirection, i.e., store only pointers to files (any p2p system)
– Replicate metadata more, e.g., Kelips
Virtual Nodes
Hash can get non-uniform → bad load balancing
Treat each node as multiple virtual nodes behaving independently
Each joins the system
Reduces variance of load imbalance
Current status of the Chord project:
File systems (CFS, Ivy) built on top of Chord
DNS lookup service built on top of Chord
Internet Indirection Infrastructure (I3) project at UCB
Spawned research on many interesting issues about p2p systems
https://github.com/sit/dht/wiki
Pastry
Assigns ids to nodes, just like Chord (using a virtual ring)
Leaf Set - each node knows its successor(s) and predecessor(s)
Routing tables are based on prefix matching
Routing is thus based on prefix matching, and is thus log(N)
And hops are short (in the underlying network)
Pastry Routing
Consider a peer with id 01110100101. It maintains a neighbor peer with an id matching each of the following prefixes (* = starting bit differing from this peer's corresponding bit):
*
0*
01*
011*
… 0111010010*
When it needs to route to a peer, say 01110111001, it starts by forwarding to a neighbor with the largest matching prefix, i.e., 011101*
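A minimal sketch of prefix-based next-hop selection (illustrative; real Pastry also uses leaf sets and digit-based routing tables):

def shared_prefix_len(a, b):
    n = 0
    while n < min(len(a), len(b)) and a[n] == b[n]:
        n += 1
    return n

def next_hop(dest_id, neighbor_ids):
    # forward to the known neighbor sharing the longest id prefix with the destination
    return max(neighbor_ids, key=lambda nid: shared_prefix_len(nid, dest_id))

# Example: routing towards 01110111001 picks the neighbor matching 011101*
neighbors = ["10010001111", "00111010110", "01010011101", "01110111110"]
print(next_hop("01110111001", neighbors))   # -> 01110111110 (longest shared prefix)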
Pastry Locality
For each prefix, say 011*, among all potential neighbors
with the matching prefix, the neighbor with the shortest
round-trip-time is selected
Since shorter prefixes have many more candidates (spread out throughout the Internet), the neighbors for shorter prefixes are likely to be closer than the neighbors for longer prefixes
Thus, in the prefix routing, early hops are short and later hops are longer
Yet overall “stretch”, compared to direct Internet path,
stays short
Summary of Chord and Pastry
Black box lookup algorithms
Churn handling can get complex
O(log(N)) memory and lookup cost
• O(log(N)) lookup hops may be high
• Can we reduce the number of hops?
Kelips – a 1-hop lookup DHT
Nodes are divided into k affinity groups; each node is hashed to a group (hash mod k)
Node's neighbors:
(Almost) all other nodes in its own affinity group
One contact node per foreign affinity group
[Diagram: nodes (e.g., 18, 30, 129, 160, 167) arranged into affinity groups # 0, # 1, …, # k-1]
Kelips Files and Metadata
(File storage itself is handled outside Kelips; only file querying is done in Kelips)
Each filename is hashed to a group
All nodes in the group replicate pointer information, i.e., <filename, file location>
Spread using gossip
Affinity group does not store files
[Diagram: the <filename, file location> pointer is replicated at the nodes of the affinity group that the filename hashes to, across groups # 0, # 1, …, # k-1]
Kelips Lookup
• PublicEnemy.mp3 hashes to group # k-1
• Everyone in this group stores <PublicEnemy.mp3, who-has-file>
Lookup:
Find the file's affinity group
Go to your contact for the file affinity group
Failing that, try another of your neighbors to find a contact
• Fits in RAM of most workstations/laptops today (COTS machines)
[Diagram: the querying node contacts its contact node in affinity group # k-1, which returns the file's location]
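A minimal sketch (assumed helper names, not the Kelips implementation) of affinity-group placement and a one-hop lookup through a contact node:

import hashlib

K = 4                                        # number of affinity groups

def group_of(name):
    return int(hashlib.sha1(name.encode()).hexdigest(), 16) % K   # hash mod k

# Replicated soft state: every node in a file's group knows <filename, file location>
file_pointers = {g: {} for g in range(K)}
file_pointers[group_of("PublicEnemy.mp3")]["PublicEnemy.mp3"] = "160.0.0.18:9000"

contacts = {g: f"contact-node-of-group-{g}" for g in range(K)}    # one contact per foreign group

def lookup(filename):
    g = group_of(filename)                   # find the file's affinity group
    contact = contacts[g]                    # go to your contact for that group (1 hop)
    return contact, file_pointers[g].get(filename)

print(lookup("PublicEnemy.mp3"))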
Kelips Soft State
• PublicEnemy.mp3 hashes to group # k-1
• Everyone in this group stores <PublicEnemy.mp3, who-has-file>
Membership lists:
Gossip-based membership
Within each affinity group
And also across affinity groups
O(log(N)) dissemination time
File metadata:
Needs to be periodically refreshed from the source node
Times out
[Diagram: affinity groups # 0, # 1, …, # k-1, with membership and metadata spread by gossip among member nodes]
Chord vs. Pastry vs. Kelips
Range of tradeoffs available:
Memory vs. lookup cost vs. background bandwidth (to keep neighbors fresh)
Summary
We have discussed some of the widely-deployed P2P systems such as:
1. Napster
2. Gnutella
3. Fasttrack
4. BitTorrent
We have also discussed some of the P2P Systems with Provable Properties such as:
1. Chord
2. Pastry
3. Kelips
MapReduce
Dr. Rajiv Misra
Associate Professor
Dept. of Computer Science & Engg.
Indian Institute of Technology Patna
[email protected]
Preface
Content of this Lecture:
In this lecture, we will discuss the 'MapReduce paradigm' and its internal working and implementation overview.
We will also see many examples and different applications of MapReduce being used, and look into how the 'scheduling and fault tolerance' works inside MapReduce.
What is MapReduce?
MapReduce is a programming model and an associated implementation for processing and generating large data sets.
Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
Many real-world tasks are expressible in this model.
Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication.
This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.
N
A typical MapReduce computation processes many terabytes of
data on thousands of machines. Hundreds of MapReduce
programs have been implemented and upwards of one thousand
MapReduce jobs are executed on Google's clusters every day.
Distributed File System
Files are split into chunks/blocks, and each chunk is replicated
Try to keep replicas in different racks
Master node
Also known as the Name Node in HDFS
Stores metadata
Might be replicated
Motivation for MapReduce
Large-scale data processing wants to use thousands of CPUs
But don't want the hassle of managing things
MapReduce Architecture provides
Automatic parallelization & distribution
Fault tolerance
I/O scheduling
Monitoring & status updates
Terms are borrowed from functional languages (e.g., Lisp)
Sum of squares example:
(map square '(1 2 3 4))  → Output: (1 4 9 16)
[processes each record sequentially and independently]
(reduce + '(1 4 9 16))
= (+ 16 (+ 9 (+ 4 1) ) )  → Output: 30
[processes set of all records in batches]
Let's consider a sample application: Wordcount
You are given a huge dataset (e.g., Wikipedia dump or all of Shakespeare's works) and asked to list the count for each of the words in each of the documents therein
Map
In parallel, each Map task processes one input split and outputs <key, value> pairs:
Input <filename, file text>:
  Welcome Everyone
  Hello Everyone
Output <key, value> pairs (MAP TASK 1):
  Welcome  1
  Everyone 1
  Hello    1
  Everyone 1
(MAP TASK 2 processes the next input split in the same way, in parallel.)
Map: parallelly processes a large number of individual records to generate intermediate <key, value> pairs.
Input (file text):
  Welcome Everyone
  Hello Everyone
  Why are you here
  I am also here
  They are also here
  Yes, it's THEM!
  The same people we were thinking of
  …….
Output <key, value> pairs:
  Welcome  1
  Everyone 1
  Hello    1
  Everyone 1
  Why      1
  Are      1
  You      1
  Here     1
  …….
Reduce
Processes and merges all intermediate values associated with the same key:
Input <key, value> pairs:
  Welcome  1
  Everyone 1
  Hello    1
  Everyone 1
Output <key, value> pairs:
  Everyone 2
  Hello    1
  Welcome  1
Each key is assigned to one Reduce task, and reduce tasks run in parallel by partitioning the keys:
  Input: Welcome 1, Everyone 1, Hello 1, Everyone 1
  REDUCE TASK 1 → Everyone 2
  REDUCE TASK 2 → Hello 1, Welcome 1
• Popular: Hash partitioning, i.e., key is assigned to
  – reduce # = hash(key) % number of reduce tasks
Programming Model
The user of the MapReduce library expresses the computation as two functions: Map and Reduce.
Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key and passes them to the Reduce function.
The Reduce function, also written by the user, accepts an intermediate key and a set of values for that key. It merges together these values to form a possibly smaller set of values.
Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator.
map(key, value):
// key: document name; value: text of document
for each word w in value:
emit(w, 1)
reduce(key, values):
// key: a word; values: an iterator over counts
result = 0
for each count v in values:
result += v
emit(key, result)
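A runnable Python sketch of the same Wordcount logic, simulating the map, group-by-key (shuffle) and reduce steps in a single process; a real MapReduce run distributes these steps across machines:

from collections import defaultdict

def map_fn(filename, text):
    for word in text.split():
        yield (word, 1)

def reduce_fn(word, counts):
    yield (word, sum(counts))

docs = {"doc1": "Welcome Everyone", "doc2": "Hello Everyone"}

intermediate = defaultdict(list)
for fname, text in docs.items():            # Map phase
    for k, v in map_fn(fname, text):
        intermediate[k].append(v)           # shuffle: group values by key

for word in sorted(intermediate):           # Reduce phase
    for k, v in reduce_fn(word, intermediate[word]):
        print(k, v)                         # Everyone 2, Hello 1, Welcome 1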
map(k, v) → list(k1, v1)
reduce(k1, list(v1)) → v2
(k1, v1) is an intermediate key/value pair
Output is the set of (k1, v2) pairs
Here are a few simple examples of interesting programs that can be expressed as MapReduce computations.
Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
Count of URL Access Frequency: The map function processes logs of web page requests and outputs (URL; 1). The reduce function adds together all values for the same URL and emits a (URL; total count) pair.
Reverse Web-Link Graph: The map function outputs (target;
source) pairs for each link to a target URL found in a page named
source. The reduce function concatenates the list of all source
URLs associated with a given target URL and emits the pair:
(target; list(source))
Contd…
Term-Vector per Host: A term vector summarizes the
most important words that occur in a document or a set
of documents as a list of (word; frequency) pairs.
The map function emits a (hostname; term vector) pair
for each input document (where the hostname is
extracted from the URL of the document).
The reduce function is passed all per-document term
vectors for a given host. It adds these term vectors
together, throwing away infrequent terms, and then emits
a final (hostname; term vector) pair
Inverted Index: The map function parses each document, and emits a sequence of (word; document ID) pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a (word; list(document ID)) pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.
Distributed Sort: The map function extracts the key from
each record, and emits a (key; record) pair. The reduce
function emits all pairs unchanged.
Applications of MapReduce
(1) Distributed Grep:
Input: large set of files
Output: lines that match pattern
Map – emits a line if it matches the supplied pattern
Reduce – copies the intermediate data to the output
(2) Reverse Web-Link Graph:
Input: web graph, i.e., tuples (a, b) where (page a → page b)
Output: For each page, list of pages that link to it
Map – process web log and for each input <source, target>, it outputs <target, source>
Reduce - emits <target, list(source)>
(3) Count of URL access frequency:
Map – Process web log and outputs <URL, 1>
Multiple Reducers - Emits <URL, URL_count>
(So far, like Wordcount. But still need %)
Chain another MapReduce job after the above one:
Map – Processes <URL, URL_count> and outputs <1, (<URL, URL_count>)>
1 Reducer – Does two passes. In the first pass, sums up all URL_counts to calculate overall_count. In the second pass, calculates the %'s
Emits multiple <URL, URL_count/overall_count>
Applications of MapReduce
(4) Sort:
Map task's output is sorted (e.g., quicksort)
Reduce task's input is sorted (e.g., mergesort)
Input: series of (key, value) pairs
Output: sorted <value>s
Map – <key, value> → <value, _> (identity)
Reducer – <key, value> → <key, value> (identity)
Partitioning function – partition keys across reducers based on ranges (can't use hashing!)
• Take data distribution into account to balance reducer tasks
Externally, for the user:
1. Write a Map program (short)
2. Write a Reduce program (short)
3. Submit the job; wait for the result
4. Need to know very little about parallel/distributed programming!
Internally, for the paradigm and scheduler:
1. Parallelize Map: easy! each map task is independent of the others!
2. All Map output records with same key assigned to same Reduce task
• use partitioning function, e.g., hash(key)%number of reducers
3. Parallelize Reduce: easy! each reduce task is independent of the others!
4. Implement Storage for Map input, Map output, Reduce input, and Reduce output
[Diagram: input blocks 1-7 from the distributed file system (DFS) are assigned to Map servers A, B, C; Map output is written to local disk and read remotely by Reduce servers I, II, III (local write, remote read); Reduce output goes back to the DFS]
Scheduling (in YARN, used by Hadoop) involves three main components:
• Global Resource Manager (RM) – scheduling
• Per-server Node Manager (NM)
• Per-application Application Master (AM)
[Diagram: how a job gets a container – an Application Master tells the Resource Manager "1. Need container"; the RM replies "3. Container on Node B"; the AM then asks Node Manager B "4. Start task, please!", and the task (App2) runs in that container on Node B]
Fault tolerance in this scheduler:
• If a server fails, RM lets all affected AMs know, and AMs take appropriate action
– NM keeps track of each task running at its server
– AM heartbeats to RM
Dealing with slow servers (stragglers):
The slowest tasks can delay the whole job, due to a bad disk, network bandwidth, CPU, or memory
Keep track of the "progress" of each task (% done)
Locality:
The DFS keeps replicas of each input block
• Maybe on different racks, e.g., 2 on a rack, 1 on a different rack
– MapReduce attempts to schedule a map task on a machine that contains a replica of the corresponding input data (or, failing that, near one)
Implementation Overview
Many different implementations of the MapReduce interface are possible. The right choice depends on the environment.
For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.
Here we describe an implementation targeted to the computing environment in wide use at Google: large clusters of commodity PCs connected together with switched Ethernet.
Contd…
(1) Machines are typically dual-processor x86 processor running
Linux, with 2-4 GB of memory per machine.
(2) Commodity networking hardware is used . Typically either
100 megabits/second or 1 gigabit/second at the machine
level, but averaging considerably less in overall bisection
bandwidth.
(3) A cluster consists of hundreds or thousands of machines, and
therefore machine failures are common.
(4) Storage is provided by inexpensive IDE disks attached
directly to individual machines.
(5) Users submit jobs to a scheduling system. Each job consists
of a set of tasks, and is mapped by the scheduler to a set of
available machines within a cluster.
Execution Overview
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits, which can be processed in parallel by different machines.
Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R).
The number of partitions (R) and the partitioning function are specified by the user.
Figure 1 shows the overall flow of a MapReduce operation.
[Figure 1: overall flow – Input Files → Map phase → Intermediate Files on Disk → Reduce phase → Output Files]
1. The MapReduce library in the user program first splits the input files into M pieces, and then starts up many copies of the program on a cluster of machines.
2. One of the copies of the program is special- the master. The rest are
workers that are assigned work by the master. There are M map
tasks and R reduce tasks to assign. The master picks idle workers
and assigns each one a map task or a reduce task.
N
3. A worker who is assigned a map task reads the contents of the
corresponding input split. It parses key/value pairs out of the input
data and passes each pair to the user-defined Map function. The
intermediate key/value pairs produced by the Map function are
buffered in memory.
Contd…
4. Periodically, the buffered pairs are written to local disk,
partitioned into R regions by the partitioning function.
The locations of these buffered pairs on the local disk are passed
back to the master, who is responsible for forwarding these
locations to the reduce workers.
5. When a reduce worker is notified by the master about these
locations, it uses remote procedure calls to read the buffered
data from the local disks of the map workers. When a reduce
worker has read all intermediate data, it sorts it by the
intermediate keys so that all occurrences of the same key are
grouped together.
The sorting is needed because typically many different keys map
to the same reduce task. If the amount of intermediate data is
too large to fit in memory, an external sort is used.
6. The reduce worker iterates over the sorted intermediate data and, for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
7. When all map tasks and reduce tasks have been completed, the master wakes up the user program.
At this point, the MapReduce call in the user program returns back to the user code.
After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task).
Typically, users do not need to combine these R output files into one file – they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.
Master Data Structures
For each map task and reduce task, the master stores the state (idle, in-progress, or completed) and the identity of the worker machine (for non-idle tasks).
The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task.
Fault Tolerance
Map worker failure:
Map tasks completed or in-progress at the worker are reset to idle
Reduce workers are notified when a task is rescheduled on another worker
Reduce worker failure:
Only in-progress tasks are reset to idle
Master failure:
The MapReduce task is aborted and the client is notified
Locality
Network bandwidth is a relatively scarce resource in the computing
environment. We can conserve network bandwidth by taking
advantage of the fact that the input data (managed by GFS) is
stored on the local disks of the machines that make up our cluster.
GFS divides each file into 64 MB blocks, and stores several copies of
each block (typically 3 copies) on different machines.
The MapReduce master takes the location information of the input
files into account and attempts to schedule a map task on a machine
that contains a replica of the corresponding input data. Failing that,
it attempts to schedule a map task near a replica of that task's
N
input data (e.g., on a worker machine that is on the same network
switch as the machine containing the data).
When running large MapReduce operations on a significant
fraction of the workers in a cluster, most input data is read locally
and consumes no network bandwidth.
Task Granularity
The Map phase is subdivided into M pieces and the reduce
phase into R pieces.
Ideally, M and R should be much larger than the number of
worker machines.
Having each worker perform many different tasks improves
dynamic load balancing, and also speeds up recovery when a
worker fails: the many map tasks it has completed can be
spread out across all the other worker machines.
There are practical bounds on how large M and R can be, since
N
the master must make O(M + R) scheduling decisions and
keeps O(M * R) state in memory.
Furthermore, R is often constrained by users because the
output of each reduce task ends up in a separate output file.
Partitioning Function
For reduce, we need to ensure that records with the same intermediate key end up at the same worker
The system uses a default partition function, e.g., hash(key) mod R
Sometimes useful to override, as sketched below
E.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file
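A minimal Python sketch of overriding the default partitioner as described above; the hash function (CRC32) and helper names are assumptions chosen for illustration:

from urllib.parse import urlparse
import zlib

def default_partition(key, R):
    return zlib.crc32(key.encode()) % R                    # hash(key) mod R

def host_partition(url, R):
    return zlib.crc32(urlparse(url).netloc.encode()) % R   # hash(hostname(URL)) mod R

R = 4
# URLs from the same host land in the same reduce partition / output file
print(host_partition("http://cnn.com/index.html", R) == host_partition("http://cnn.com/sports", R))  # True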
Ordering Guarantees
Within a given partition, the intermediate key/value pairs are processed in increasing key order.
This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or when users of the output find it convenient to have the data sorted.
Combiner Function
In some cases there is significant repetition in the intermediate keys produced by each map task, and the user-specified Reduce function is commutative and associative.
A good example of this is the word counting example. Since word frequencies tend to follow a Zipf distribution, each map task will produce hundreds or thousands of records of the form <the, 1>.
All of these counts will be sent over the network to a single reduce task and then added together by the Reduce function to produce one number. We allow the user to specify an optional Combiner function that does partial merging of this data before it is sent over the network.
Combiner Function (2)
The Combiner function is executed on each machine that
performs a map task.
Typically the same code is used to implement both the
combiner and the reduce functions.
The only difference between a reduce function and a
combiner function is how the MapReduce library handles
the output of the function.
The output of a reduce function is written to the final
output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.
Partial combining significantly speeds up certain classes of
MapReduce operations.
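A minimal Python sketch of a Wordcount combiner: because summation is commutative and associative, the same merging logic can be run on the map side before data crosses the network (illustrative only):

from collections import Counter

def map_fn(text):
    return [(w, 1) for w in text.split()]

def combine(pairs):
    partial = Counter()
    for word, count in pairs:
        partial[word] += count          # same merging logic as the reducer
    return list(partial.items())        # far fewer records sent over the network

print(combine(map_fn("the cat sat on the mat the end")))  # [('the', 3), ('cat', 1), ...]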
map(key, value):
// key: document name; value: text of document
for each word w in value:
emit(w, 1)
reduce(key, values):
// key: a word; values: an iterator over counts
result = 0
for each count v in values:
result += v
emit(key, result)
map(key=url, val=contents):
For each word w in contents, emit (w, “1”)
reduce(key=word, values=uniq_counts):
Sum all “1”s in values list
Emit result "(word, sum)"
Input:              Map output:    Reduce output:
  see bob run         see 1          bob 1
  see spot throw      bob 1          run 1
                      run 1          see 2
                      see 1          spot 1
                      spot 1         throw 1
                      throw 1
Example 2: Counting words of different lengths
Suppose the map function takes a word (a string) and outputs the length of the word as the key and the word itself as the value; then
map(steve) would return 5:steve and
map(savannah) would return 8:savannah.
The map outputs:
  3 : the
  3 : and
  3 : you
  4 : then
  4 : what
  4 : when
  5 : steve
  5 : where
  8 : savannah
  8 : research
They get grouped as:
  3 : [the, and, you]
  4 : [then, what, when]
  5 : [steve, where]
  8 : [savannah, research]
Example 2: Counting words of different lengths
Each of these lines would then be passed as an argument
to the reduce function, which accepts a key and a list of
values.
In this instance, we might be trying to figure out how many
words of certain lengths exist, so our reduce function will
just count the number of items in the list and output the
key with the size of the list, like:
3:3
4:3
5:2
8:2
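A minimal Python sketch of this word-length example, simulating the map, grouping and reduce steps in one process (illustrative only):

from collections import defaultdict

words = ["the", "and", "you", "then", "what", "when", "steve", "where", "savannah", "research"]

def map_fn(word):
    return (len(word), word)          # e.g., map("steve") -> (5, "steve")

groups = defaultdict(list)
for k, v in map(map_fn, words):
    groups[k].append(v)               # group: 3 -> [the, and, you], ...

def reduce_fn(key, values):
    return (key, len(values))         # count how many words have this length

print(sorted(reduce_fn(k, v) for k, v in groups.items()))  # [(3, 3), (4, 3), (5, 2), (8, 2)]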
The same pattern generalizes to other statistics over a corpus, etc...
The most common example of MapReduce is counting the number of times words occur in a corpus.
Example 3: Finding common friends
Consider a social network (e.g., Facebook) that keeps a friends list for each user. One common processing request is the "You and Joe have 230 friends in common" feature.
When you visit someone's profile, you see a list of friends that you have in common. This list doesn't change frequently, so it'd be wasteful to recalculate it every time you visited the profile (sure, you could use a decent caching strategy, but then we wouldn't be able to continue writing about MapReduce for this problem).
We're going to use mapreduce so that we can calculate everyone's
common friends once a day and store those results. Later on it's just
a quick lookup. We've got lots of disk, it's cheap.
Assume the friends are stored as Person -> [List of Friends]; our friends list is then:
A -> B C D
B -> A C D E
C -> A B D E
D -> A B C E
E -> B C D
The mapper takes a line "Person -> [friend list]" and, for each friend in the list, emits a key-value pair where the key is the (sorted) pair of persons and the value is the friend list:
For map(A -> B C D):
(A B) -> B C D
(A C) -> B C D
(A D) -> B C D
For map(B -> A C D E):
(A B) -> A C D E
(B C) -> A C D E
(B D) -> A C D E
(B E) -> A C D E
For map(C -> A B D E):
(A C) -> A B D E
(B C) -> A B D E
(C D) -> A B D E
(C E) -> A B D E
For map(D -> A B C E):
(A D) -> A B C E
(B D) -> A B C E
(C D) -> A B C E
(D E) -> A B C E
And finally for map(E -> B C D):
(B E) -> B C D
(C E) -> B C D
(D E) -> B C D
Before we send these key-value pairs to the reducers, we group them by their keys and get:
(A B) -> (A C D E) (B C D)
(A C) -> (A B D E) (B C D)
(A D) -> (A B C E) (B C D)
PT
(B C) -> (A B D E) (A C D E)
(B D) -> (A B C E) (A C D E)
N
(B E) -> (A C D E) (B C D)
(C D) -> (A B C E) (A B D E)
(C E) -> (A B D E) (B C D)
(D E) -> (A B C E) (B C D)
Each line will be passed as an argument to a reducer. The reduce function will simply intersect the lists of values and output the same key with the result of the intersection.
For example, reduce((A B) -> (A C D E) (B C D))
will output (A B) : (C D)
and means that friends A and B have C and D as common friends.
The result after reduction is:
(A B) -> (C D)
(A C) -> (B D)
(A D) -> (B C)
(B C) -> (A D E)
(B D) -> (A C E)
(B E) -> (C D)
(C D) -> (A B E)
(C E) -> (B D)
(D E) -> (B C)
Now when D visits B's profile, we can quickly look up (B D) and see that they have three friends in common, (A C E).
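A minimal Python sketch of the whole common-friends job, simulating the map, group-by-pair and intersection-reduce steps in one process (illustrative only):

from collections import defaultdict

friends = {"A": {"B", "C", "D"}, "B": {"A", "C", "D", "E"}, "C": {"A", "B", "D", "E"},
           "D": {"A", "B", "C", "E"}, "E": {"B", "C", "D"}}

def map_fn(person, flist):
    for friend in flist:
        yield (tuple(sorted((person, friend))), flist)   # key: sorted pair, value: friend list

groups = defaultdict(list)
for person, flist in friends.items():
    for key, value in map_fn(person, flist):
        groups[key].append(value)            # shuffle: group by the pair

def reduce_fn(pair, lists):
    return pair, sorted(set.intersection(*lists))        # intersect the grouped lists

results = dict(reduce_fn(k, v) for k, v in groups.items())
print(results[("B", "D")])                   # ['A', 'C', 'E']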
Reference: Jeffrey Dean and Sanjay Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters"
http://labs.google.com/papers/mapreduce.html
Conclusion
The MapReduce programming model is easy to use, even for programmers without experience with parallel and distributed systems, since it hides the details of parallelization, fault-tolerance, locality optimization, and load balancing.
A large variety of problems are easily expressible as MapReduce computations.
Need to deal with failure
Plenty of ongoing research work in scheduling and fault-tolerance for MapReduce and Hadoop.