Week 7 - Lecture Notes
Peer to Peer Systems in Cloud Computing

Dr. Rajiv Misra
Associate Professor
Dept. of Computer Science & Engg.
Indian Institute of Technology Patna
[email protected]
Preface

Content of this Lecture:

• In this lecture, we will discuss Peer-to-Peer (P2P) techniques in cloud computing systems.

• We will study some of the widely-deployed P2P systems, such as Napster, Gnutella, FastTrack and BitTorrent, and P2P systems with provable properties, such as Chord, Pastry and Kelips.

Need of Peer to Peer Systems

• P2P systems were the first distributed systems that seriously focused on scalability with respect to the number of nodes.

• P2P techniques are abundant in cloud computing systems: key-value stores (e.g., Cassandra) use Chord-style p2p hashing.

P2P Systems

Widely-deployed P2P systems:
1. Napster
2. Gnutella
3. FastTrack
4. BitTorrent

P2P systems with provable properties:
1. Chord
2. Pastry
3. Kelips
Napster Structure

[Diagram: napster.com servers (S) store a directory, i.e., filenames with peer pointers, e.g., <"Public Enemy.mp3", Beatles, @123.34.12.32:1003>. Client machines ("peers", P) store their own files.]

Napster Structure

Client:
• Connect to a Napster server.
• Upload the list of music files that you want to share.
• The server maintains a list of <filename, ip_address, portnum> tuples. The server stores no files.

Napster Operations

Client (contd.):
Search:
• Send the server keywords to search with.
• (The server searches its list with the keywords.)
• The server returns a list of hosts, i.e., <ip_address, portnum> tuples, to the client.
• The client pings each host in the list to find transfer rates.
• The client fetches the file from the best host.
All communication uses TCP (Transmission Control Protocol), a reliable and ordered networking protocol.

Napster Search

[Diagram: napster.com servers (S) store peer pointers for all files; client machines ("peers", P) store their own files.
1. Query sent to a server
2. All servers search their lists (ternary tree algorithm)
3. Response returned to the client
4. Client pings candidate hosts
5. Client downloads from the best host]
Nodes Joining a P2P System

This mechanism can be used for any p2p system:
• Send an HTTP request to the well-known URL for that P2P service.
• The message is routed (after a lookup in DNS = Domain Name System) to an introducer, a well-known server that keeps track of some recently joined nodes in the p2p system.
• The introducer initializes the new peer's neighbor table.

Issues with Napster

• The centralized server is a source of congestion.
• The centralized server is a single point of failure.
• No security: plaintext messages and passwords.
• napster.com was declared responsible for users' copyright violation ("indirect infringement").

Next P2P system: Gnutella
Gnutella

• Eliminate the servers.
• Client machines search and retrieve amongst themselves.
• Clients act as servers too, called "servents".
• Gnutella (possibly by analogy with the GNU Project) is a large peer-to-peer network. It was the first decentralized peer-to-peer network of its kind.
• Released in March 2000 by AOL and immediately withdrawn, but it had 88K users by March 2003.
• The original design underwent several modifications.

Gnutella

[Diagram: servents ("peers", P) store their own files and also store "peer pointers". They are connected in an overlay graph (each link is an implicit Internet path).]

How do I search for a particular file?

Gnutella routes different messages within the overlay graph. The Gnutella protocol has 5 main message types:
1. Query (search)
2. QueryHit (response to a query)
3. Ping (to probe the network for other peers)
4. Pong (reply to a ping, contains the address of another peer)
5. Push (used to initiate file transfer)

Into the message structure and protocol: all fields except the IP address are in little-endian format, e.g., 0x12345678 is stored as 0x78 in the lowest address byte, then 0x56 in the next higher address, and so on.
How do I search for a particular file?

Gnutella Message Header Format (the Descriptor Header precedes the Payload):

Bytes 0-15: Descriptor ID (ID of this search transaction)
Byte 16: Payload descriptor (type of payload: 0x00 Ping, 0x01 Pong, 0x40 Push, 0x80 Query, 0x81 QueryHit)
Byte 17: TTL (decremented at each hop; the message is dropped when TTL = 0; ttl_initial is usually 7 to 10)
Byte 18: Hops (incremented at each hop)
Bytes 19-22: Payload length (number of bytes of the message following this header)
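The header layout above maps directly onto a few lines of code. Below is a minimal Python sketch (not part of the original slides) that unpacks the 23-byte descriptor header, assuming exactly the byte offsets shown above; the function and dictionary field names are illustrative only.

import struct

def parse_gnutella_header(data):
    # Parse the 23-byte Gnutella descriptor header laid out above.
    if len(data) < 23:
        raise ValueError("need at least 23 bytes")
    descriptor_id = data[0:16]                              # bytes 0-15: ID of this transaction
    payload_descriptor, ttl, hops = data[16], data[17], data[18]
    (payload_length,) = struct.unpack_from("<I", data, 19)  # bytes 19-22, little-endian uint32
    return {
        "descriptor_id": descriptor_id.hex(),
        "type": {0x00: "Ping", 0x01: "Pong", 0x40: "Push",
                 0x80: "Query", 0x81: "QueryHit"}.get(payload_descriptor, "Unknown"),
        "ttl": ttl,
        "hops": hops,
        "payload_length": payload_length,
    }

Note how the payload length is read as a little-endian 32-bit integer, matching the byte-ordering note on the previous slide.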
How do I search for a particular file?

Payload format in a Gnutella Query message (0x80):
• Minimum speed
• Search criteria (keywords)

Gnutella Search

Queries are flooded out, TTL-restricted, and forwarded only once.

[Diagram: a peer asks "Who has PublicEnemy.mp3?"; the Query is flooded to its neighbors with TTL=2.]

Gnutella Search

QueryHit (0x81): successful result to a Query.

Payload format in a Gnutella QueryHit message:
• Num. hits, port, ip_address, speed: info about the responder
• (fileindex, filename, fsize) entries: the results
• servent_id: unique identifier of the responder, a function of its IP address

Gnutella Search

Successful results (QueryHits) are routed back on the reverse path.

[Diagram: the QueryHit for "Who has PublicEnemy.mp3?" travels back along the path the Query took.]

Avoiding excessive traffic

• To avoid duplicate transmissions, each peer maintains a list of recently received messages.
• A Query is forwarded to all neighbors except the peer from which it was received.
• Each Query (identified by its DescriptorID) is forwarded only once.
• A QueryHit is routed back only to the peer from which the Query with the same DescriptorID was received.
• Duplicates with the same DescriptorID and Payload descriptor (message type, e.g., Query) are dropped.
• A QueryHit whose DescriptorID does not match any Query seen is dropped.
After receiving QueryHit messages

The requestor chooses the "best" QueryHit responder and initiates an HTTP request directly to the responder's ip+port:

GET /get/<File Index>/<File Name>/ HTTP/1.0\r\n
Connection: Keep-Alive\r\n
Range: bytes=0-\r\n
User-Agent: Gnutella\r\n
\r\n

The responder then replies with file packets after this message:

HTTP 200 OK\r\n
Server: Gnutella\r\n
Content-type: application/binary\r\n
Content-length: 1024\r\n
\r\n

After receiving QueryHit messages (2)

• HTTP is the file transfer protocol. Why? Because it's standard, well-debugged, and widely used.
• Why the "Range" field in the GET request? To support partial file transfers.
• What if the responder is behind a firewall that disallows incoming connections?
Dealing with Firewalls

The requestor sends a Push message to the responder, asking it to initiate the file transfer.

[Diagram: a peer has PublicEnemy.mp3 but is behind a firewall; the Push is routed to it through the overlay.]

Dealing with Firewalls

Payload format in a Gnutella Push message (0x40):
• servent_id, fileindex: same as in the received QueryHit
• ip_address, port: address at which the requestor can accept incoming connections

Dealing with Firewalls

The responder establishes a TCP connection to the ip_address and port specified, and sends:

GIV <File Index>:<Servent Identifier>/<File Name>\n\n

The requestor then sends a GET to the responder (as before) and the file is transferred as explained earlier.

What if the requestor is behind a firewall too?
• Gnutella gives up.
• Can you think of an alternative solution?

Ping-Pong

Ping (0x00): no payload.
Pong (0x01): port, ip_address, num. files shared, num. KB shared.

• Peers initiate Pings periodically.
• Pings are flooded out like Queries; Pongs are routed along the reverse path like QueryHits.
• Pong replies are used to update the set of neighboring peers,
• to keep neighbor lists fresh in spite of peers joining, leaving and failing.
Summary: Gnutella

• No servers.
• Peers/servents maintain "neighbors"; this forms an overlay graph.
• Peers store their own files.
• Queries are flooded out, TTL-restricted.
• QueryHits (replies) are reverse-path routed.
• Supports file transfer through firewalls.
• Periodic Ping-Pong to continuously refresh neighbor lists.
• Neighbor list size is specified by the user at each peer: heterogeneity means some peers may have more neighbors.
• Gnutella was found to follow a power-law degree distribution: P(#links = L) ~ L^(-k) (k is a constant).
Problems

• Ping/Pong constituted 50% of the traffic.
  Solution: multiplex, cache and reduce the frequency of pings/pongs.
• Repeated searches with the same keywords.
  Solution: cache Query and QueryHit messages.
• Modem-connected hosts do not have enough bandwidth for passing Gnutella traffic.
  Solution: use a central server to act as a proxy for such peers.
  Another solution: the FastTrack system.

Problems (Contd...)

• Large number of freeloaders: 70% of users in 2000 were freeloaders, who only download files and never upload their own files.
• Flooding causes excessive traffic.
  Is there some way of maintaining meta-information about peers that leads to more intelligent routing?
  -> Structured peer-to-peer systems, for example the Chord system.
FastTrack

• A hybrid between Gnutella and Napster.
• Takes advantage of "healthier" participants in the system.
• Underlying technology in Kazaa, KazaaLite and Grokster.
• Proprietary protocol, but some details are available.
• Like Gnutella, but with some peers designated as supernodes.

A FastTrack-like System

[Diagram: ordinary peers (P) connect to supernodes (S), which form the upper tier of the overlay.]

FastTrack (Contd...)

• A supernode stores a directory listing (<filename, peer pointer>) for a subset of nearby peers, similar to Napster servers.
• Supernode membership changes over time.
• Any peer can become (and stay) a supernode, provided it has earned enough reputation.
  KazaaLite: the participation level (= reputation) of a user is between 0 and 1000, initially 10, and is then affected by the length of periods of connectivity and the total number of uploads.
  More sophisticated reputation schemes have been invented, especially based on economics.
• A peer searches by contacting a nearby supernode.
BitTorrent

[Diagram: a website links to a .torrent file and to a Tracker (one per file), which receives heartbeats, joins and leaves from peers. A new peer (leecher) 1. gets the tracker, 2. gets a list of peers, 3. gets file blocks from seeds (peers with the full file) and from other leechers (peers with some blocks).]

BitTorrent (2)

• File split into blocks (32 KB - 256 KB).
• Download Local Rarest First block policy: prefer early download of blocks that are least replicated among neighbors (a sketch of this policy follows below).
  Exception: a new node is allowed to pick one random neighbor; this helps in bootstrapping.
• Tit-for-tat bandwidth usage: provide blocks to neighbors that provided it the best download rates.
  Incentive for nodes to provide good download rates.
  Seeds do the same too.
• Choking: limit the number of neighbors with concurrent uploads to a small number (5), i.e., the "best" neighbors.
  Everyone else is choked.
  Periodically re-evaluate this set (e.g., every 10 s).
  Optimistic unchoke: periodically (e.g., ~30 s), unchoke a random neighbor; this helps keep the unchoked set fresh.
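As referenced above, here is a minimal Python sketch of the Local Rarest First idea, under the assumption that each neighbor advertises the set of block indices it holds; the data structures and function name are illustrative, not BitTorrent's wire protocol.

import random
from collections import Counter

def pick_next_block(my_blocks, neighbor_bitmaps):
    # Local Rarest First sketch: among blocks we still need, prefer the one
    # held by the fewest neighbors; break ties randomly.
    counts = Counter()
    for bitmap in neighbor_bitmaps.values():   # bitmap: set of block indices a neighbor has
        counts.update(bitmap)
    needed = [b for b in counts if b not in my_blocks]
    if not needed:
        return None
    rarest = min(counts[b] for b in needed)
    return random.choice([b for b in needed if counts[b] == rarest])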
DHT (Distributed Hash Table)

• A hash table allows you to insert, lookup and delete objects with keys.
• A distributed hash table allows you to do the same in a distributed setting (objects = files).
• Performance concerns:
  Load balancing
  Fault-tolerance
  Efficiency of lookups and inserts
  Locality
• Napster, Gnutella and FastTrack are all DHTs (sort of).
• So is Chord, a structured peer-to-peer system.

Comparative Performance

System   | Memory                | Lookup latency | #Messages for a lookup
Napster  | O(1) (O(N) at server) | O(1)           | O(1)
Gnutella | O(N)                  | O(N)           | O(N)
Chord    | O(log(N))             | O(log(N))      | O(log(N))
Chord

• Developers: I. Stoica, D. Karger, F. Kaashoek, H. Balakrishnan, R. Morris (Berkeley and MIT).
• Intelligent choice of neighbors to reduce the latency and message cost of routing (lookups/inserts).
• Uses consistent hashing on the node's (peer's) address:
  SHA-1(ip_address, port) -> 160-bit string, truncated to m bits.
  This is called the peer id (a number between 0 and 2^m - 1).
  It is not unique, but id conflicts are very unlikely.
  Peers can then be mapped to one of 2^m logical points on a circle.
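A small Python sketch of this id assignment is shown below: it hashes a string such as "ip:port" with SHA-1 and keeps m bits (here by taking the digest mod 2^m, which is one possible way to truncate; the helper name is illustrative). The same function can be applied to filenames to obtain keys, as the later slides describe.

import hashlib

m = 7  # number of id bits, as in the ring examples that follow

def chord_id(value, m_bits=m):
    # SHA-1 the value (e.g. "ip:port" for a peer, a filename for a key)
    # and truncate the 160-bit digest to m bits.
    digest = hashlib.sha1(value.encode()).digest()
    return int.from_bytes(digest, "big") % (2 ** m_bits)

# peer_id = chord_id("123.45.6.7:5000"); key = chord_id("cnn.com/index.html")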
Ring of Peers

[Diagram: say m=7; 6 nodes N16, N32, N45, N80, N96, N112 are placed on a ring of 2^7 = 128 points, with 0 at the top.]

Peer Pointers (1): Successors

[Diagram: on the same m=7 ring, each peer points to its successor, the next peer clockwise (and similarly to its predecessor).]
Peer Pointers (2): Finger Tables

[Diagram: each peer also maintains a finger table; the i-th finger entry of peer n points to the first peer at or clockwise of the point n + 2^i (mod 2^m).]
What about the files?

• Filenames are also mapped using the same consistent hash function: SHA-1(filename) -> 160-bit string (the key).
• A file is stored at the first peer with id greater than or equal to its key (mod 2^m).
• The file cnn.com/index.html that maps to key K42 is stored at the first peer with id greater than or equal to 42.
• Note that we are considering a different file-sharing application here: cooperative web caching. The same discussion applies to any other file-sharing application, including that of mp3 files.
• Consistent hashing => with K keys and N peers, each peer stores O(K/N) keys (i.e., < c.K/N, for some constant c).
Mapping Files

[Diagram: on the m=7 ring, the file with key K42 is stored at N45, the first peer clockwise of point 42.]

Search

[Diagram: N80 asks "Who has cnn.com/index.html?" (it hashes to K42); the file with key K42 is stored at N45.]
Search

At node n, send the query for key k to the largest successor/finger entry <= k, i.e., at or anti-clockwise of k (it wraps around the ring); if none exists, send the query to successor(n).

[Diagram: the query for K42 hops across the m=7 ring from N80 towards N45, which stores cnn.com/index.html.]

Search

At node n, send the query for key k to the largest successor/finger entry <= k, i.e., at or anti-clockwise of k (it wraps around the ring); if none exists, send the query to successor(n).

All "arrows" in the diagram are RPCs (remote procedure calls).

[Diagram: the same K42 lookup; the file cnn.com/index.html with key K42 is stored at N45.]
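The routing rule on this slide can be expressed as a short helper. The sketch below is illustrative Python, assuming each node object exposes .id, .successor and a .fingers list; it is not the pseudocode from the Chord paper.

def next_hop(node, key, m_bits=7):
    # One routing step: forward to the largest known successor/finger entry
    # at or anti-clockwise of the key; if none exists, fall back to successor(n).
    ring = 2 ** m_bits

    def dist(a, b):
        # clockwise distance from a to b on the ring
        return (b - a) % ring

    candidates = [f for f in node.fingers + [node.successor]
                  if dist(node.id, f.id) <= dist(node.id, key)]
    if not candidates:
        return node.successor
    # "largest entry <= k": the candidate closest to (but not past) the key
    return max(candidates, key=lambda f: dist(node.id, f.id))

Repeating this step node by node until the key falls between a node and its successor gives the O(log(N))-hop lookup analyzed next.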
Analysis

Search takes O(log(N)) time.

Proof (intuition):
• At each step, the distance between the query and the peer-with-file reduces by a factor of at least 2. So after log(N) forwardings, the distance to the key is at most 2^m / 2^log(N) = 2^m / N.
• The number of node identifiers in a range of 2^m / N is O(log(N)) with high probability (why? SHA-1 and "balls and bins"). So using successors in that range will be ok, using another O(log(N)) hops.

Analysis (Contd.)

• The O(log(N)) search time holds for file insertions too (and in general for routing to any key). "Routing" can thus be used as a building block for all operations: insert, lookup, delete.
• O(log(N)) time holds only if the finger and successor entries are correct.
• When might these entries be wrong? When you have failures.
Search under Peer Failures

[Diagram: on the m=7 ring, nodes on the lookup path have failed; the lookup for K42 from N80 fails because N16 does not know N45, which stores cnn.com/index.html.]

Search under Peer Failures

One solution: maintain r multiple successor entries; in case of failure, use the other successor entries.

[Diagram: on the m=7 ring, the lookup for K42 routes around a failed node using an alternate successor entry and still reaches N45.]
Search under Peer Failures

• Choosing r = 2log(N) suffices to maintain lookup correctness with high probability (i.e., the ring stays connected).
• Say 50% of the nodes fail.
• Pr(at a given node, at least one successor is alive) = 1 - (1/2)^(2log(N)) = 1 - 1/N^2
• Pr(the above is true at all alive nodes) = (1 - 1/N^2)^(N/2) ≈ e^(-1/(2N)) ≈ 1
Search under Peer Failures (2)

[Diagram: on the m=7 ring, the lookup fails because N45, the node storing K42 (cnn.com/index.html), is dead.]

Search under Peer Failures (2)

One solution: replicate the file/key at r successors and predecessors.

[Diagram: K42 is replicated at the neighbors of N45 (the diagram marks replicas near N32 and N80), so the lookup still succeeds even though N45 has failed.]
Need to Deal with Dynamic Changes

Peers fail, new peers join, and peers leave.

• P2P systems have a high rate of churn (node join, leave and failure):
  25% per hour in Overnet (eDonkey)
  100% per hour in Gnutella
  Lower in managed clusters
  Churn is a common feature in all distributed systems, including wide-area systems (e.g., PlanetLab), clusters (e.g., Emulab), clouds (e.g., AWS), etc.
• So, all the time, we need to update successors and fingers, and copy keys.
New Peers Joining

• The introducer directs N40 to N45 (and N32).
• N32 updates its successor to N40.
• N40 initializes its successor to N45, and initializes its fingers from it.
• N40 periodically talks to its neighbors to update its finger table (the Stabilization Protocol, followed by all nodes).

[Diagram: N40 joins the m=7 ring between N32 and N45.]

New Peers Joining (2)

N40 may need to copy some files/keys from N45 (files with file id between 32 and 40, e.g., K34 and K38).

[Diagram: keys K34 and K38 move from N45 to the newly joined N40.]

New Peers Joining (3)

• A new peer affects O(log(N)) other finger entries in the system, on average. [Why?]
• Number of messages per peer join = O(log(N) * log(N)).
• A similar set of operations is used for dealing with peers leaving.
• For dealing with failures, we also need failure detectors.
Stabilization Protocol

• Concurrent peer joins, leaves and failures might cause loopiness of pointers and failure of lookups.
• Chord peers periodically run a stabilization algorithm that checks and updates pointers and keys.
• It ensures non-loopiness of fingers, eventual success of lookups, and O(log(N)) lookups with high probability.
• Each stabilization round at a peer involves a constant number of messages.
• Strong stability takes O(N^2) stabilization rounds.

Churn

When nodes are constantly joining, leaving and failing:
• This is a significant effect to consider: traces from the Overnet system show that hourly peer turnover rates (churn) could be 25-100% of the total number of nodes in the system.
• Churn leads to excessive (unnecessary) key copying (remember that keys are replicated).
• The stabilization algorithm may need to consume more bandwidth to keep up.
• The main issue is that files are replicated, while it might be sufficient to replicate only meta-information about files.
• Alternatives:
  - Introduce a level of indirection, i.e., store only pointers to files (any p2p system).
  - Replicate metadata more, e.g., Kelips.
Virtual Nodes

• The hash can get non-uniform -> bad load balancing.
• Treat each node as multiple virtual nodes behaving independently; each virtual node joins the system.
• This reduces the variance of the load imbalance.

Remarks

• The virtual ring and consistent hashing are used in Cassandra, Riak, Voldemort, DynamoDB, and other key-value stores.
• Current status of the Chord project:
  File systems (CFS, Ivy) built on top of Chord
  DNS lookup service built on top of Chord
  Internet Indirection Infrastructure (I3) project at UCB
  Spawned research on many interesting issues about p2p systems
  https://github.com/sit/dht/wiki
Pastry

• Designed by Antony Rowstron (Microsoft Research) and Peter Druschel (Rice University).
• Assigns ids to nodes, just like Chord (using a virtual ring).
• Leaf Set: each node knows its successor(s) and predecessor(s).

Pastry Neighbors

• Routing tables are based on prefix matching (think of a hypercube).
• Routing is thus based on prefix matching, and takes O(log(N)) hops.
• And hops are short (in the underlying network).
Pastry Routing

Consider a peer with id 01110100101. It maintains a neighbor peer with an id matching each of the following prefixes (* = starting bit differing from this peer's corresponding bit):
  *
  0*
  01*
  011*
  ...
  0111010010*

When it needs to route to a peer, say 01110111001, it starts by forwarding to a neighbor with the largest matching prefix, i.e., 011101*.
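A hedged Python sketch of this prefix-based forwarding is shown below. It assumes ids are bit strings and that the routing table is a flat dict keyed by prefix, which simplifies Pastry's real row/column table; all names are illustrative.

def pastry_next_hop(my_id, key, routing_table):
    # Forward to the neighbor sharing the longest prefix with the key.
    # routing_table maps a prefix (e.g. "0111011") to a neighbor id that
    # starts with that prefix.
    shared = 0
    while shared < len(key) and shared < len(my_id) and key[shared] == my_id[shared]:
        shared += 1
    # neighbor whose first 'shared' bits match us and whose next bit matches the key
    prefix = key[:shared + 1]
    return routing_table.get(prefix, my_id)   # fall back to self / leaf set in a real system

For the example above, my_id = "01110100101" and key = "01110111001" share the prefix "011101", so the lookup key into the table is "0111011", i.e., a neighbor matching 011101* with the differing next bit.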
Pastry Locality

• For each prefix, say 011*, among all potential neighbors with the matching prefix, the neighbor with the shortest round-trip time is selected.
• Since shorter prefixes have many more candidates (spread out throughout the Internet), the neighbors for shorter prefixes are likely to be closer than the neighbors for longer prefixes.
• Thus, in the prefix routing, early hops are short and later hops are longer.
• Yet the overall "stretch", compared to the direct Internet path, stays short.

Summary: Chord and Pastry

Chord and Pastry protocols:
• More structured than Gnutella.
• Black-box lookup algorithms.
• Churn handling can get complex.
• O(log(N)) memory and lookup cost.
  O(log(N)) lookup hops may be high.
  Can we reduce the number of hops?
Kelips: A 1-hop Lookup DHT

• k "affinity groups", with k ~ sqrt(N).
• Each node is hashed to a group (hash mod k).
• A node's neighbors are:
  (Almost) all other nodes in its own affinity group
  One contact node per foreign affinity group

[Diagram: nodes (e.g., 15, 18, 30, 76, 129, 160, 167) spread across affinity groups #0, #1, ..., #k-1.]

Kelips Files and Metadata

• A file can be stored at any (few) node(s).
• Kelips decouples file replication/location (outside Kelips) from file querying (in Kelips).
• Each filename is hashed to a group:
  Example: PublicEnemy.mp3 hashes to group #k-1; everyone in that group stores the tuple <PublicEnemy.mp3, who-has-file>.
  All nodes in the group replicate the pointer information, i.e., <filename, file location>.
  This information is spread using gossip.
  The affinity group does not store the files themselves.

Kelips Lookup

• Lookup:
  Find the file's affinity group (hash the filename).
  Go to your contact for that affinity group.
  Failing that, try another of your neighbors to find a contact.
• Lookup = 1 hop (or a few).
• Memory cost is O(sqrt(N)): 1.93 MB for 100K nodes and 10M files.
  This fits in the RAM of most workstations/laptops today (COTS machines).

Kelips Soft State

• Membership lists:
  Gossip-based membership, within each affinity group and also across affinity groups.
  O(log(N)) dissemination time.
• File metadata:
  Needs to be periodically refreshed from the source node.
  Times out otherwise.
Chord vs. Pastry vs. Kelips

A range of tradeoffs is available: memory vs. lookup cost vs. background bandwidth (to keep neighbors fresh).

Conclusion

In this lecture, we have studied some of the widely-deployed P2P systems:
1. Napster
2. Gnutella
3. FastTrack
4. BitTorrent

We have also discussed some of the P2P systems with provable properties:
1. Chord
2. Pastry
3. Kelips
MapReduce

Dr. Rajiv Misra
Associate Professor
Dept. of Computer Science & Engg.
Indian Institute of Technology Patna
[email protected]

Preface

Content of this Lecture:

• In this lecture, we will discuss the 'MapReduce paradigm', its internal working and an implementation overview.
• We will also see many examples and different applications of MapReduce being used, and look into how 'scheduling and fault tolerance' work inside MapReduce.
Introduction

• MapReduce is a programming model and an associated implementation for processing and generating large data sets.
• Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
• Many real-world tasks are expressible in this model.

Contd…

• Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines.
• The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication.
• This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.
• A typical MapReduce computation processes many terabytes of data on thousands of machines. Hundreds of MapReduce programs have been implemented and upwards of one thousand MapReduce jobs are executed on Google's clusters every day.
Distributed File System

• Chunk servers
  The file is split into contiguous chunks.
  Typically each chunk is 16-64 MB.
  Each chunk is replicated (usually 2x or 3x).
  Try to keep replicas in different racks.
• Master node
  Also known as the Name Node in HDFS.
  Stores metadata.
  Might be replicated.
• Client library for file access
  Talks to the master to find chunk servers.
  Connects directly to chunk servers to access data.

Motivation for MapReduce (Why)

• Large-scale data processing
  Want to use 1000s of CPUs,
  but don't want the hassle of managing things.
• The MapReduce architecture provides
  Automatic parallelization & distribution
  Fault tolerance
  I/O scheduling
  Monitoring & status updates
MapReduce Paradigm

What is MapReduce?

The terms are borrowed from functional languages (e.g., Lisp).

Sum of squares:
(map square '(1 2 3 4))
  Output: (1 4 9 16)
  [processes each record sequentially and independently]
(reduce + '(1 4 9 16))
  (+ 16 (+ 9 (+ 4 1)))
  Output: 30
  [processes the set of all records in batches]

Let's consider a sample application, Wordcount: you are given a huge dataset (e.g., a Wikipedia dump or all of Shakespeare's works) and asked to list the count for each of the words in each of the documents therein.
Map

Process individual records to generate intermediate key/value pairs.

Input <filename, file text>:
  Welcome Everyone
  Hello Everyone

Emitted <key, value> pairs:
  Welcome   1
  Everyone  1
  Hello     1
  Everyone  1

Map

Parallelly process individual records to generate intermediate key/value pairs.

Input <filename, file text>:
  Welcome Everyone   (MAP TASK 1)
  Hello Everyone     (MAP TASK 2)

Emitted <key, value> pairs:
  Welcome   1
  Everyone  1
  Hello     1
  Everyone  1

Map

Parallelly process a large number of individual records to generate intermediate key/value pairs.

Input <filename, file text>:
  Welcome Everyone
  Hello Everyone
  Why are you here
  I am also here
  They are also here
  Yes, it's THEM!
  The same people we were thinking of
  .......

Emitted <key, value> pairs (across many MAP TASKS):
  Welcome   1
  Everyone  1
  Hello     1
  Everyone  1
  Why       1
  Are       1
  You       1
  Here      1
  .......
Reduce

Reduce processes and merges all intermediate values associated per key.

Intermediate <key, value> pairs:
  Welcome   1
  Everyone  1
  Hello     1
  Everyone  1

Reduce output:
  Everyone  2
  Hello     1
  Welcome   1

Reduce

• Each key is assigned to one Reduce task.
• The Reduce tasks parallelly process and merge all intermediate values by partitioning keys:

  Welcome 1, Everyone 1, Hello 1, Everyone 1
    REDUCE TASK 1 -> Everyone 2
    REDUCE TASK 2 -> Hello 1, Welcome 1

• Popular: hash partitioning, i.e., a key is assigned to
  reduce # = hash(key) % number of reduce tasks
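The hash-partitioning rule above can be tried out directly. The sketch below uses CRC32 as a stable stand-in for hash(key), because Python's built-in hash() is salted per process; the function name is illustrative.

import zlib

def reducer_for(key, num_reduce_tasks):
    # Every occurrence of a key must go to the same reduce task,
    # so use a stable hash of the key modulo the number of reduce tasks.
    return zlib.crc32(key.encode()) % num_reduce_tasks

# Example: reducer_for("Everyone", 2) and reducer_for("Hello", 2) decide which
# of the two reduce tasks receives each intermediate key.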
Programming Model

• The computation takes a set of input key/value pairs and produces a set of output key/value pairs.
• The user of the MapReduce library expresses the computation as two functions:
  (i) Map
  (ii) Reduce

(i) Map Abstraction

• Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs.
• The MapReduce library groups together all intermediate values associated with the same intermediate key 'I' and passes them to the Reduce function.

(ii) Reduce Abstraction

• The Reduce function, also written by the user, accepts an intermediate key 'I' and a set of values for that key.
• It merges together these values to form a possibly smaller set of values.
• Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator.
• This allows us to handle lists of values that are too large to fit in memory.
Map-Reduce Functions for Word Count

map(key, value):
    // key: document name; value: text of document
    for each word w in value:
        emit(w, 1)

reduce(key, values):
    // key: a word; values: an iterator over counts
    result = 0
    for each count v in values:
        result += v
    emit(key, result)
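To see these two functions end to end, here is a small, runnable Python simulation of the same word-count job. The run_mapreduce driver is only an illustration of the paradigm, not Google's implementation: it applies the map function to every input record, groups the intermediate values by key (the shuffle), and then applies the reduce function per key.

from collections import defaultdict

def map_fn(doc_name, text):
    for word in text.split():
        yield (word, 1)

def reduce_fn(word, counts):
    yield (word, sum(counts))

def run_mapreduce(inputs, map_fn, reduce_fn):
    # Map phase
    intermediate = defaultdict(list)
    for key, value in inputs:
        for k, v in map_fn(key, value):
            intermediate[k].append(v)      # shuffle: group values by key
    # Reduce phase
    output = []
    for k in sorted(intermediate):
        output.extend(reduce_fn(k, intermediate[k]))
    return output

print(run_mapreduce([("doc1", "Welcome Everyone"), ("doc2", "Hello Everyone")],
                    map_fn, reduce_fn))
# [('Everyone', 2), ('Hello', 1), ('Welcome', 1)]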
Map-Reduce Functions

• Input: a set of key/value pairs.
• The user supplies two functions:
  map(k, v) -> list(k1, v1)
  reduce(k1, list(v1)) -> v2
• (k1, v1) is an intermediate key/value pair.
• The output is the set of (k1, v2) pairs.

MapReduce Applications

Applications

Here are a few simple applications of interesting programs that can be easily expressed as MapReduce computations.

• Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
• Count of URL Access Frequency: The map function processes logs of web page requests and outputs (URL; 1). The reduce function adds together all values for the same URL and emits a (URL; total count) pair.
• Reverse Web-Link Graph: The map function outputs (target; source) pairs for each link to a target URL found in a page named source. The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair (target; list(source)).
Contd…

• Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of (word; frequency) pairs.
  The map function emits a (hostname; term vector) pair for each input document (where the hostname is extracted from the URL of the document).
  The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final (hostname; term vector) pair.

Contd…

• Inverted Index: The map function parses each document and emits a sequence of (word; document ID) pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a (word; list(document ID)) pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.
• Distributed Sort: The map function extracts the key from each record and emits a (key; record) pair. The reduce function emits all pairs unchanged.
Applications of MapReduce

(1) Distributed Grep:
• Input: a large set of files
• Output: lines that match the pattern
• Map: emits a line if it matches the supplied pattern
• Reduce: copies the intermediate data to the output

Applications of MapReduce

(2) Reverse Web-Link Graph:
• Input: web graph, as tuples (a, b) where page a -> page b
• Output: for each page, the list of pages that link to it
• Map: processes the web log and for each input <source, target>, outputs <target, source>
• Reduce: emits <target, list(source)>
Applications of MapReduce

(3) Count of URL access frequency:
• Input: log of accessed URLs, e.g., from a proxy server
• Output: for each URL, the % of total accesses for that URL
• Map: processes the web log and outputs <URL, 1>
• Multiple reducers: emit <URL, URL_count>
  (So far, this is like Wordcount. But we still need the %.)
• Chain another MapReduce job after the one above:
  Map: processes <URL, URL_count> and outputs <1, (<URL, URL_count>)>
  1 Reducer: does two passes. In the first pass, it sums up all URL_counts to calculate overall_count. In the second pass it calculates the %'s and emits multiple <URL, URL_count/overall_count>.
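One way to picture the chained jobs is the condensed Python sketch below. It collapses both MapReduce jobs into two sequential passes over in-memory data, and it assumes each log line starts with the URL; both the function name and the log format are purely illustrative of the data flow, not a distributed implementation.

from collections import Counter

def url_access_percentages(log_lines):
    # Job 1: map emits <URL, 1>, reducers sum per URL (like Wordcount).
    url_counts = Counter(line.split()[0] for line in log_lines)   # assumes URL is the first field
    # Job 2: a single reducer, pass 1 = overall count, pass 2 = percentages.
    overall = sum(url_counts.values())
    return {url: count / overall * 100 for url, count in url_counts.items()}

# url_access_percentages(["/index.html ...", "/index.html ...", "/about ..."])
# -> {'/index.html': 66.6..., '/about': 33.3...}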
Applications of MapReduce

(4) Sort:
• A Map task's output is sorted (e.g., quicksort); a Reduce task's input is sorted (e.g., mergesort).
• Input: a series of (key, value) pairs
• Output: sorted <value>s
• Map: <key, value> -> <value, _> (identity)
• Reducer: <key, value> -> <key, value> (identity)
• Partitioning function: partition keys across reducers based on ranges (can't use hashing!)
  Take the data distribution into account to balance the reducer tasks.
MapReduce Scheduling

Programming MapReduce

Externally: for the user
1. Write a Map program (short), write a Reduce program (short).
2. Specify the number of Maps and Reduces (parallelism level).
3. Submit the job; wait for the result.
4. Need to know very little about parallel/distributed programming!

Internally: for the paradigm and scheduler
1. Parallelize Map.
2. Transfer data from Map to Reduce (shuffle data).
3. Parallelize Reduce.
4. Implement storage for Map input, Map output, Reduce input, and Reduce output.
(Ensure that no Reduce starts before all Maps are finished. That is, ensure the barrier between the Map phase and the Reduce phase.)

Inside MapReduce

For the cloud:
1. Parallelize Map: easy! Each map task is independent of the others.
2. Transfer data from Map to Reduce:
   This is called shuffling the data.
   All Map output records with the same key are assigned to the same Reduce task.
   Use a partitioning function, e.g., hash(key) % number of reducers.
3. Parallelize Reduce: easy! Each reduce task is independent of the others.
4. Implement storage for Map input, Map output, Reduce input, and Reduce output:
   Map input: from the distributed file system.
   Map output: to local disk (at the Map node); uses the local file system.
   Reduce input: from (multiple) remote disks; uses the local file systems.
   Reduce output: to the distributed file system.
   (local file system = Linux FS, etc.; distributed file system = GFS (Google File System), HDFS (Hadoop Distributed File System))
Internal Workings of MapReduce

[Diagram: blocks from the DFS feed Map tasks 1-7 running on servers; their output is shuffled to Reduce tasks A, B, C on servers (local write, remote read); the Reduce tasks write output files I, II, III into the DFS. A Resource Manager assigns maps and reduces to servers.]
The YARN Scheduler

• Used underneath Hadoop 2.x +.
• YARN = Yet Another Resource Negotiator.
• Treats each server as a collection of containers (container = fixed CPU + fixed memory).
• Has 3 main components:
  Global Resource Manager (RM): scheduling.
  Per-server Node Manager (NM): daemon and server-specific functions.
  Per-application (job) Application Master (AM): container negotiation with the RM and NMs; detecting task failures of that job.

YARN: How a Job Gets a Container

In this figure: 2 servers (A, B) and 2 jobs (1, 2).

[Diagram:
1. Application Master 1 tells the Resource Manager (Capacity Scheduler) it needs a container.
2. A Node Manager reports to the RM that a container has completed.
3. The RM grants Application Master 1 a container on Node B.
4. Application Master 1 asks Node Manager B: "Start task, please!"; a task of App2 is already running on Node B.]
MapReduce Fault-Tolerance

Fault Tolerance

• Server failure
  The NM heartbeats to the RM: if a server fails, the RM lets all affected AMs know, and the AMs take appropriate action.
  The NM keeps track of each task running at its server: if a task fails while in progress, it is marked as idle and restarted.
  The AM heartbeats to the RM: on failure, the RM restarts the AM, which then syncs up with its running tasks.
• RM failure
  Use old checkpoints and bring up a secondary RM.
• Heartbeats are also used to piggyback container requests.
  This avoids extra messages.
Slow Servers

• Slow tasks are called stragglers.
• The slowest task slows the entire job down (why?).
• Stragglers are due to a bad disk, network bandwidth, CPU, or memory.
• Keep track of the "progress" of each task (% done).
• Perform backup (replicated) execution of straggler tasks: a task is considered done when its first replica completes. This is called Speculative Execution.

Locality

• The cloud has a hierarchical topology (e.g., racks).
• GFS/HDFS stores 3 replicas of each chunk (e.g., 64 MB in size), maybe on different racks, e.g., 2 on one rack and 1 on a different rack.
• MapReduce attempts to schedule a map task on
  1. a machine that contains a replica of the corresponding input data, or failing that,
  2. the same rack as a machine containing the input, or failing that,
  3. anywhere.
Implementation Overview

• Many different implementations of the MapReduce interface are possible. The right choice depends on the environment.
• For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.
• Here we describe an implementation targeted to the computing environment in wide use at Google: large clusters of commodity PCs connected together with switched Ethernet.

Contd…

(1) Machines are typically dual-processor x86 machines running Linux, with 2-4 GB of memory per machine.
(2) Commodity networking hardware is used, typically either 100 megabits/second or 1 gigabit/second at the machine level, but averaging considerably less in overall bisection bandwidth.
(3) A cluster consists of hundreds or thousands of machines, and therefore machine failures are common.
(4) Storage is provided by inexpensive IDE disks attached directly to individual machines.
(5) Users submit jobs to a scheduling system. Each job consists of a set of tasks, and is mapped by the scheduler to a set of available machines within a cluster.
Distributed Execution Overview

• The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits.
• The input splits can be processed in parallel by different machines.
• Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R).
• The number of partitions (R) and the partitioning function are specified by the user.
• Figure 1 shows the overall flow of a MapReduce operation.

Distributed Execution Overview

[Figure 1: Overall flow of a MapReduce operation.
(1) The user program forks the master and the workers.
(2) The master assigns map tasks and reduce tasks to workers.
(3) Map workers read their input splits (Split 0, Split 1, Split 2, ...).
(4) Map workers write intermediate files to local disk.
(5) Reduce workers remotely read and sort the intermediate files.
(6) Reduce workers write the output files (Output File 0, Output File 1) to the DFS.]
Sequence of Actions

When the user program calls the MapReduce function, the following sequence of actions occurs:
1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece. It then starts up many copies of the program on a cluster of machines.
2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.
3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

Contd…

4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.
5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together.
   The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

Contd…

6. The reduce worker iterates over the sorted intermediate data and, for each unique intermediate key encountered, passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.
Contd…

• After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task, with file names as specified by the user).
• Typically, users do not need to combine these R output files into one file: they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.

Master Data Structures

• The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed) and the identity of the worker machine (for non-idle tasks).
• The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task.
• Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
Fault Tolerance

Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.

• Map worker failure
  Map tasks completed or in-progress at the worker are reset to idle.
  Reduce workers are notified when a task is rescheduled on another worker.
• Reduce worker failure
  Only in-progress tasks are reset to idle.
• Master failure
  The MapReduce task is aborted and the client is notified.

Locality

• Network bandwidth is a relatively scarce resource in the computing environment. We can conserve network bandwidth by taking advantage of the fact that the input data (managed by GFS) is stored on the local disks of the machines that make up the cluster.
• GFS divides each file into 64 MB blocks and stores several copies of each block (typically 3 copies) on different machines.
• The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule a map task near a replica of that task's input data (e.g., on a worker machine that is on the same network switch as the machine containing the data).
• When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
Task Granularity

• The Map phase is subdivided into M pieces and the Reduce phase into R pieces.
• Ideally, M and R should be much larger than the number of worker machines.
• Having each worker perform many different tasks improves dynamic load balancing, and also speeds up recovery when a worker fails: the many map tasks it has completed can be spread out across all the other worker machines.
• There are practical bounds on how large M and R can be, since the master must make O(M + R) scheduling decisions and keeps O(M * R) state in memory.
• Furthermore, R is often constrained by users because the output of each reduce task ends up in a separate output file.

Partition Function

• Inputs to map tasks are created by contiguous splits of the input file.
• For reduce, we need to ensure that records with the same intermediate key end up at the same worker.
• The system uses a default partition function, e.g., hash(key) mod R.
• It is sometimes useful to override it:
  E.g., hash(hostname(URL)) mod R ensures that URLs from a host end up in the same output file.
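A sketch of both partitioners in Python follows (using CRC32 as a stable hash; the function names are illustrative, and the custom partitioner assumes the key is a full URL with a scheme so the hostname can be extracted):

from urllib.parse import urlparse
import zlib

def default_partition(key, R):
    # Default behaviour: hash(key) mod R
    return zlib.crc32(key.encode()) % R

def host_partition(url, R):
    # Override: hash(hostname(URL)) mod R, so all URLs from one host land in
    # the same reduce partition (and hence the same output file).
    host = urlparse(url).netloc
    return zlib.crc32(host.encode()) % R

# host_partition("http://cnn.com/index.html", 4) == host_partition("http://cnn.com/world", 4)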
Ordering Guarantees

• It is guaranteed that within a given partition, the intermediate key/value pairs are processed in increasing key order.
• This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or when users of the output find it convenient to have the data sorted.

Combiners Function (1)

• In some cases, there is significant repetition in the intermediate keys produced by each map task, and the user-specified Reduce function is commutative and associative.
• A good example of this is the word counting example. Since word frequencies tend to follow a Zipf distribution, each map task will produce hundreds or thousands of records of the form <the, 1>.
• All of these counts will be sent over the network to a single reduce task and then added together by the Reduce function to produce one number. We allow the user to specify an optional Combiner function that does partial merging of this data before it is sent over the network.
Combiners Function (2)

• The Combiner function is executed on each machine that performs a map task.
• Typically the same code is used to implement both the combiner and the reduce functions.
• The only difference between a reduce function and a combiner function is how the MapReduce library handles the output of the function.
• The output of a reduce function is written to the final output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.
• Partial combining significantly speeds up certain classes of MapReduce operations.
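For the word-count case discussed above, a combiner can be sketched as follows; this is illustrative Python, not the MapReduce library's API.

from collections import Counter

def combine(map_output):
    # Combiner sketch for word count: partially merge <word, 1> records on the
    # map-side machine before they are shuffled over the network.
    partial = Counter()
    for word, count in map_output:
        partial[word] += count
    return list(partial.items())      # e.g. 1000 x ('the', 1) becomes ('the', 1000)

# combine([("the", 1), ("the", 1), ("cat", 1)]) -> [('the', 2), ('cat', 1)]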
MapReduce Examples

Example 1: Word Count using MapReduce

map(key, value):
    // key: document name; value: text of document
    for each word w in value:
        emit(w, 1)

reduce(key, values):
    // key: a word; values: an iterator over counts
    result = 0
    for each count v in values:
        result += v
    emit(key, result)

Count Illustrated

map(key=url, val=contents):
    For each word w in contents, emit (w, "1")
reduce(key=word, values=uniq_counts):
    Sum all "1"s in the values list
    Emit result "(word, sum)"

Input:                 Map output:      Reduce output:
  see bob run            see 1            bob 1
  see spot throw         bob 1            run 1
                         run 1            see 2
                         see 1            spot 1
                         spot 1           throw 1
                         throw 1
Example 2: Counting words of different lengths

• The map function takes a value and outputs key:value pairs.
• For instance, if we define a map function that takes a string and outputs the length of the word as the key and the word itself as the value, then map(steve) would return 5:steve and map(savannah) would return 8:savannah.
• This allows us to run the map function against values in parallel, which provides a huge advantage.

Example 2: Counting words of different lengths

Before we get to the reduce function, the MapReduce framework groups all of the values together by key. If the map functions output the following key:value pairs:

  3 : the
  3 : and
  3 : you
  4 : then
  4 : what
  4 : when
  5 : steve
  5 : where
  8 : savannah
  8 : research

they get grouped as:

  3 : [the, and, you]
  4 : [then, what, when]
  5 : [steve, where]
  8 : [savannah, research]

Example 2: Counting words of different lengths

Each of these lines would then be passed as an argument to the reduce function, which accepts a key and a list of values. In this instance, we might be trying to figure out how many words of certain lengths exist, so our reduce function will just count the number of items in the list and output the key with the size of the list, like:

  3 : 3
  4 : 3
  5 : 2
  8 : 2

Example 2: Counting words of different lengths

• The reductions can also be done in parallel, again providing a huge advantage. We can then look at these final results and see that there were only two words of length 5 in the corpus, etc.
• The most common example of MapReduce is counting the number of times words occur in a corpus.
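The whole example can be reproduced in a few lines of Python; the grouping dictionary below plays the role of the framework's shuffle, so this is a single-process sketch, not a distributed implementation.

from collections import defaultdict

def map_fn(word):
    return (len(word), word)              # e.g. map("steve") -> (5, "steve")

def reduce_fn(length, words):
    return (length, len(words))           # count how many words have this length

words = ["the", "and", "you", "then", "what", "when",
         "steve", "where", "savannah", "research"]
groups = defaultdict(list)
for k, v in (map_fn(w) for w in words):   # shuffle: group values by key
    groups[k].append(v)
print(sorted(reduce_fn(k, v) for k, v in groups.items()))
# [(3, 3), (4, 3), (5, 2), (8, 2)]  -- the same 3:3, 4:3, 5:2, 8:2 as above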
Example 3: Finding Friends

• Facebook has a list of friends (note that friends are a bi-directional thing on Facebook: if I'm your friend, you're mine).
• They also have lots of disk space and they serve hundreds of millions of requests every day. They've decided to pre-compute calculations when they can to reduce the processing time of requests. One common processing request is the "You and Joe have 230 friends in common" feature.
• When you visit someone's profile, you see a list of friends that you have in common. This list doesn't change frequently, so it'd be wasteful to recalculate it every time you visited the profile (sure, you could use a decent caching strategy, but then we wouldn't be able to continue writing about MapReduce for this problem).
• We're going to use MapReduce so that we can calculate everyone's common friends once a day and store those results. Later on it's just a quick lookup. We've got lots of disk, and it's cheap.

Example 3: Finding Friends

Assume the friends are stored as Person -> [List of Friends]. Our friends list is then:

  A -> B C D
  B -> A C D E
  C -> A B D E
  D -> A B C E
  E -> B C D

Example 3: Finding Friends

For map(A -> B C D):
  (A B) -> B C D
  (A C) -> B C D
  (A D) -> B C D

For map(B -> A C D E): (note that A comes before B in the key)
  (A B) -> A C D E
  (B C) -> A C D E
  (B D) -> A C D E
  (B E) -> A C D E

Example 3: Finding Friends

For map(C -> A B D E):
  (A C) -> A B D E
  (B C) -> A B D E
  (C D) -> A B D E
  (C E) -> A B D E

For map(D -> A B C E):
  (A D) -> A B C E
  (B D) -> A B C E
  (C D) -> A B C E
  (D E) -> A B C E

And finally for map(E -> B C D):
  (B E) -> B C D
  (C E) -> B C D
  (D E) -> B C D

Example 3: Finding Friends

Before we send these key-value pairs to the reducers, we group them by their keys and get:

  (A B) -> (A C D E) (B C D)
  (A C) -> (A B D E) (B C D)
  (A D) -> (A B C E) (B C D)
  (B C) -> (A B D E) (A C D E)
  (B D) -> (A B C E) (A C D E)
  (B E) -> (A C D E) (B C D)
  (C D) -> (A B C E) (A B D E)
  (C E) -> (A B D E) (B C D)
  (D E) -> (A B C E) (B C D)

Example 3: Finding Friends

• Each line will be passed as an argument to a reducer.
• The reduce function will simply intersect the lists of values and output the same key with the result of the intersection.
• For example, reduce((A B) -> (A C D E) (B C D)) will output (A B) : (C D), which means that friends A and B have C and D as common friends.

Example 3: Finding Friends

The result after reduction is:

  (A B) -> (C D)
  (A C) -> (B D)
  (A D) -> (B C)
  (B C) -> (A D E)
  (B D) -> (A C E)
  (B E) -> (C D)
  (C D) -> (A B E)
  (C E) -> (B D)
  (D E) -> (B C)

Now when D visits B's profile, we can quickly look up (B D) and see that they have three friends in common: (A C E).
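The same computation condensed into a runnable Python sketch; the grouping dictionary stands in for the framework's shuffle, and the data and function names are illustrative.

from collections import defaultdict

friends = {"A": "BCD", "B": "ACDE", "C": "ABDE", "D": "ABCE", "E": "BCD"}

def map_fn(person, friend_list):
    # emit one record per friendship, keyed by the sorted pair (X Y)
    for f in friend_list:
        yield tuple(sorted((person, f))), set(friend_list)

def reduce_fn(pair, friend_sets):
    # intersect the friend lists that share this key
    return pair, sorted(set.intersection(*friend_sets))

groups = defaultdict(list)
for person, flist in friends.items():
    for key, value in map_fn(person, flist):
        groups[key].append(value)

for pair in sorted(groups):
    print(reduce_fn(pair, groups[pair]))
# (('A', 'B'), ['C', 'D']) ... (('D', 'E'), ['B', 'C']), matching the slides above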
Reading

Jeffrey Dean and Sanjay Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters"
http://labs.google.com/papers/mapreduce.html

Conclusion

• The MapReduce programming model has been successfully used at Google for many different purposes.
• The model is easy to use, even for programmers without experience with parallel and distributed systems, since it hides the details of parallelization, fault-tolerance, locality optimization, and load balancing.
• A large variety of problems are easily expressible as MapReduce computations.
• For example, MapReduce is used for the generation of data for Google's production web search service, for sorting, for data mining, for machine learning, and many other systems.
Conclusion

• MapReduce uses parallelization + aggregation to schedule applications across clusters.
• It needs to deal with failure.
• There is plenty of ongoing research work in scheduling and fault-tolerance for MapReduce and Hadoop.