Distributed and Cloud Computing
K. Hwang, G. Fox and J. Dongarra
Chapter 6: Cloud Programming
and Software Environments
Adapted from Kai Hwang, University of Southern California
Matei Zaharia, EECS, UC Berkeley
November 25, 2012
Parallel Computing and Programming Environments
e MapReduce
e Hadoop
e Amazon Web Services

What is MapReduce?
e Simple data-parallel programming model
e For large-scale data processing
> Exploits large set of commodity computers
> Executes process in distributed manner
> Offers high availability
e Pioneered by Google
> Processes 20 petabytes of data per day
e Popularized by open-source Hadoop project
» Used at Yahoo!, Facebook, Amazon, ...

What is MapReduce used for?
e At Google:
» Index construction for Google Search
> Article clustering for Google News
» Statistical machine translation
e At Yahoo!:
> “Web map” powering Yahoo! Search
> Spam detection for Yahoo! Mail
e At Facebook:
> Data mining
> Ad optimization
> Spam detection

Motivation: Large Scale Data Processing
e Many tasks composed of processing lots of
data to produce lots of other data
e Want to use hundreds or thousands of CPUs
... but this needs to be easy!
e MapReduce provides
> User-defined functions
> Automatic parallelization and distribution
> Fault-tolerance
> I/O scheduling
> Status and monitoring

What is MapReduce used for?
e In research:
> Astronomical image analysis (Washington)
> Bioinformatics (Maryland)
> Analyzing Wikipedia conflicts (PARC)
> Natural language processing (CMU)
> Particle physics (Nebraska)
> Ocean climate simulation (Washington)

Distributed Grep
[Diagram: "Very big data" is split; grep runs over each split in parallel, and cat merges the per-split matches into "All matches".]
grep is a command-line utility for searching plain-text data sets for lines
matching a regular expression. cat is a standard Unix utility that
concatenates and lists files.
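To make the dataflow concrete, here is a minimal single-machine Python sketch of distributed grep in MapReduce terms (the splits and pattern are invented; a real run would execute the map calls on different nodes):

import re

def map_grep(offset, line, pattern):
    # Map: emit the line if it matches the supplied pattern.
    if re.search(pattern, line):
        yield (line, 1)

def reduce_identity(key, values):
    # Reduce: identity function, copies matched lines to the output.
    yield key

splits = ["error: disk full", "all good here", "error: timeout"]  # stand-in for input splits
intermediate = []
for i, line in enumerate(splits):
    intermediate.extend(map_grep(i, line, r"error"))
for key, _ in intermediate:
    print(next(reduce_identity(key, [1])))  # prints the two matching lines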
Distributed Word Count

[Diagram: "Very big data" is split; a count task runs over each split, and a partitioning function routes the partial counts to a merge step that produces the merged count.]
Map
> Accepts input key/value pair
> Emits intermediate key/value pair

Reduce
> Accepts intermediate key/value* pair
> Emits output key/value pair
Architecture overview

[Diagram: a master node coordinating several slave nodes.]

GFS: underlying storage system
e Goal
> global view
> make huge files available in the face of node failures
e Master Node (meta server)
> Centralized, index all chunks on data servers
e Chunk server (data server)
> File is split into contiguous chunks, typically 16-64 MB
> Each chunk replicated (usually 2x or 3x)
> Try to keep replicas in different racks (a placement sketch follows below)
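A toy Python sketch of rack-aware replica placement under the constraints above (the rack layout and server names are hypothetical; real GFS uses its own policy):

import itertools

def place_replicas(chunk_id, racks, replication=3):
    # Interleave servers across racks so consecutive picks land on
    # different racks whenever possible.
    by_rack = [racks[r] for r in sorted(racks)]
    order = [s for group in itertools.zip_longest(*by_rack) for s in group if s]
    start = chunk_id % len(order)
    rotated = order[start:] + order[:start]
    return rotated[:replication]

racks = {"rack1": ["cs1", "cs2"], "rack2": ["cs3", "cs4"], "rack3": ["cs5"]}
print(place_replicas(chunk_id=7, racks=racks))  # ['cs5', 'cs2', 'cs4']: three racks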
GFS architecture

[Diagram: the GFS master with chunkservers 1 through N beneath it.]

Functions in the Model
Map
> Process a key/value pair to generate intermediate key/value pairs

Reduce
> Merge all intermediate values associated with the same key

Partition
> By default: hash(key) mod R (see the sketch below)
> Well balanced
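The default partitioner fits in a few lines of Python (R and the sample keys are invented for illustration):

def partition(key, R):
    # Default MapReduce partitioner: hash the key, take it mod R.
    return hash(key) % R

R = 4  # number of reduce tasks
for key in ["apple", "banana", "cherry"]:
    print(key, "-> reduce task", partition(key, R))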
Programming Concept

Map
> Perform a function on individual values in a data set to create a new list of values
> Example: square x = x * x
  map square [1,2,3,4,5]
  returns [1,4,9,16,25]

Reduce
> Combine values in a data set to create a new value
> Example: sum (accumulate each element into a running total)
  reduce sum [1,2,3,4,5]
  returns 15 (the sum of the elements)
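The same two examples in runnable Python, using the built-in map and functools.reduce:

from functools import reduce

squared = list(map(lambda x: x * x, [1, 2, 3, 4, 5]))
print(squared)  # [1, 4, 9, 16, 25]

total = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5])
print(total)  # 15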
[FIGURE 6.5: Dataflow implementation of MapReduce. Input files in the "Data" cloud are partitioned into input splits; each map worker in the compute cluster reads a split, emits intermediate (k, v) pairs, sorts and merges them, and partitions them into R regions. A synchronization/communication step copies each region to a reduce worker, which sorts and groups by key and writes the output.]

A Simple Example
> Counting words in a large set of documents:

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");
  // Produces a list of all words plus an associated count of
  // occurrences (just a "1" is recorded in this pseudo-code)

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts emitted for a particular word
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
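A runnable single-process Python version of the same word count (the two documents echo the example on the next slide; a real run would distribute the map and reduce calls across workers):

from collections import defaultdict

def map_fn(doc_name, contents):
    for word in contents.split():
        yield (word, 1)

def reduce_fn(word, counts):
    yield (word, sum(counts))

docs = {"d1": "most people ignore most poetry",
        "d2": "most poetry ignores most people"}

intermediate = defaultdict(list)  # the shuffle: group values by key
for name, text in docs.items():
    for word, count in map_fn(name, text):
        intermediate[word].append(count)

for word in sorted(intermediate):
    for word_out, total in reduce_fn(word, intermediate[word]):
        print(word_out, total)  # e.g. most 4, people 2, poetry 2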
[Diagram: word count on the input "Most people ignore most poetry; most poetry ignores most people". Map tasks emit intermediate (key, 1) pairs such as (ignore, 1), (most, 1), (poetry, 1); the pairs are sorted and grouped by unique key, e.g. (most, [1,1,1,1]); reduce then sums each group, yielding (most, 4), (people, 2), (poetry, 2), (ignore, 1), (ignores, 1).]

FIGURE 6.6: Partitioning of the intermediate results among the reduce tasks; R and the partitioning function are specified by the programmer.
MapReduce: Operation Steps
When the user program calls the MapReduce function, the
following sequence of actions occurs :
1) The MapReduce library in the user program first splits the
input files into M pieces, typically 16 to 64 megabytes (MB)
per piece. It then starts up many copies of the program on a
cluster of machines.
2) One of the copies of the program is the master. The rest are
workers that are assigned work by the master.

MapReduce: Operation Steps
3) A worker who is assigned a map task:
= reads the contents of the corresponding input split
= parses key/value pairs out of the input data and passes each
pair to the user-defined Map function.
The intermediate key/value pairs produced by the Map function
are buffered in memory.

4) The buffered pairs are written to local disk, partitioned into R
regions by the partitioning function.
The locations of these buffered pairs on the local disk are
passed back to the master, who forwards these locations to the
reduce workers.

MapReduce: Operation Steps
5) When a reduce worker is notified by the master about
these locations, it reads the buffered data from the local
disks of the map workers.
When a reduce worker has read all intermediate data, it
sorts it by the intermediate keys so that all occurrences
of the same key are grouped together.

6) The reduce worker iterates over the sorted
intermediate data and, for each unique intermediate key,
it passes the key and the corresponding set of
intermediate values to the user's Reduce function.
The output of the Reduce function is appended to a
final output file.

MapReduce: Operation Steps
7) When all map tasks and reduce tasks have been
completed, the master wakes up the user program.
At this point, the MapReduce call in the user program
returns back to the user code.
After successful completion, the output of the MapReduce
execution is available in the R output files.

Logical Data Flow in 5 Processing Steps in MapReduce Process
[Diagram: the five-step logical data flow. Input files of (line offset, line content) pairs pass through the MapReduce library, producing intermediate (key, val) pairs; these are grouped by unique key into entries such as (key1, [val, val, ...]) before the final output is written.]

Locality issue
e Master scheduling policy
> Asks GFS for locations of replicas of input file blocks
> Map tasks typically split into 64MB (== GFS block size)
> Map tasks scheduled so a GFS input block replica is
on the same machine or the same rack
e Effect
> Thousands of machines read input at local disk speed
> Without this, rack switches limit read rate

Fault Tolerance
e Reactive way
> Worker failure
= Heartbeat: workers are periodically pinged by the master
= No response = failed worker
= If the processor of a worker fails, the tasks of that worker
are reassigned to another worker (a minimal sketch follows below)
> Master failure
= Master writes periodic checkpoints
= Another master can be started from the last checkpointed state
= If eventually the master dies, the job will be aborted
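A minimal Python sketch of the heartbeat check on the master side (the timeout, worker names, and reassignment policy are all invented for illustration):

import time

HEARTBEAT_TIMEOUT = 10.0  # seconds without a reply -> declare the worker failed

last_seen = {"worker1": time.time(), "worker2": time.time() - 60}
tasks = {"worker1": ["map-3"], "worker2": ["map-7", "reduce-1"]}

def check_workers():
    now = time.time()
    for worker, seen in list(last_seen.items()):
        if now - seen > HEARTBEAT_TIMEOUT:  # no response = failed worker
            orphaned = tasks.pop(worker, [])
            del last_seen[worker]
            alive = next(iter(tasks))       # reassign to any live worker
            tasks[alive].extend(orphaned)
            print(f"{worker} failed; reassigned {orphaned} to {alive}")

check_workers()  # worker2 is 60s stale, so its tasks move to worker1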
Fault Tolerance
e Proactive way (Redundant Execution)
> The problem of "stragglers" (slow workers)
= Other jobs consuming resources on the machine
= Bad disks with soft errors transfer data very slowly
= Weird things: processor caches disabled (!!)
> When the computation is almost done, reschedule
in-progress tasks as backup executions
> Whenever either the primary or the backup
execution finishes, mark the task as completed (see the sketch below)
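A toy Python sketch of speculative (backup) execution; the task durations are fabricated so the backup visibly wins against the straggling primary:

import threading

done = threading.Event()
lock = threading.Lock()
winner = []

def run_task(name, duration):
    # Simulate the task taking `duration` seconds, giving up early
    # if another execution of the same task already finished.
    if done.wait(timeout=duration):
        return  # the other copy completed first
    with lock:
        if not done.is_set():
            winner.append(name)  # first execution to finish wins
            done.set()

primary = threading.Thread(target=run_task, args=("primary", 5.0))  # straggler
backup = threading.Thread(target=run_task, args=("backup", 0.5))
primary.start(); backup.start()
primary.join(); backup.join()
print("completed by:", winner[0])  # -> backup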
Fault Tolerance
e Input error: bad records
> Map/Reduce functions sometimes fail for particular inputs
> Best solution is to debug & fix, but not always possible
> On segmentation fault
= Send UDP packet to master from signal handler
= Include sequence number of record being processed
> Skip bad records
= If the master sees two failures for the same record, the next worker
is told to skip the record (a sketch follows below)
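The master-side bookkeeping for skipping bad records is simple; a Python sketch (the threshold of two failures comes from the slide, the record IDs are invented):

from collections import Counter

failure_counts = Counter()  # record sequence number -> failures observed
SKIP_THRESHOLD = 2

def report_failure(record_seqno):
    # Called when a worker signals a crash on this record (e.g. via UDP).
    failure_counts[record_seqno] += 1

def should_skip(record_seqno):
    # The next worker asks the master whether to skip this record.
    return failure_counts[record_seqno] >= SKIP_THRESHOLD

report_failure(42)
report_failure(42)
print(should_skip(42))  # True: two failures seen, record 42 is skipped
print(should_skip(7))   # False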
Status monitor
[Screenshot: the status page for job MR_Indexer-beta6-large-2003_10_28_00_03, about 37 minutes into the run with one worker death, showing per-phase (Map, Shuffle, Reduce) shard counts, done/active tasks, and input/done/output megabytes, plus a per-shard progress chart.]

Points that need to be emphasized
e No reduce can begin until map is complete
e Master must communicate locations of
intermediate files
e Tasks scheduled based on location of data
e If map worker fails any time before reduce
finishes, task must be completely rerun
e MapReduce library does most of the hard work
for us!

Other Examples
Distributed Grep:
> Map function emits a line if it matches a supplied pattern.
> Reduce function is an identity function that copies the supplied
intermediate data to the output.
Count of URL Access Frequency:
> Map function processes logs of web page requests and outputs <URL, 1> pairs.
> Reduce function adds together all values for the same URL, emitting
<URL, total count> pairs.
Reverse Web-Link graph; e.g., all URLs with reference to http://dblab.usc.edu:
> Map function outputs <tgt, src> for each link to a tgt in a page named src.
> Reduce concatenates the list of all src URLs associated with a given tgt
URL and emits the pair <tgt, list(src)>.
Inverted Index; e.g., all URLs with 585 as a word:
> Map function parses each document, emitting a sequence of <word, doc_ID> pairs.
> Reduce accepts all pairs for a given word, sorts the corresponding doc_IDs,
and emits a <word, list(doc_ID)> pair.
> The set of all output pairs forms a simple inverted index (a sketch follows below).
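A compact Python sketch of the inverted index example (the two documents are invented):

from collections import defaultdict

docs = {"doc1": "cloud computing and mapreduce",
        "doc2": "mapreduce on the cloud"}

# Map: emit (word, doc_ID) for every word in every document.
pairs = [(word, doc_id) for doc_id, text in docs.items()
         for word in text.split()]

# Shuffle + Reduce: collect and sort the doc_IDs for each word.
index = defaultdict(set)
for word, doc_id in pairs:
    index[word].add(doc_id)
inverted = {word: sorted(ids) for word, ids in index.items()}
print(inverted["mapreduce"])  # ['doc1', 'doc2']
print(inverted["computing"])  # ['doc1']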
TTMapReduce Implementations
elt i
1, Google
2, Apache Hadoop
MapReduce
Py SESE
yy
A if
Yr I
TT fete) el SO a
Phoenix @ stanford GPU,
Mars@HKUSTHadoop : software platform originally developed by Yahoo
enabling users to write and run applications over vast
distributed data.
Attractive Features in Hadoop :
= Scalable : can easily scale to store and process petabytes of
data in the Web space
= Economical : An open-source MapReduce minimizes the
overheads in task spawning and massive data communication.
= Efficient : processes data with a high degree of parallelism
across a large number of commodity nodes
= Reliable : Automatically maintains multiple copies of data to
facilitate redeployment of computing tasks on failures
Typical Hadoop Cluster

[Diagram: an aggregation switch linked by 8-gigabit uplinks to per-rack gigabit switches.]
40 nodes/rack, 1000-4000 nodes in cluster
1 Gbps bandwidth within rack, 8 Gbps out of rack
Node specs (Yahoo terasort):
8 x 2GHz cores, 8 GB RAM, 4 disks (= 4 TB?)

Challenges
Cheap nodes fail, especially if you have many
1. Mean time between failures for 1 node = 3 years
2. Mean time between failures for 1000 nodes = 1 day
3. Solution: Build fault-tolerance into system
Commodity network = low bandwidth
1. Solution: Push computation to the data
Programming distributed systems is hard
1. Solution: Data-parallel programming model: users
write "map" & "reduce" functions, system distributes
work and handles faults

Hadoop Components
e Distributed file system (HDFS)
> Single namespace for entire cluster
>» Replicates data 3x for fault-tolerance
e MapReduce framework
> Executes user jobs specified as “map” and
“reduce” functions
>» Manages work distribution & fault-tolerance
Hadoop Distributed File System
e Files split into 128MB blocks
e Blocks replicated across several datanodes (usually 3x)
e Single namenode stores metadata (file names, block locations, etc.)
e Optimized for large files, sequential reads
e Files are append-only (a block-layout sketch follows below)

[Diagram: the namenode holding metadata, with blocks replicated across datanodes.]
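A minimal sketch of how a file maps onto fixed-size 128 MB blocks (the file size is invented; real HDFS additionally tracks which datanodes hold each block):

BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB, the block size named on this slide

def block_ranges(file_size, block_size=BLOCK_SIZE):
    # Yield (block_index, start_offset, length) for each block of the file.
    offset, index = 0, 0
    while offset < file_size:
        length = min(block_size, file_size - offset)
        yield (index, offset, length)
        offset += length
        index += 1

# A 300 MB file occupies two full blocks plus one 44 MB tail block.
for blk in block_ranges(300 * 1024 * 1024):
    print(blk)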
[FIGURE 6.11: HDFS and MapReduce architecture in Hadoop: the MapReduce engine (JobTracker) runs over HDFS (NameNode) across the cluster.]
Secure Query Processing with Hadoop/MapReduce
Higher-level languages over Hadoop: Pig and Hive

Motivation
e Many parallel algorithms can be expressed by
a series of MapReduce jobs
e But MapReduce is fairly low-level: must think
about keys, values, partitioning, etc
e Can we capture common "job building blocks"?

Pig
e Started at Yahoo! Research
e Runs about 30% of Yahoo!'s jobs
e Features:
> Expresses sequences of MapReduce jobs
> Data model: nested "bags" of items
> Provides relational (SQL) operators (JOIN, GROUP BY, etc.)
> Easy to plug in Java functions
> Pig Pen development environment for Eclipse

An Example Problem
Suppose you have user data in one file, page view data in
another, and you need to find the top 5 most visited pages
by users aged 18-25.

[Diagram: Load Users and Load Pages feed a pipeline that filters by age,
joins on user name, groups by URL, counts clicks, orders by clicks, and
takes the top 5.]

In MapReduce

[Slide shows the same job written directly against the MapReduce API: a
dense page of low-level code.]

In Pig Latin
Users    = load 'users' as (name, age);
Filtered = filter Users by age >= 18 and age <= 25;
Pages    = load 'pages' as (user, url);
Joined   = join Filtered by name, Pages by user;
Grouped  = group Joined by url;
Summed   = foreach Grouped generate group, count(Joined) as clicks;
Sorted   = order Summed by clicks desc;
Top5     = limit Sorted 5;

store Top5 into 'top5sites';

Ease of Translation
Notice how naturally the components of the job translate into Pig Latin.

[Diagram: each step maps to one statement: Load Users -> Users = load; Filter by age -> Filtered = filter; Load Pages -> Pages = load; Join -> Joined = join; Group -> Grouped = group; Count -> Summed = ... count(...); Order -> Sorted = order; Take top 5 -> Top5 = limit.]

Ease of Translation
Notice how naturally the components of the job translate into Pig Latin.

[Diagram: the same statements annotated with job boundaries: load/filter/join compile into one MapReduce job, group/count into a second, and order/limit into a third (Job 1, Job 2, Job 3).]

Hive
e Developed at Facebook
e Used for majority of Facebook jobs
e "Relational database" built on Hadoop
> Maintains list of table schemas
> SQL-like query language (HQL)
> Can call Hadoop Streaming scripts from HQL
> Supports table partitioning, clustering, complex
data types, some optimizations

Sample Hive Queries
+ Find the top 5 pages visited by users aged 18-25:

SELECT p.url, COUNT(1) as clicks
FROM users u JOIN page_views p ON (u.name = p.user)
WHERE u.age >= 18 AND u.age <= 25
GROUP BY p.url
ORDER BY clicks
LIMIT 5;

+ Filter page views through a Python script:

SELECT TRANSFORM(p.user, p.date)
USING 'map_script.py'
AS dt, uid CLUSTER BY dt
FROM page_views p;

Amazon Elastic MapReduce
e Provides a web-based interface and command-
line tools for running Hadoop jobs on Amazon EC2
e Data stored in Amazon S3
e Monitors job and shuts down machines after use
e Small extra charge on top of EC2 pricing
e If you want more control over how your Hadoop
runs, you can launch a Hadoop cluster on EC2
manually using the scripts in src/contrib/ec2
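The slides show the 2012-era web console; today the same job flow can be started programmatically. A hedged boto3 sketch (bucket names, script paths, instance types, and the EMR release label are all placeholders, not values from the slides):

import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.run_job_flow(
    Name="wordcount-streaming",
    ReleaseLabel="emr-6.15.0",          # placeholder release
    LogUri="s3://my-bucket/emr-logs/",  # placeholder bucket
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": False,
    },
    Steps=[{
        "Name": "streaming word count",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["hadoop-streaming",
                     "-files", "s3://my-bucket/wordSplitter.py",
                     "-mapper", "wordSplitter.py",
                     "-reducer", "aggregate",
                     "-input", "s3://my-bucket/input/",
                     "-output", "s3://my-bucket/output/"],
        },
    }],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print("JobFlowId:", response["JobFlowId"])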
Elastic MapReduce Workflow
[Screenshot: the "Create a New Job Flow" page. You give the job flow a name and select its type: Streaming, Custom Jar, Pig Program, or a sample application such as the Word Count Python example.]

Elastic MapReduce Workflow
[Screenshot: the mapper and reducer step of "Create a New Job Flow". Each may be (i) a class name referring to a mapper or reducer class in Hadoop, or (ii) an Amazon S3 location of the form bucket-name/path pointing to an executable program such as a Python script; extra arguments are passed to the Hadoop streaming program. Sample values: input location elasticmapreduce/samples/wordcount/input, mapper elasticmapreduce/samples/wordcount/wordSplitter.py, reducer aggregate, and a dated S3 output path.]
[Screenshot: the instance-selection step: enter the number and type (e.g., Small) of EC2 instances to run the job flow on.]

Conclusions
MapReduce programming model hides the complexity of
work distribution and fault tolerance
Principal design philosophies:
> Make it scalable, so you can throw hardware at problems
> Make it cheap, lowering hardware, programming and admin
costs
MapReduce is not suitable for all problems, but when it
works, it may save you quite a bit of time
Cloud computing makes it straightforward to start using
Hadoop (or other parallel software) at scale
Resources
Hadoop: http://hadoop.apache.org/core/
Pig: http://hadoop.apache.org/pig
Hive: http://hadoop.apache.org/hive
Video tutorials: http://www.cloudera.com/hadoop-training
Amazon Web Services: http://aws.amazon.com/
Amazon Elastic MapReduce guide:
http://docs.amazonwebservices.com/ElasticMapReduce/latest/GettingStartedGuide/