
Map Reduce
Other Algorithms using MR
Big Data Analytics - Module 2D
OVERVIEW OF THE CHAPTER
• Algorithms Using MapReduce:
• Matrix-Vector Multiplication by MapReduce
• Relational-Algebra Operations: Computing Selections by MapReduce, Computing Projections by MapReduce, Union, Intersection, and Difference by MapReduce, Computing Natural Join by MapReduce, Grouping and Aggregation by MapReduce
• Matrix Multiplication, and Matrix Multiplication with One MapReduce Step
• Illustrating the use of MapReduce with real-life databases and applications
SORTING IN MAP REDUCE
• In MapReduce, during the
shuffle and sort phase, all the
keys from all mappers are:
• Collected across the cluster.
• Globally sorted by key
(within the Hadoop
framework).
• Partitioned by key ranges
and sent to the correct
reducer.
SORTING IN MAP REDUCE
• Each mapper first sorts its local
output by key.
• Then the system does a shuffle,
moving keys to the appropriate
reducer.
• Example:
• Reducer 1 gets keys 1–100,
• Reducer 2 gets 101–200, etc.
• During this shuffle, the framework
ensures that keys are sorted before
being fed into each reducer.
SORTING
• Sorting is global with respect to reducers:
• If you want the entire dataset in one sorted
list → use one reducer.
• If you use multiple reducers, each reducer’s
input is sorted locally, but the final output is
range-partitioned sorted chunks.
• Example with 2 reducers:
• Reducer 1 → sorted list of numbers 1–100
• Reducer 2 → sorted list of numbers 101–
200
• Final output = concatenation of these
sorted partitions = globally sorted list.
EXTREMELY LARGE KEY
SETS
• Sorting at the Mapper Side
(Local Sort)
• Each mapper writes its output (key,
value pairs) to a buffer in memory.
• When the buffer fills, Hadoop spills the
data to disk in sorted order (called spill
files).
• At the end, the mapper merges these
spill files into one sorted mapper output
file.
• So before shuffling even begins,
mapper outputs are already locally
sorted.
EXTREMELY LARGE KEY
SETS
• Shuffle (Data Transfer Across
Network)
• Keys are partitioned so each reducer
gets a range of keys.
• E.g., Reducer 1 gets [1–1000],
• Reducer 2 gets [1001–2000].
• Data is copied over the network from
mappers to reducers.
• While data is copied, reducers
merge-sort the incoming sorted
streams from different mappers.
EXTREMELY LARGE KEY
SETS
• External Merge-Sort at
Reducer
• Reducers don’t load everything into
memory at once.
• Instead, they use external sorting:
• Merge sorted chunks as they arrive (like
merge sort on disk).
• Only keep enough in memory to merge
efficiently.
• This way, Hadoop can handle terabytes
of keys without exceeding memory.
EXTREMELY LARGE KEY
SETS
• Example
• Suppose we want to sort 10 billion
numbers:
• Each mapper sorts its chunk (say 100
million numbers).
• Reducer 1 merges all chunks for keys [1–
500M],
• Reducer 2 for [500M–1B], etc.
• Final outputs are sorted partitions.
• Concatenating them = globally sorted
list.
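The flow above can be illustrated with a small plain-Python simulation (a toy sketch, not Hadoop code; the number of mappers, the partition boundary, and the data are made up): each mapper sorts its chunk, keys are routed to reducers by range, each reducer merge-sorts the sorted runs it receives, and the concatenation of the reducer outputs is a globally sorted list.

# Toy simulation of range-partitioned sorting in MapReduce style.
import heapq
import random

def mapper(chunk):
    return sorted(chunk)                      # local sort, like a merged spill file

def partition(key, boundaries):
    for r, upper in enumerate(boundaries):    # route key to the reducer owning its range
        if key <= upper:
            return r
    return len(boundaries)

def reducer(sorted_runs):
    return list(heapq.merge(*sorted_runs))    # external-style merge of sorted runs

data = [random.randrange(1, 201) for _ in range(1000)]
chunks = [data[i::4] for i in range(4)]       # 4 mappers
boundaries = [100]                            # reducer 0: keys 1-100, reducer 1: keys 101-200

runs_per_reducer = [[], []]
for chunk in chunks:
    run = mapper(chunk)
    for r in range(2):
        runs_per_reducer[r].append([k for k in run if partition(k, boundaries) == r])

output = reducer(runs_per_reducer[0]) + reducer(runs_per_reducer[1])
assert output == sorted(data)                 # concatenation is globally sorted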
MATRIX – VECTOR MULTIPLICATION
Multiply an n × n matrix M = (mij) by an n-vector v = (vj)
[Figure: the n × n matrix M times the n-vector v.]
Case 1: Large n, M does not fit into main memory, but v does
 Since v fits into main memory, v is available to every map task
 Map: for each matrix element mij, emit key value pair (i, mijvj)
 Shuffle and sort: groups all mijvj values together for the same i
 Reduce: sum mijvj for all j for the same i
HOW THE VECTOR IS REPLICATED:
• Distributed Cache/Broadcast: Before the MapReduce job begins, the vector is placed in a distributed cache (Hadoop's distributed cache or Spark's broadcast variables).
• This cache ensures that each mapper has a
local copy of the vector.
• Local Access in Mapper: Each mapper
accesses the vector locally from the cache,
avoiding the need for repeated network
communication.
• The reduce phase aggregates the results to
produce the final output vector.
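As an illustration of the broadcast pattern (a sketch using PySpark broadcast variables rather than Hadoop's distributed cache; the vector, the sparse matrix entries, and the app name are made up for the example):

# Sketch: matrix-vector multiplication with the small vector broadcast to
# every worker. The matrix is assumed to be stored as (i, j, m_ij) triples.
from pyspark import SparkContext

sc = SparkContext(appName="MatrixVector")

v = [2.0, 1.0, 3.0]                       # the small vector, read on the driver
bv = sc.broadcast(v)                      # shipped once to each worker

matrix = sc.parallelize([
    (0, 0, 1.0), (0, 2, 4.0),             # sparse entries (i, j, m_ij)
    (1, 1, 5.0), (2, 0, 2.0), (2, 2, 1.0),
])

result = (matrix
          .map(lambda t: (t[0], t[2] * bv.value[t[1]]))   # Map: (i, m_ij * v_j)
          .reduceByKey(lambda a, b: a + b)                # Reduce: sum over j for each i
          .collect())

print(sorted(result))                     # [(0, 14.0), (1, 5.0), (2, 7.0)]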
WHY DATA MOVEMENT IS MANAGEABLE:
• Small Vector Size:
• Vectors are usually much smaller than the matrix itself.
• Broadcasting Efficiency:
• Broadcasting a small vector across the nodes involves
relatively little data movement compared to the overall
size of the matrix.
• The overhead of transmitting this small amount of data
to each node is usually negligible, especially in
distributed systems designed to handle such operations
efficiently.
• One-Time Cost:
• Single Broadcast: The vector is typically broadcast
only once at the start of the MapReduce job
• Distributed Cache:
• Systems like Hadoop use a distributed cache to
distribute the vector, ensuring that each node has a
local copy. This minimizes the need for repeated data
movement during the map operations.
MATRIX – VECTOR MULTIPLICATION
[Figure: this much (one stripe of v) will fit into main memory; the whole vector does not fit in main memory anymore.]
Case 2: Very large n, even v does not fit into main memory
 For every map, many accesses to disk (for parts of v) required!
 Solution:
– How much of v will fit in?
– Partition v and rows of M so that each partition of v fits into memory
– Take dot product of one partition of v and the corresponding partition of M
– Map and reduce same as before
[Figure: colour = stripe; each stripe of the matrix is divided up into chunks and multiplied (×) by the corresponding stripe of v.]
MATRIX-VECTOR MULTIPLICATION MR CODE
map(key, value):
    for (i, j, a_ij) in value:
        emit(i, a_ij * v[j])

reduce(key, values):
    result = 0
    for value in values:
        result += value
    emit(key, result)
RELATIONAL ALGEBRA
• Primitives
• Projection ()
• Selection ()
• Cartesian product ()
• Set union ()
• Set difference ()
• Rename ()
• Other operations
• Join (⋈)
• Group by… aggregation
•…
RELATIONAL ALGEBRA
• R, S - relation
• t, t’ - a tuple
• C - a condition of selection
• A, B, C - subset of attributes
• a, b, c - attribute values for a given subset of
attributes
• Relations (however big) can be stored in
a distributed filesystem – If they don’t fit
in a single machine, they’re broken into
pieces (think HDFS)
BIG DATA ANALYSIS
• Peta-scale datasets are everywhere:
• Facebook has 2.5 PB of user data + 15 TB/day
(4/2009)
• eBay has 6.5 PB of user data + 50 TB/day (5/2009)
• …
• A lot of these datasets are (mostly) structured
• Query logs
• Point-of-sale records
• User data (e.g., demographics)
• …
• How do we perform data analysis at scale?
• Relational databases and SQL
• MapReduce (Hadoop)
Relational Databases Vs.
MapReduce
• Relational databases:
• Multipurpose: analysis and transactions; batch
and interactive
• Data integrity via ACID transactions
• Lots of tools in software ecosystem (for
ingesting, reporting, etc.)
• Supports SQL (and SQL integration, e.g., JDBC)
• Automatic SQL query optimization
• MapReduce (Hadoop):
• Designed for large clusters, fault tolerant
• Data is accessed in “native format”
• Supports many query languages
• Programmers retain control over performance
• Open source
Selection Using Mapreduce
• Map: For each tuple t in R, test if t satisfies
C. If so, produce the key-value pair (t, t).
• Reduce: The identity function. It simply
passes each key-value pair to the output.
[Figure: tuples R1-R5 of R pass through the selection; only the tuples satisfying C (here R1 and R3) appear in the output.]
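A minimal plain-Python sketch of selection by MapReduce, with an illustrative relation R and condition C (the emit and shuffle steps are simulated in memory):

# Sketch: selection sigma_C(R). Map emits (t, t) for every tuple t satisfying C;
# Reduce is the identity.
from collections import defaultdict

def select_map(t, C):
    if C(t):
        yield (t, t)

def identity_reduce(key, values):
    for v in values:
        yield (key, v)

R = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
C = lambda t: t[0] % 2 == 1            # illustrative condition: first attribute is odd

groups = defaultdict(list)
for t in R:                            # Map + shuffle
    for k, v in select_map(t, C):
        groups[k].append(v)

output = [kv for k, vs in groups.items() for kv in identity_reduce(k, vs)]
print(output)                          # [((1, 'a'), (1, 'a')), ((3, 'c'), (3, 'c'))]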
PROJECTION
[Figure: each tuple R1-R5 of R is mapped to its projection onto the chosen attributes.]
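The slide shows only the diagram; in the usual formulation, Map emits the projected tuple t' as both key and value, and Reduce emits each key once, which also eliminates duplicates. A minimal plain-Python sketch, assuming R(A,B,C) is projected onto (A,B):

# Sketch: projection pi_{A,B}(R).
from collections import defaultdict

R = [(1, "a", 10), (2, "b", 20), (1, "a", 30)]   # tuples (A, B, C)

groups = defaultdict(list)
for a, b, c in R:                  # Map: emit the projected tuple as key and value
    t_prime = (a, b)
    groups[t_prime].append(t_prime)

output = list(groups)              # Reduce: one output tuple per key (duplicates removed)
print(output)                      # [(1, 'a'), (2, 'b')]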
Union Using Mapreduce
• Suppose R and S have the same
schema
• Map tasks are generated from
chunks of both R and S
INTERSECTION (R ∩ S)
DIFFERENCE (R-S)
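The three set operations can be sketched together in plain Python (a hedged sketch of the usual formulations, not library code: Map tags each tuple with its source relation, and Reduce decides membership from the tags it sees for each tuple; the relations are illustrative):

# Sketch: Union, Intersection, and Difference of R and S (same schema).
from collections import defaultdict

R = [(1, "a"), (2, "b"), (3, "c")]
S = [(2, "b"), (3, "c"), (4, "d")]

def tag_map(relation, tag):
    for t in relation:
        yield (t, tag)

groups = defaultdict(list)
for k, v in list(tag_map(R, "R")) + list(tag_map(S, "S")):   # Map + shuffle
    groups[k].append(v)

union        = [t for t in groups]                                    # t appears in R or S
intersection = [t for t, tags in groups.items() if set(tags) == {"R", "S"}]
difference   = [t for t, tags in groups.items() if tags == ["R"]]     # R - S: seen only in R

print(sorted(union))         # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
print(sorted(intersection))  # [(2, 'b'), (3, 'c')]
print(sorted(difference))    # [(1, 'a')]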
GROUPING AND AGGREGATION USING MAPREDUCE
• Group and aggregate on a relation R(A,B) using aggregation function γ(B), grouping by attribute A
• Map:
• For each tuple t = (a,b) of R, emit key value pair (a,b)
• Reduce:
• For each group {(a,b1), …, (a,bm)} represented by a key a, apply γ to obtain ba = b1 + … + bm
• Output (a,ba)

R:
A  B
x  2
y  1
z  4
z  1
x  5

select A, sum(B) from R group by A;

Result:
A  SUM(B)
x  7
y  1
z  5
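A minimal plain-Python sketch of this group-by-sum, using the relation R from the slide (the shuffle is simulated with a dictionary):

# Sketch: grouping and aggregation gamma_{A, SUM(B)}(R).
from collections import defaultdict

R = [("x", 2), ("y", 1), ("z", 4), ("z", 1), ("x", 5)]

groups = defaultdict(list)
for a, b in R:                      # Map + shuffle: group the b values by key a
    groups[a].append(b)

result = {a: sum(bs) for a, bs in groups.items()}   # Reduce: apply SUM per group
print(result)                       # {'x': 7, 'y': 1, 'z': 5}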
GROUP BY… AGGREGATION - 2
• Example: What is the average
time spent per URL?
• In SQL:
• SELECT url, AVG(time) FROM visits GROUP
BY url
• In MapReduce:
• Map over tuples, emit time, keyed by url
• MR automatically groups values by keys
• Compute average in reducer
MAP-REDUCE EXAMPLE: AGGREGATION
• Compute Avg of B for each distinct value of A

R:
    A  B   C
R1  1  10  12
R2  2  20  34
R3  1  10  22
R4  1  30  56
R5  3  40  17
R6  2  10  49
R7  1  20  44

MAP 1 (R1-R4) emits: (1, 10), (2, 20), (1, 10), (1, 30)
MAP 2 (R5-R7) emits: (3, 40), (2, 10), (1, 20)

Reducer 1: (1, [10, 10, 30, 20]) -> (1, 17.5)
Reducer 2: (2, [20, 10]) -> (2, 15); (3, [40]) -> (3, 40)
Grouping & Aggregation Summary
NATURAL JOIN
MAP-REDUCE EXAMPLE: JOIN
 Select R.A, R.B, S.D where R.A == S.A

R:
    A  B   C
R1  1  10  12
R2  2  20  34
R3  1  10  22
R4  1  30  56
R5  3  40  17

S:
    A  D   E
S1  1  20  22
S2  2  30  36
S3  2  10  29
S4  3  50  16
S5  3  40  37

MAP 1 (over R) emits: (1, [R, 10]), (2, [R, 20]), (1, [R, 10]), (1, [R, 30]), (3, [R, 40])
MAP 2 (over S) emits: (1, [S, 20]), (2, [S, 30]), (2, [S, 10]), (3, [S, 50]), (3, [S, 40])

Reducer 1: (1, [(R, 10), (R, 10), (R, 30), (S, 20)]) -> (1, 10, 20), (1, 10, 20), (1, 30, 20)
Reducer 2: (2, [(R, 20), (S, 30), (S, 10)]) -> (2, 20, 30), (2, 20, 10)
           (3, [(R, 40), (S, 50), (S, 40)]) -> (3, 40, 50), (3, 40, 40)
Natural Join Using Mapreduce
 Join R(A,B) with S(B,C) on attribute B
• Map:
• For each tuple t = (a,b) of R, emit key value pair (b, (R,a))
• For each tuple t = (b,c) of S, emit key value pair (b, (S,c))
• Reduce:
• Each key b would be associated with a list of values that are of the form (R,a) or (S,c)
• Construct all pairs consisting of one value with first component R and the other with first component S, say (R,a) and (S,c). The output from this key and value list is a sequence of key-value pairs
• The key is irrelevant. Each value is one of the triples (a, b, c) such that (R,a) and (S,c) are on the input list of values

R:
A  B
x  a
y  b
z  c
w  d

S:
B  C
a  1
c  3
d  4
g  7
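A minimal plain-Python sketch of this natural join, using the R and S tuples from the slide (emit and shuffle are simulated with an in-memory dictionary):

# Sketch: natural join of R(A,B) and S(B,C) on attribute B.
from collections import defaultdict

R = [("x", "a"), ("y", "b"), ("z", "c"), ("w", "d")]   # tuples (a, b)
S = [("a", 1), ("c", 3), ("d", 4), ("g", 7)]           # tuples (b, c)

groups = defaultdict(list)
for a, b in R:
    groups[b].append(("R", a))          # Map over R: (b, (R, a))
for b, c in S:
    groups[b].append(("S", c))          # Map over S: (b, (S, c))

joined = []
for b, values in groups.items():        # Reduce: pair every (R, a) with every (S, c)
    r_vals = [v for tag, v in values if tag == "R"]
    s_vals = [v for tag, v in values if tag == "S"]
    for a in r_vals:
        for c in s_vals:
            joined.append((a, b, c))

print(sorted(joined))                   # [('w', 'd', 4), ('x', 'a', 1), ('z', 'c', 3)]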
Need For High-level Languages
• Hadoop is great for large-data processing!
• But writing Java programs for everything
is verbose and slow
• Analysts don’t want to (or can’t) write
Java
• Solution: develop higher-level data
processing languages
• Hive: HQL is like SQL
• Pig: Pig Latin is a bit like Perl
HIVE AND PIG
• Hive: data warehousing application in Hadoop
• Query language is HQL, variant of SQL
• Tables stored on HDFS as flat files
• Developed by Facebook, now open source
• Pig: large-scale data processing system
• Scripts are written in Pig Latin, a dataflow
language
• Developed by Yahoo!, now open source
• Roughly 1/3 of all Yahoo! internal jobs
• Common idea:
• Provide higher-level language to facilitate large-
data processing
• Higher-level language “compiles down” to
Hadoop jobs
Matrix Multiplication Using Mapreduce 1
[Figure: A (m × n) multiplied by B (n × l) gives C (m × l).]
 Think of a matrix as a relation with three attributes
 For example, matrix A is represented by the relation A(I, J, V)
– For every non-zero entry (i, j, aij), the row number is the value of I, the column number is the value of J, and the entry is the value in V
– Added advantage: most large matrices are sparse, so the relation has far fewer entries
 The product is ~ a natural join followed by a grouping with aggregation
MATRIX MULTIPLICATION USING MR 2
[Figure: A (m × n), entries (i, j, aij), multiplied by B (n × l), entries (j, k, bjk), gives C (m × l).]

 Natural join of A(I,J,V) and B(J,K,W) → tuples (i, j, k, aij, bjk)
 Map:
– For every (i, j, aij), emit key value pair (j, (A, i, aij))
– For every (j, k, bjk), emit key value pair (j, (B, k, bjk))
 Reduce:
for each key j
for each value (A, i, aij) and (B, k, bjk)
produce a key value pair ((i,k),(aijbjk))
MATRIX MULTIPLICATION USING MR 3
[Figure: A (m × n), entries (i, j, aij), multiplied by B (n × l), entries (j, k, bjk), gives C (m × l).]

 First MapReduce process has produced key value pairs ((i,k), (aij bjk))
 Another MapReduce process to group and aggregate
 Map: identity, just emit the key value pair ((i,k), (aij bjk))
 Reduce:
for each key (i,k)
  produce the sum of all the values for the key: cik = Σj aij bjk
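A plain-Python sketch of the two MapReduce steps on small 2 × 2 matrices (the same values as the worked example later in this chapter; the shuffle is simulated with dictionaries):

# Sketch: two-step matrix multiplication C = A x B with sparse triples.
from collections import defaultdict

A = [(1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)]   # (i, j, a_ij)
B = [(1, 1, 5), (1, 2, 6), (2, 1, 7), (2, 2, 8)]   # (j, k, b_jk)

# Step 1: join on j, producing ((i, k), a_ij * b_jk)
by_j = defaultdict(lambda: {"A": [], "B": []})
for i, j, a in A:
    by_j[j]["A"].append((i, a))        # Map: (j, (A, i, a_ij))
for j, k, b in B:
    by_j[j]["B"].append((k, b))        # Map: (j, (B, k, b_jk))

partials = []                          # Reduce: emit ((i, k), a_ij * b_jk)
for j, vals in by_j.items():
    for i, a in vals["A"]:
        for k, b in vals["B"]:
            partials.append(((i, k), a * b))

# Step 2: group by (i, k) and sum
C = defaultdict(int)
for (i, k), prod in partials:
    C[(i, k)] += prod

print(dict(sorted(C.items())))         # {(1, 1): 19, (1, 2): 22, (2, 1): 43, (2, 2): 50}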
MATRIX MULTIPLICATION USING MAPREDUCE: METHOD 2
[Figure: A (m × n), entries (i, j, aij), multiplied by B (n × l), entries (j, k, bjk), gives C (m × l).]
 A method with one MapReduce step
 Map:
– For every (i, j, aij), emit for all k = 1,…, l, the key value ((i,k), (A, j, aij))
– For every (j, k, bjk), emit for all i = 1,…, m, the key value ((i,k), (B, j, bjk))
 Reduce:
for each key (i,k)
  sort values (A, j, aij) and (B, j, bjk) by j to group them by j
  for each j multiply aij and bjk
  sum the products for the key (i,k) to produce cik
[Side note: the value list for a key (i,k) may not fit in main memory – an expensive external sort!]
Matrix Multiplication In One Step
• One reducer per output cell
• Each reducer computes Sumj (A[i,j] * B[j,k])
Illustrating Matrix Multiplication
Matrix Multiplication – 2 Phase
Step 3: Map is just identity – emit ((i, k), aij bjk)
Matrix Multiplication – 1 Phase
Mapper for Matrix A: (k, v) = ((i, k), (A, j, Aij)) for all k
Mapper for Matrix B: (k, v) = ((i, k), (B, j, Bjk)) for all i
Therefore, computing the mapper for Matrix A:
• k, i, and j each range over 2 values here, since the matrices are 2 × 2.
• So when k=1, i can take the 2 values 1 and 2,
• and each case has 2 further values, j=1 and j=2.

k=1 i=1 j=1 ((1, 1), (A, 1, 1))
        j=2 ((1, 1), (A, 2, 2))
    i=2 j=1 ((2, 1), (A, 1, 3))
        j=2 ((2, 1), (A, 2, 4))
k=2 i=1 j=1 ((1, 2), (A, 1, 1))
        j=2 ((1, 2), (A, 2, 2))
    i=2 j=1 ((2, 2), (A, 1, 3))
        j=2 ((2, 2), (A, 2, 4))
Computing the mapper for Matrix B:

i=1 j=1 k=1 ((1, 1), (B, 1, 5))
        k=2 ((1, 2), (B, 1, 6))
    j=2 k=1 ((1, 1), (B, 2, 7))
        k=2 ((1, 2), (B, 2, 8))
i=2 j=1 k=1 ((2, 1), (B, 1, 5))
        k=2 ((2, 2), (B, 1, 6))
    j=2 k=1 ((2, 1), (B, 2, 7))
        k=2 ((2, 2), (B, 2, 8))

The formula for the Reducer is:
• Reducer (k, v): for each key (i, k), make sorted Alist and Blist
• (i, k) => sum over j of (Aij * Bjk)
• Output => ((i, k), sum)
Reducer
• Observe from the Mapper output that 4 keys are common: (1, 1), (1, 2), (2, 1) and (2, 2)
• Build separate lists for Matrix A & B with the adjoining values from the Mapper:

(1, 1) => Alist = {(A, 1, 1), (A, 2, 2)}
          Blist = {(B, 1, 5), (B, 2, 7)}
          Now Aij x Bjk: [(1*5) + (2*7)] = 19 -----(i)

(1, 2) => Alist = {(A, 1, 1), (A, 2, 2)}
          Blist = {(B, 1, 6), (B, 2, 8)}
          Now Aij x Bjk: [(1*6) + (2*8)] = 22 -----(ii)

(2, 1) => Alist = {(A, 1, 3), (A, 2, 4)}
          Blist = {(B, 1, 5), (B, 2, 7)}
          Now Aij x Bjk: [(3*5) + (4*7)] = 43 -----(iii)

(2, 2) => Alist = {(A, 1, 3), (A, 2, 4)}
          Blist = {(B, 1, 6), (B, 2, 8)}
          Now Aij x Bjk: [(3*6) + (4*8)] = 50 -----(iv)

Thus we have:
((1, 1), 19)
((1, 2), 22)
((2, 1), 43)
((2, 2), 50)
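The computation just worked through can be reproduced with a plain-Python sketch of the one-step method (emit and shuffle are simulated with a dictionary; m, n, l are the matrix dimensions):

# Sketch: one-MapReduce-step multiplication of A = [[1,2],[3,4]] and B = [[5,6],[7,8]].
from collections import defaultdict

A = [(1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)]   # (i, j, A_ij)
B = [(1, 1, 5), (1, 2, 6), (2, 1, 7), (2, 2, 8)]   # (j, k, B_jk)
m, n, l = 2, 2, 2

groups = defaultdict(list)
for i, j, a in A:                     # Map over A: replicate each entry across all k
    for k in range(1, l + 1):
        groups[(i, k)].append(("A", j, a))
for j, k, b in B:                     # Map over B: replicate each entry across all i
    for i in range(1, m + 1):
        groups[(i, k)].append(("B", j, b))

C = {}
for (i, k), values in groups.items(): # Reduce: pair A and B entries by j, then sum
    a_by_j = {j: val for tag, j, val in values if tag == "A"}
    b_by_j = {j: val for tag, j, val in values if tag == "B"}
    C[(i, k)] = sum(a_by_j[j] * b_by_j[j] for j in a_by_j)

print(dict(sorted(C.items())))        # {(1, 1): 19, (1, 2): 22, (2, 1): 43, (2, 2): 50}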
EXAMPLE - VISITS PER HOUR
• A common metric that web analytic tools provide
about website traffic is the number of page
views on a per-hour basis.
• In order to compute the number of page visits
for each hour, we must create a custom Key
class that encapsulates an hour (day, month,
year, and hour) and then map that key to the
number of observed page views for that hour.
• Just as we did with the WordCount example, the
mapper will return the key mapped to the value
1, and then the reducer and combiners will
compute the actual count of occurrences for
each hour.
• The challenge is that we need to create a custom key class to hold our date.
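The slide describes a custom Java Key class in Hadoop; as a hedged plain-Python analogue, the key can simply be a (year, month, day, hour) tuple parsed from each log line (the log format below is assumed for illustration):

# Sketch: page views per hour. Map emits (hour_key, 1); Reduce/combine sums the 1s.
from collections import defaultdict
from datetime import datetime

log_lines = [                              # illustrative access-log lines
    "2024-03-01T10:05:12 /index.html",
    "2024-03-01T10:47:03 /about.html",
    "2024-03-01T11:02:55 /index.html",
]

def hour_map(line):
    ts, _url = line.split(" ", 1)
    t = datetime.fromisoformat(ts)
    yield ((t.year, t.month, t.day, t.hour), 1)

counts = defaultdict(int)
for line in log_lines:                     # Map + reduce: sum occurrences per hour key
    for key, one in hour_map(line):
        counts[key] += one

print(dict(counts))                        # {(2024, 3, 1, 10): 2, (2024, 3, 1, 11): 1}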
SENTIMENT ANALYSIS
[Pipeline: Twitter Posts → M: TweetScan (infer a rating per tweet) → R: Summarize Rating Medians per movie → R: Count #Movies per rating]

Twitter Posts:
"Avatar was great"
"I hated Twilight"
"Twilight was pretty bad"
"I enjoyed Avatar"
"I loved Twilight"
"Avatar was okay"

Inferred Ratings (Movie, Rating):
Avatar 8
Twilight 0
Twilight 2
Avatar 7
Twilight 7
Avatar 4

Rating Medians (Movie, Median):
Avatar 7
Twilight 2

Count Ratings (Median, #Movies):
2  1
7  1
APPLICATIONS OF MAP-REDUCE
DISTRIBUTED GREP
• Very popular example to explain how Map-Reduce works
• Demo program comes with Nutch (where Hadoop originated)
Distributed Grep
For Unix guru:
grep -Eh <regex> <inDir>/* | sort | uniq -c | sort -nr
- counts lines in all files in <inDir> that match <regex> and displays the counts in descending order

Example: File 1 contains C, B, B, C; File 2 contains C, A.
- grep -Eh 'A|C' in/* | sort | uniq -c | sort -nr
- Result: 3 C, 1 A
- Use case: analyzing web server access logs to find the top requested pages that match a given pattern
Distributed Grep
Map function in this case:
- input is (file offset, line)
- output is either:
  1. an empty list [] (the line does not match)
  2. a key-value pair [(line, 1)] (if it matches)
Reduce function in this case:
- input is (line, [1, 1, ...])
- output is (line, n) where n is the number of 1s in the list.
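A plain-Python sketch of these map and reduce functions, run over the two example files (the regex and the shuffle dictionary are illustrative):

# Sketch: distributed grep. Map keeps matching lines; Reduce counts them.
import re
from collections import defaultdict

pattern = re.compile(r"A|C")

def grep_map(offset, line):
    return [(line, 1)] if pattern.search(line) else []

def grep_reduce(line, ones):
    return (line, sum(ones))

files = {"file1": ["C", "B", "B", "C"], "file2": ["C", "A"]}

groups = defaultdict(list)
for name, lines in files.items():
    for offset, line in enumerate(lines):
        for k, v in grep_map(offset, line):
            groups[k].append(v)

result = sorted((grep_reduce(k, vs) for k, vs in groups.items()), key=lambda kv: -kv[1])
print(result)                              # [('C', 3), ('A', 1)]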
Distributed Grep
File 1 contains: C, B, B, C; File 2 contains: C, A. Result: 3 C, 1 A

Map tasks:
(0, C) -> [(C, 1)]
(2, B) -> []
(4, B) -> []
(6, C) -> [(C, 1)]
(0, C) -> [(C, 1)]
(2, A) -> [(A, 1)]

Reduce tasks:
(A, [1]) -> (A, 1)
(C, [1, 1, 1]) -> (C, 3)
GEOGRAPHICAL DATA
• Large data sets including road,
intersection, and feature data
• Problems that Google Maps has
used MapReduce to solve
• Locating roads connected to a
given intersection
• Rendering of map tiles
• Finding nearest feature to a
given address or location
GEOGRAPHICAL DATA
• Input: List of roads and
intersections
• Map: Creates pairs of connected
points (road, intersection) or (road,
road)
• Sort: Sort by key
• Reduce: Get list of pairs with same
key
• Output: List of all points that
connect to a particular road
GEOGRAPHICAL DATA
• Input: Graph describing node network
with all gas stations marked
• Map: Search five mile radius of each
gas station and mark distance to each
node
• Sort: Sort by key
• Reduce: For each node, emit path and
gas station with the shortest distance
• Output: Graph marked and nearest gas
station to each node
Inverted Index For Text Collections
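The slide gives no further detail; the standard construction maps each word to the document it appears in and reduces to a sorted posting list per word. A minimal plain-Python sketch with made-up documents:

# Sketch: building an inverted index. Map emits (word, doc_id); Reduce sorts the doc ids.
from collections import defaultdict

docs = {
    "d1": "map reduce sorts keys",
    "d2": "reduce aggregates values",
    "d3": "map emits key value pairs",
}

def index_map(doc_id, text):
    for word in set(text.split()):         # one posting per (word, document)
        yield (word, doc_id)

postings = defaultdict(list)
for doc_id, text in docs.items():          # Map + shuffle: group doc ids by word
    for word, d in index_map(doc_id, text):
        postings[word].append(d)

inverted_index = {w: sorted(ds) for w, ds in postings.items()}   # Reduce
print(inverted_index["reduce"])            # ['d1', 'd2']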
TWITTER ANALYTICS
• Let us take a real-world example to comprehend
the power of MapReduce. Twitter receives around
500 million tweets per day, which is nearly 3000
tweets per second. The following illustration shows
how Twitter manages its tweets with the help of
MapReduce.
• As shown in the illustration, the MapReduce
algorithm performs the following actions −
• Tokenize − Tokenizes the tweets into maps of
tokens and writes them as key-value pairs.
• Filter − Filters unwanted words from the maps of
tokens and writes the filtered maps as key-value
pairs.
• Count − Generates a token counter per word.
• Aggregate Counters − Prepares an aggregate of
similar counter values into small manageable
units.
MapReduce At FaceBook
• Facebook has a list of friends (note that friends are a bi-
directional thing on Facebook. If I'm your friend, you're
mine).
• They also have lots of disk space and they serve hundreds of
millions of requests every day.
• They've decided to pre-compute calculations when they can
to reduce the processing time of requests.
• One common processing request is the "You and XXX have
230 friends in common" feature.
• When you visit someone's profile, you see a list of friends
that you have in common.
• This list doesn't change frequently so it'd be wasteful to
recalculate it every time you visited the profile
• Facebook uses MapReduce to calculate everyone's common friends once a day and store those results.
• Later on it's just a quick lookup. We've got lots of disk, and it's cheap.
MR AT FACEBOOK
• Assume the friends are stored as Person->[List of
Friends], our friends list is then:
• A -> B C D
• B -> A C D E
• C -> A B D E
• D -> A B C E
• E -> B C D
• Each line will be an argument to a mapper.
• For every friend in the list of friends, the mapper will
output a key-value pair.
• The key will be a friend along with the person.
• The value will be the list of friends.
• The key will be sorted so that the friends are in order,
causing all pairs of friends to go to the same reducer.
MR AT FACEBOOK
• After all the mappers are done running, you'll have a list like this:
• For map(A -> B C D):
• (A B) -> B C D
• (A C) -> B C D
• (A D) -> B C D
• For map(B -> A C D E): (Note that A comes before B in the key)
• (A B) -> A C D E
• (B C) -> A C D E
• (B D) -> A C D E
• (B E) -> A C D E
• For map(C -> A B D E):
• (A C) -> A B D E
• (B C) -> A B D E
• (C D) -> A B D E
• (C E) -> A B D E
• For map(D -> A B C E):
• (A D) -> A B C E
• (B D) -> A B C E
• (C D) -> A B C E
• (D E) -> A B C E
• And finally for map(E -> B C D):
• (B E) -> B C D
• (C E) -> B C D
• (D E) -> B C D
MR AT FACEBOOK
• Before we send these key-value pairs to the reducers,
we group them by their keys and get:
• (A B) -> (A C D E) (B C D)
• (A C) -> (A B D E) (B C D)
• (A D) -> (A B C E) (B C D)
• (B C) -> (A B D E) (A C D E)
• (B D) -> (A B C E) (A C D E)
• (B E) -> (A C D E) (B C D)
• (C D) -> (A B C E) (A B D E)
• (C E) -> (A B D E) (B C D)
• (D E) -> (A B C E) (B C D)
MR AT FACEBOOK
• Each line will be passed as an argument to a reducer.
• The reduce function will simply intersect the lists of values and
output the same key with the result of the intersection.
• For example, reduce((A B) -> (A C D E) (B C D)) will output (A
B) : (C D) and means that friends A and B have C and D as
common friends.
• The result after reduction is:
• (A B) -> (C D)
• (A C) -> (B D)
• (A D) -> (B C)
• (B C) -> (A D E)
• (B D) -> (A C E)
• (B E) -> (C D)
• (C D) -> (A B E)
• (C E) -> (B D)
• (D E) -> (B C)
• Now when D visits B's profile, we can quickly look up (B D) and
see that they have three friends in common, (A C E).
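A plain-Python sketch of the whole pipeline, reproducing the common-friends results above (the shuffle is simulated with a dictionary):

# Sketch: "common friends". Map emits (sorted pair, friend set); Reduce intersects
# the sets received for each pair.
from collections import defaultdict

friends = {
    "A": ["B", "C", "D"],
    "B": ["A", "C", "D", "E"],
    "C": ["A", "B", "D", "E"],
    "D": ["A", "B", "C", "E"],
    "E": ["B", "C", "D"],
}

def friends_map(person, friend_list):
    for friend in friend_list:
        key = tuple(sorted((person, friend)))   # (A, B), never (B, A)
        yield (key, set(friend_list))

groups = defaultdict(list)
for person, flist in friends.items():           # Map + shuffle
    for key, value in friends_map(person, flist):
        groups[key].append(value)

common = {pair: sorted(set.intersection(*lists))   # Reduce: intersect the friend sets
          for pair, lists in groups.items()}

print(common[("B", "D")])                        # ['A', 'C', 'E']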
MAP REDUCE INAPPLICABILITY
• Database management
• Sub-optimal implementation for
DB
• Does not provide traditional
DBMS features
• Lacks support for default DBMS
tools
Map Reduce Inapplicability
Database implementation issues
• Lack of a schema
• No separation from application
program
• No indexes
• Reliance on brute force
Map Reduce Inapplicability
Feature absence and tool
incompatibility
• Transaction updates
• Changing data and maintaining data
integrity
• Data mining and replication tools
• Database design and construction
tools
