Big Data Systems
Session 7 - Distributed Programming
John Benito JP
[email protected]
Topics for today
• Top down design
• Types of parallelism
• MapReduce programming model
• See how a MapReduce program works using Hadoop
• Iterative MapReduce
Top down design - sequential context
• In the context of a sequential program
• Divide and conquer
• It is easier to divide a problem into sub-problems and execute them one by one
• A sub-problem definition may be left to the programmer in a sequential programming context
[Diagram: call tree rooted at main() with sub-problems f1() through f5()]
Top down design - parallel context
• We cannot decompose the problem into sub-problems in any way the programmer chooses
• Need to think about:
• Each sub-problem needs to be assigned to a processor
• Goal is to get the program to work faster
• Divide the problem only when we can combine the results at the end into the final answer
• Need to decide where to do the combination
• Is there any parallelism in the combination, or is it sequential or trivial?
[Diagram: task tree - main() calls f1(), which splits into two f2() tasks, then f4(), three f5() tasks, and finally f7()]
Deciding on number of sub-problems
• In conventional top-down design for sequential systems
• Keep the number of sub-problems manageable
• Because we need to keep track of them as the computation progresses
• In a parallel system it is dictated by the number of processors
• Processor utilisation is the key
• If there are N processors, we can potentially have N sub-problems
[Diagram: the same task tree, annotated "How many processors?"]
Top-down design
• At each level problems need to run in parallel
Example 1 - Keyword search in list
• Problem:
• Search for a key k in a sorted list Ls of size N
• Data:
• Ls is stored in a distributed system with p processors each storing N/p items
• Solution:
• Run binary search in each of the p processors in parallel
• Whichever processor finds k returns (i, j), where the i-th processor has found the key at the j-th position
• Combination: One or more positions are collected at processor 0
• Speedup: p
• Time complexity: O(log(N/p))
[Diagram: sorted list split into chunks of N/p items across processors P1, P2, P3, P4, with key k searched in each chunk]
Example 1 - Keyword search in list
• Sequential case: binary search for k = 4 in the full sorted list 0, 1, 2, ..., 99 on a single processor probes the midpoints:
• 1 => 50
• 2 => 25
• 3 => 12
• 4 => 6
• 5 => 3
• 6 => 4
• The key is found in 6 probes, i.e. about log2(100) steps
Example 1 - Keyword search in list
• 0 - 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
• 1 - 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
• 2 - 20, 21, 22, 23, 24, 25, 26, 27, 28, 29
• 3 - 30, 31, 32, 33, 34, 35, 36, 37, 38, 39
• 4 - 40, 41, 42, 43, 44, 45, 46, 47, 48, 49
• 5 - 50, 51, 52, 53, 54, 55, 56, 57, 58, 59
• 6 - 60, 61, 62, 63, 64, 65, 66, 67, 68, 69
• 7 - 70, 71, 72, 73, 74, 75, 76, 77, 78, 79
• 8 - 80, 81, 82, 83, 84, 85, 86, 87, 88, 89
• 9 - 90, 91, 92, 93, 94, 95, 96, 97, 98, 99
• Parallel case: processor 0 finds k = 4 at its midpoint in a single probe: 1 => 4
• Return (0, 4)
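To make the search-and-combine pattern concrete, here is a minimal sketch in Java, with p local worker threads standing in for the p processors; the thread pool, the Result record, and the partition layout are illustrative choices, not part of the original slides.

import java.util.*;
import java.util.concurrent.*;

public class ParallelKeywordSearch {
    // Result (i, j): the i-th partition holds the key at local position j.
    record Result(int processor, int position) {}

    static Result search(int[][] partitions, int key) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.length);
        CompletionService<Result> cs = new ExecutorCompletionService<>(pool);
        // Each "processor" runs a local binary search on its own sorted partition.
        for (int i = 0; i < partitions.length; i++) {
            final int proc = i;
            cs.submit(() -> {
                int j = Arrays.binarySearch(partitions[proc], key);
                return j >= 0 ? new Result(proc, j) : null;
            });
        }
        Result found = null;
        // The combine step: "processor 0" collects one answer per partition;
        // only the finder returns a non-null result.
        for (int i = 0; i < partitions.length; i++) {
            Result r = cs.take().get();
            if (r != null) found = r;
        }
        pool.shutdown();
        return found;
    }

    public static void main(String[] args) throws Exception {
        // 10 partitions of 10 sorted items each, as on the slide: 0-9, 10-19, ...
        int[][] parts = new int[10][10];
        for (int i = 0; i < 10; i++)
            for (int j = 0; j < 10; j++) parts[i][j] = 10 * i + j;
        System.out.println(search(parts, 4)); // Result[processor=0, position=4]
    }
}

The combination is cheap here: the collecting loop plays the role of processor 0 gathering the (i, j) answers.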
Example 2 - Fingerprint matching
• Find matches for a fingerprint F in a database of D prints
• Set of D prints is partitioned and evenly stored in a distributed database
• Partitioning is an infrequent activity - only when many new entries in database
• Search is the frequent activity
• Speedup: p
• Time complexity: O(N/p), given sequential search in every partition
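Under the same partitioning idea, here is a sketch of the O(N/p) scan phase using parallel streams; the matches() predicate is a hypothetical stand-in for a real minutiae-comparison routine, and equal-sized partitions are assumed when computing global indices.

import java.util.*;
import java.util.stream.*;

public class ParallelFingerprintSearch {
    // Hypothetical match test; a real matcher compares fingerprint features.
    static boolean matches(byte[] query, byte[] candidate) {
        return Arrays.equals(query, candidate);
    }

    // Each partition is scanned sequentially in O(N/p);
    // the p partitions are scanned in parallel.
    static List<Integer> findMatches(List<List<byte[]>> partitions, byte[] query) {
        return IntStream.range(0, partitions.size()).parallel()
                .boxed()
                .flatMap(p -> {
                    List<byte[]> part = partitions.get(p);
                    return IntStream.range(0, part.size())
                            .filter(j -> matches(query, part.get(j)))
                            .mapToObj(j -> p * part.size() + j); // global index, assuming equal partitions
                })
                .collect(Collectors.toList());
    }
}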
Example 3: Document search
• Find keywords from each document d in a distributed document collection D
Example 3: Document search
• D1 => "Artificial intelligence is the future of technology."
• Artificial intelligence future technology
• D2 => "Distributed systems are critical for big data applications."
• Distributed systems critical big data applications
• D3 => "Machine learning algorithms require large datasets."
• Machine learning algorithms require large datasets
• D4 => "Natural language processing enables understanding text."
• Natural language processing enables understanding text
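A small sketch of this per-document keyword extraction, assuming a toy stopword list (an illustrative choice; real pipelines use larger lexicons); because each document is processed independently, the work is data parallel.

import java.util.*;
import java.util.stream.*;

public class DocumentKeywords {
    // Toy stopword list for illustration.
    static final Set<String> STOPWORDS = Set.of("is", "the", "of", "are", "for");

    static List<String> keywords(String doc) {
        return Arrays.stream(doc.toLowerCase().split("\\W+"))
                .filter(w -> !w.isEmpty() && !STOPWORDS.contains(w))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> docs = Map.of(
                "D1", "Artificial intelligence is the future of technology.",
                "D2", "Distributed systems are critical for big data applications.");
        // Each document is handled independently, so the map can run in parallel.
        docs.entrySet().parallelStream()
            .forEach(e -> System.out.println(e.getKey() + " => " + keywords(e.getValue())));
    }
}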
Topics for today
• Top down design
• Types of parallelism
• MapReduce programming model
• See how a MapReduce program works using Hadoop
• Iterative MapReduce
Data parallel execution model
• Data is partitioned to multiple nodes / processors
• Try to make partitions equal or balanced
• All processors execute the same code in parallel
• For homogeneous nodes and equal amounts of work, utilization will be close to 100%
• Execution time is minimal
• Unbalanced data sizes / work or heterogeneous nodes will lead to higher execution time
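As a minimal illustration of this model, here is a sketch in which every worker thread runs the same code on its own balanced partition of an array; the chunking scheme and pool size are illustrative choices.

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class DataParallelSum {
    public static long sum(long[] data, int p) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(p);
        int chunk = (data.length + p - 1) / p; // balanced partitions
        List<Future<Long>> parts = new ArrayList<>();
        for (int i = 0; i < p; i++) {
            int lo = i * chunk, hi = Math.min(data.length, lo + chunk);
            // Every worker runs the same code on its own partition.
            parts.add(pool.submit(() ->
                    LongStream.range(lo, hi).map(j -> data[(int) j]).sum()));
        }
        long total = 0;
        for (Future<Long> f : parts) total += f.get(); // trivial combine step
        pool.shutdown();
        return total;
    }
}

With homogeneous workers and equal chunks, all p tasks finish at about the same time - the near-100% utilization case described above.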
Where data parallelism is not possible
• There are problems where you cannot divide the work
1. equally
2. independently to proceed in parallel
• QuickSort(Ls, N)
• All N items in Ls have to be in memory of processor 0
Where data parallelism is not possible
Example : QuickSort
• Pick a pivot element at position m to partition Ls into sub-problems
• Do that at each level
• There is a dependency on the parent level to partition - not a single-level problem (log N or log p levels, whichever is lower)
• The choice of m cannot guarantee equal partitions
• At a given level, one set of processors can get large partitions and another set small partitions
• There could be techniques to maintain balanced partitions and improve processor utilization uniformly
[Diagram: array positions 0 ... m ... N-1; you can't guarantee equal partitions based on the pivot value D[m]]
Tree parallel execution model for QuickSort - parallel logic
• In step j
• foreach processor p from 1 to 2^(j-1) do
• partition Ls_p into Ls_p1 and Ls_p2
• assign Ls_p1 and Ls_p2 to processors 2p-1 and 2p
• j = j + 1
• repeat until 2^j == N
• Depends on how good the partition is - pivot picked at random?
• Maybe acceptable over the long term, for large lists and many processors
• Time taken may be as bad as sequential with bad partitioning
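A minimal sketch of this tree-parallel pattern using Java's fork/join framework, which hands the two sub-partitions to idle workers much like the processor-assignment rule above; the last-element pivot is an illustrative (and deliberately balance-free) choice.

import java.util.concurrent.*;

public class ParallelQuickSort extends RecursiveAction {
    private final int[] a;
    private final int lo, hi;

    ParallelQuickSort(int[] a, int lo, int hi) {
        this.a = a; this.lo = lo; this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < 1) return;
        int m = partition(a, lo, hi); // division step: pick pivot, split in two
        // Fork the two sub-problems; the pool assigns them to free workers,
        // doubling the available parallelism at each tree level.
        invokeAll(new ParallelQuickSort(a, lo, m - 1),
                  new ParallelQuickSort(a, m + 1, hi));
    }

    private static int partition(int[] a, int lo, int hi) {
        int pivot = a[hi]; // simple last-element pivot: no balance guarantee
        int i = lo - 1;
        for (int j = lo; j < hi; j++)
            if (a[j] < pivot) { i++; int t = a[i]; a[i] = a[j]; a[j] = t; }
        int t = a[i + 1]; a[i + 1] = a[hi]; a[hi] = t;
        return i + 1;
    }

    public static void main(String[] args) {
        int[] data = {5, 3, 8, 1, 9, 2, 7};
        ForkJoinPool.commonPool().invoke(new ParallelQuickSort(data, 0, data.length - 1));
        System.out.println(java.util.Arrays.toString(data)); // [1, 2, 3, 5, 7, 8, 9]
    }
}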
Tree parallelism summary
• Dynamic version of divide and conquer - partitions are done dynamically
• Division of the problem into sub-problems happens at execution time
• A sub-problem is identical in structure to the larger problem
• What is the division step?
• In quicksort it was picking m to split into 2 sub-problems
• Division / partitioning logic is important to find almost equal sub-problems
• If the problem is divided into k sub-problems
• then log_k(N) steps are needed if N processors execute in parallel
• If p = N then the work gets done in log(N) time, with each list item finally assigned to one processor
• What if we assign p processors with
• p < N : all processors are utilised
• p > N : under-utilised processors
Task parallelism - Example 1 : Word processor
• Parallel tasks that work on the same data
• Unlike data and tree parallelism, the data doesn't need to be divided; the task gets divided into sub-tasks
• Sub-tasks may work on the same data instance; otherwise we need to make data copies and keep them in sync
• On a multi-core machine, different threads can execute tasks in parallel, accessing the same data instance in memory
Task parallelism - Example 2 : Independent statistics
• Given a list Ls of numeric values find its mean, median and mode
• Solution
• Independent tasks on same data
• Each task can find a statistic on Ls
• Run tasks in parallel
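A minimal sketch of this in Java: three independent tasks compute the mean, median, and mode of the same read-only list concurrently; CompletableFuture is just one illustrative way to launch them.

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class IndependentStatistics {
    public static void main(String[] args) throws Exception {
        List<Integer> ls = List.of(4, 1, 3, 3, 2, 5, 3);

        // Three independent tasks over the same (read-only) data instance.
        CompletableFuture<Double> mean = CompletableFuture.supplyAsync(() ->
                ls.stream().mapToInt(Integer::intValue).average().orElse(0));
        CompletableFuture<Integer> median = CompletableFuture.supplyAsync(() -> {
            List<Integer> sorted = ls.stream().sorted().collect(Collectors.toList());
            return sorted.get(sorted.size() / 2);
        });
        CompletableFuture<Integer> mode = CompletableFuture.supplyAsync(() ->
                ls.stream()
                  .collect(Collectors.groupingBy(x -> x, Collectors.counting()))
                  .entrySet().stream()
                  .max(Map.Entry.comparingByValue())
                  .get().getKey());

        System.out.printf("mean=%.2f median=%d mode=%d%n",
                mean.get(), median.get(), mode.get());
    }
}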
Task parallelism summary
• Identify sub-tasks based on functionality, with no common function across them
• In tree and data parallelism the tasks are identical in function
• Sub-tasks are not identified based on data
• Independent sub-tasks are executed in parallel
• Sub-tasks are often limited in number and known statically in advance
• We know in advance what the sub-tasks are in a word processor
• We know in advance what functions we will run in a statistical analysis
• So limited parallelism scope - not scalable with more resources
• In data or tree parallelism we can potentially get more parallelism with more data - more scalable with more resources in the same time interval
Request parallelism
• Problem
• Scalable execution of independent tasks in parallel
• Execute same code but in many parallel instances
• Solution
• On arrival, each request is serviced in parallel along with other existing tasks servicing prior requests
• Could be processing same or fixed data
• Request-reply pairs are independent of each other, each serviced by a different thread or process in the backend
• There could be some application-specific backend dependency, e.g. GET and POST on the same data item
• Systems Fit
• Servers in client-server models
• e.g. email server, HTTP web-server, cloud services with API interface, file / storage servers
• Scalability metrics : Requests / time (throughput)
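A minimal sketch of request parallelism: a toy TCP echo server that hands each incoming connection to a pool thread, so request-reply pairs proceed independently; the port and pool size are arbitrary choices.

import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class RequestParallelServer {
    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket client = server.accept(); // one incoming request
                // Each request-reply pair is handled by its own pool thread,
                // in parallel with threads servicing earlier requests.
                pool.submit(() -> {
                    try (client;
                         BufferedReader in = new BufferedReader(
                                 new InputStreamReader(client.getInputStream()));
                         PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                        out.println("echo: " + in.readLine());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}

Throughput (requests per unit time) scales with the number of pool threads until backend resources saturate.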
Exercise: What type of parallelism
• Use case 1 - Historical sales data analysis to find monthly sales aggregates
• Use Case 2 - Web crawler - traverses the web as a graph following links
(edges) to visit websites (nodes) to download unique documents
What happens in a loosely coupled distributed system
• Divide
• No shared memory
• Memory / storage is on separate nodes
• So any exchange of data or coordination between tasks is via message passing
• Divide the problem in a way that each computation task can run on local data
• Conquer / Merge
• In shared memory the merge is simpler, with each process writing into a memory location
• In distributed systems:
• Need to collect data from the different nodes
• In the search example, the merge simply collects the results - so low cost
• In quicksort, it is a simple append, whether writing in place for shared memory or sending a message
• Sometimes merges may become sequential
• e.g. k-means - in each iteration (a) guess clusters in parallel to improve the clusters, but (b) checking whether we have found the right clusters is sequential
[Diagram: nodes exchanging data via message-passing communication]
Topics for today
• Top down design
• Types of parallelism
• MapReduce programming model
• See how a MapReduce program works using Hadoop
• Iterative MapReduce
MapReduce in terms of Data and Tree parallelism
• Map
• Data parallelism
• Divide a problem into sub-problems based on data
• Reduce
• Inverse tree parallelism
• With every merge / reduce, the parallelism reduces until we get one result
• Depending on the problem, the "reduce" step may be simple or sequential
Example: Word Count using MapReduce
Input D1: "the blue ship on blue sea"

map(key, value):
  // key: document name; value: text of document
  for each word w in value:
    emit(w, 1)

reduce(key, values):
  // key: a word; values: an iterator over counts
  result = 0
  for each count v in values:
    result += v
  emit(key, result)

Map output: (the,1) (blue,1) (ship,1) (on,1) (blue,1) (sea,1)
Shuffle: a sort on keys groups (k,v) pairs with the same k together: (blue,[1,1]) (on,[1]) (sea,[1]) (ship,[1]) (the,[1])
Reduce output: (blue,2) (on,1) (sea,1) (ship,1) (the,1)
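For comparison with the pseudocode, here is the same word count in Hadoop's Java MapReduce API - essentially the standard WordCount example that ships with Hadoop, with the job-driver setup omitted for brevity.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
    // map(docId, line) -> (word, 1) for every word in the line
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // reduce(word, [1, 1, ...]) -> (word, total)
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            result.set(sum);
            context.write(key, result);
        }
    }
}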
Word count (2)
MapReduce: The Map Step
[Diagram: input key-value pairs (k, v) flow through parallel map tasks into intermediate key-value pairs (k1, v1), (k2, v2), ..., (kn, vn)]
E.g. (doc-id, doc-content) maps to (word, wordcount-in-a-doc)
Adapted from Jeff Ullman's course slides
MapReduce: The Reduce Step
[Diagram: intermediate key-value pairs are grouped into key-value groups (k, [v, v, v]), which reduce tasks turn into output key-value pairs (k, v)]
E.g. (word, wordcount-in-a-doc) is grouped into (word, list-of-wordcount) - like SQL GROUP BY - and reduced to (word, final-count) - like SQL aggregation
Adapted from Jeff Ullman's course slides
Formal definition of a MapReduce program
• Input: a set of key/value pairs
• User supplies two functions:
• map(k, v) -> list(k1, v1)
• reduce(k1, list(v1)) -> v2
• (k1, v1) is an intermediate key/value pair
• Output is the set of (k1, v2) pairs
When will you use this ?
• Huge set of documents that doesn't fit into memory
• So we need file-based processing in stages, e.g. Hadoop
• But this can also be done in memory - e.g. Spark
• Lots of data partitioning (high data parallelism)
• Possibly a simple merge among partitions (low-cost inverse tree parallelism)
MapReduce: Execution overview
• Data centric design - move computation closer to data
• Intermediate results stored on disk
• Dynamic task scheduling
• A MapReduce library and runtime does all the work:
• allocating resources
• starting workers
• managing them
• moving data
• handling failures
MapReduce origins
Created at Google on GFS
Open source version created as Apache Hadoop
Perform maps/reduces on data using many machines
▪ The system takes care of distributing the data and managing fault tolerance
▪ You just write code to map one element and reduce elements to a combined result
Separates how to do recursive divide-and-conquer from what computation to perform
▪ Old idea in higher-order functional programming transferred to large-scale distributed computing
▪ Complementary approach to database declarative queries
▪ In SQL you don’t actually write the low level query execution code
▪ The programmer needs to focus just on the map and reduce logic; the rest of the work is done by the MapReduce framework
▪ So a restricted programming interface to the system lets the system do the distribution of work, job tracking, fault tolerance, etc.
Topics for today
• Top down design
• Types of parallelism
• MapReduce programming model
• See how a MapReduce program works using Hadoop
• Iterative MapReduce
More complex example - sales data processing
• Count transactions by country
• https://www.guru99.com/create-your-first-hadoop-program.html
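A sketch of the mapper such a job would use, assuming (as in the linked tutorial's dataset) a CSV sales record whose 8th field, index 7, is the country; the class name and field position are illustrative. Pairing it with a summing reducer like IntSumReducer above yields per-country transaction counts.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (country, 1) per sales record; a summing reducer totals them.
public class SalesCountryMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // Assumption: the country column is field index 7 in the CSV.
        context.write(new Text(fields[7]), ONE);
    }
}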
Running and checking status
MapReduce stats
Topics for today
• Top down design
• Types of parallelism
• MapReduce programming model
• See how a MapReduce program works using Hadoop
• Iterative MapReduce
Iterative map reduce
• MapReduce is a one-pass computation
• Many applications, especially in the ML and data mining areas, need to process data iteratively
• So they need iterative execution of MapReduce jobs
• One approach is to create a main program (driver) that calls the core MapReduce job with variable data
• The driver also checks for convergence:
• error bound (e.g. k-means clustering)
• fixed number of iterations
Example 1: K-means clustering
K-means as iterative map reduce
• The MapReduce driver program is responsible for repeating the steps via an iterative construct.
• Within each iteration, map and reduce steps are called.
• Each map step reuses the result produced in the previous reduce step, e.g. the k centers computed.
https://github.com/thomasjungblut/mapreduce-kmeans/tree/master/src/de/jungblut/clustering/mapreduce
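To show the driver pattern without Hadoop boilerplate, here is a self-contained in-memory analogue of the loop, using 1-D points for brevity; the assignment pass plays the role of map, the per-cluster averaging plays the role of reduce, and each iteration's map reuses the centers from the previous reduce. The linked repository implements the same structure as actual Hadoop jobs.

import java.util.*;

public class KMeansDriver {
    // One "map + reduce" round: assign points to the nearest center (map),
    // then average each cluster to get new centers (reduce).
    static double[] iterate(double[] points, double[] centers) {
        Map<Integer, List<Double>> clusters = new HashMap<>();
        for (double p : points) {                  // map: emit (nearestCenter, p)
            int best = 0;
            for (int c = 1; c < centers.length; c++)
                if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) best = c;
            clusters.computeIfAbsent(best, k -> new ArrayList<>()).add(p);
        }
        double[] next = centers.clone();
        clusters.forEach((c, pts) ->               // reduce: new center = mean
                next[c] = pts.stream().mapToDouble(Double::doubleValue)
                             .average().orElse(centers[c]));
        return next;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 10, 11, 12};
        double[] centers = {0, 5};                 // initial guessed centers
        for (int i = 0; i < 10; i++) {             // driver loop: fixed-iteration bound
            double[] next = iterate(points, centers);
            boolean converged = true;              // error-bound convergence check
            for (int c = 0; c < centers.length; c++)
                if (Math.abs(next[c] - centers[c]) > 1e-6) converged = false;
            centers = next;
            if (converged) break;
        }
        System.out.println(Arrays.toString(centers)); // [2.0, 11.0]
    }
}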
Example 2: PageRank
• Given a page X with n in-bound links from pages t1, t2, ..., tn
• C(ti) is the out-degree of ti
• p is the probability of a random jump
• N is the total number of nodes
• PR(X) = p * (1/N) + (1 - p) * (PR(t1)/C(t1) + ... + PR(tn)/C(tn))
• e.g. with p = 0.5: PR(X) = 0.5 * 1/N + 0.5 * (PR(t1)/3 + PR(t2)/4 + ...)
[Diagram: pages t1, t2, ..., tn each link to page X]
• Iterative process
• Start with initial PR_i values for each node
• Each page distributes its PR_i credits to all pages it links to (out-bound)
• Each page adds up all the credits it gets from in-bound links to compute PR_(i+1)
• Iterate till values converge
PageRank as Iterative MapReduce
Iterations using existing runtimes
• Loop implemented on top of the existing file-based single-step MapReduce core
• Large overheads from:
• re-initialization of tasks
• reloading of static data
• communication and data transfers
DistributedCache: https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/filecache/DistributedCache.html
MapReduce++ : Iterative MapReduce
• Some optimizations are done on top of the existing model:
• Static data loaded once
• Cached tasks across invocations
• Combine operations
Example in Twister: Enables more APIs
https://iterativemapreduce.weebly.com/ and ref Hwang Chap 6, Pg 351
The optimisations indeed help
K-means clustering using various programming models
Iterative MapReduce: Other options
• HaLoop
• Modifies Hadoop scheduling to make it loop-aware
• Implements caches to avoid going to disk between iterations
• Optional reading: paper in Proceedings of the VLDB Endowment 3(1):285-296, Sep 2010
• Spark
• Uses in-memory computing to speed up iterations
• An in-memory structure called RDD (Resilient Distributed Dataset) replaces files on disk
• Ideal for iterative computations that reuse a lot of data in each iteration
Naive PageRank using Spark (example code link)

// load data from file - can be in HDFS
JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

// links pairs each URL with the URLs it links to (adjacency list)
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
    String[] parts = SPACES.split(s);
    return new Tuple2<>(parts[0], parts[1]);
}).distinct().groupByKey().cache();

// ranks stores the page ranks, initialised to 1.0
JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);

// Start loop to update URL ranks iteratively
for (int current = 0; current < Integer.parseInt(args[1]); current++) {
    // Calculates URL contributions to the rank of other URLs.
    JavaPairRDD<String, Double> contribs = links.join(ranks).values()
        .flatMapToPair(s -> {
            int urlCount = Iterables.size(s._1());
            List<Tuple2<String, Double>> results = new ArrayList<>();
            for (String n : s._1) {
                results.add(new Tuple2<>(n, s._2() / urlCount));
            }
            return results.iterator();
        });
    // Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(new Sum())
        .mapValues(sum -> 0.15 + sum * 0.85);
}
// end of iterations - ranks contains the result to be collected

[Worked trace from the slide, for the 4-node graph with edges 1->2, 1->3, 1->4, 2->1, 3->1, 4->1:
links: (1,2) (1,3) (1,4) (4,1) (3,1) (2,1), grouped by key; initial ranks: (1,1) (2,1) (3,1) (4,1);
per iteration: join links and ranks, recompute contribs, recompute ranks, repeat;
first-iteration contribs: (1,3/4) (2,1/4) (3,1/4) (4,1/4);
first-iteration ranks: (1, 0.15 + 0.85 * 0.75 = 0.78) (2, 0.15 + 0.85 * 0.25 = 0.36) (3, 0.36) (4, 0.36)]
Summary
• Different types of parallelism
• Data and tree parallelism -> map and reduce
• Basics of MapReduce programs, with a historical sales data processing example
• Optimizations for iterative MapReduce requirements
• How Spark / in-memory computing helps with iterative MapReduce programming
Next Session:
Hadoop MapReduce and YARN