10 Graph Processing
GraphX
Amir H. Payberah
[email protected]
2022-09-29
The Course Web Page
The Questions-Answers Page
Where Are We?
► A flexible abstraction for describing relationships between discrete objects.
Large Graph
Graph Algorithms Challenges
6/
Graph Algorithms Challenges
6/
Graph Partitioning
Edge-Cut Graph Partitioning
Vertex-Cut Graph Partitioning
Edge-Cut vs. Vertex-Cut Graph Partitioning (1/2)
Edge-Cut vs. Vertex-Cut Graph Partitioning (2/2)
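The two strategies can be compared on a toy graph. In an edge-cut, vertices are assigned to partitions and the cost is the number of edges whose endpoints end up in different partitions. In a vertex-cut, edges are assigned to partitions and the cost is how many partitions each vertex must be replicated on. The sketch below uses plain Scala collections, not GraphX. The object name `PartitionCost`, the star graph and both partition assignments are hypothetical.

```scala
// Toy comparison of edge-cut and vertex-cut partitioning costs.
// Plain Scala collections only; the graph and assignments are hypothetical.
object PartitionCost {
  type Edge = (Int, Int)

  // Edge-cut: every vertex lives in exactly one partition; an edge is cut
  // when its two endpoints are assigned to different partitions.
  def cutEdges(edges: Seq[Edge], vertexPart: Map[Int, Int]): Int =
    edges.count { case (s, d) => vertexPart(s) != vertexPart(d) }

  // Vertex-cut: every edge lives in exactly one partition; a vertex is
  // replicated on each partition that holds at least one of its edges.
  // Returns the average number of copies per vertex (replication factor).
  def replicationFactor(edges: Seq[Edge], edgePart: Map[Edge, Int]): Double = {
    val spans: Map[Int, Set[Int]] = edges
      .flatMap { e => Seq(e._1 -> edgePart(e), e._2 -> edgePart(e)) }
      .groupBy(_._1)
      .map { case (v, vps) => v -> vps.map(_._2).toSet }
    spans.values.map(_.size).sum.toDouble / spans.size
  }

  def main(args: Array[String]): Unit = {
    // Star graph: a high-degree hub (vertex 0) linked to vertices 1..4.
    val star: Seq[Edge] = (1 to 4).map(i => (0, i))
    // Edge-cut: hub and leaves 1, 2 on partition 0; leaves 3, 4 on partition 1.
    val vParts = Map(0 -> 0, 1 -> 0, 2 -> 0, 3 -> 1, 4 -> 1)
    // Vertex-cut: the hub's edges are split evenly across the two partitions.
    val eParts = Map((0, 1) -> 0, (0, 2) -> 0, (0, 3) -> 1, (0, 4) -> 1)
    println(s"cut edges = ${cutEdges(star, vParts)}")                   // 2
    println(s"replication factor = ${replicationFactor(star, eParts)}") // 1.2
  }
}
```

In this toy case the edge-cut leaves 2 edges crossing partitions. The vertex-cut keeps every edge local but stores the hub twice. Vertex-cuts spread a high-degree vertex's edges across machines, and GraphX itself uses vertex-cut partitioning.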
PageRank with MapReduce
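PageRank
$R[i] = \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} R[j]$, where $\mathrm{Nbrs}(i)$ are the vertices with an edge into $i$ and $w_{ji}$ is the weight of the edge $j \to i$.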
PageRank Example (1/2)
► $R[i] = \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} R[j]$
► Input (vertex: [current rank, outlinks])
V1: [0.25, V2, V3, V4]
V2: [0.25, V3, V4]
V3: [0.25, V1]
V4: [0.25, V1, V3]
PageRank Example (2/2)
► $R[i] = \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} R[j]$
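As a worked check of the formula on this input, assume each page splits its rank evenly over its outlinks, so that $w_{ji} = 1/|\mathrm{out}(j)|$. This is the same pagerank / size(outlink_list) split that the MapReduce map function uses. One iteration starting from $R = 0.25$ everywhere then gives:

```latex
\begin{aligned}
R[V_1] &= \tfrac{0.25}{1} + \tfrac{0.25}{2} = 0.375 && (\text{from } V_3, V_4)\\
R[V_2] &= \tfrac{0.25}{3} \approx 0.083 && (\text{from } V_1)\\
R[V_3] &= \tfrac{0.25}{3} + \tfrac{0.25}{2} + \tfrac{0.25}{2} \approx 0.333 && (\text{from } V_1, V_2, V_4)\\
R[V_4] &= \tfrac{0.25}{3} + \tfrac{0.25}{2} \approx 0.208 && (\text{from } V_1, V_2)
\end{aligned}
```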
PageRank in MapReduce - Map (1/2)
► Map function
map(key: [url, pagerank], value: outlink_list)
  for each outlink in outlink_list:
    emit(key: outlink, value: pagerank / size(outlink_list))
  emit(key: url, value: outlink_list)
► Input (key, value)
((V1, 0.25), [V2, V3, V4])
((V2, 0.25), [V3, V4])
((V3, 0.25), [V1])
((V4, 0.25), [V1, V3])
PageRank in MapReduce - Map (2/2)
► Intermediate (key, value)
(V2, 0.25/3), (V3, 0.25/3), (V4, 0.25/3), (V3, 0.25/2), (V4, 0.25/2), (V1, 0.25/1), (V1, 0.25/2), (V3, 0.25/2)
(V1, [V2, V3, V4])
(V2, [V3, V4])
(V3, [V1])
(V4, [V1, V3])
PageRank in MapReduce - Shuffle
► After shuffling (grouped by key)
(V1, 0.25/1), (V1, 0.25/2), (V1, [V2, V3, V4])
(V2, 0.25/3), (V2, [V3, V4])
(V3, 0.25/3), (V3, 0.25/2), (V3, 0.25/2), (V3, [V1])
(V4, 0.25/3), (V4, 0.25/2), (V4, [V1, V3])
PageRank in MapReduce - Reduce (1/2)
► Reduce function
reducer(key: url, value: list_pr_or_urls)
  outlink_list = []
  pagerank = 0
  for each pr_or_urls in list_pr_or_urls:
    if is_list(pr_or_urls):
      outlink_list = pr_or_urls
    else:
      pagerank += pr_or_urls
  emit(key: [url, pagerank], value: outlink_list)
► Input of the reduce function: the grouped (key, value) lists produced by the shuffle.
PageRank in MapReduce - Reduce (2/2)
► Output (ranks truncated to two decimal places)
((V1, 0.37), [V2, V3, V4])
((V2, 0.08), [V3, V4])
((V3, 0.33), [V1])
((V4, 0.20), [V1, V3])
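The map, shuffle and reduce steps above can be simulated on a single machine to check the numbers. The sketch below uses plain Scala with no Hadoop or Spark. The object name and the `Contribution`/`Outlinks` wrapper types are illustrative assumptions. They stand in for the untyped "pagerank or outlink list" values in the pseudocode.

```scala
// Single-machine simulation of one PageRank iteration as map -> shuffle -> reduce.
object MapReducePageRank {
  // An intermediate value is either a rank contribution or a vertex's outlink list.
  sealed trait Value
  final case class Contribution(pr: Double) extends Value
  final case class Outlinks(urls: List[String]) extends Value

  type Record = ((String, Double), List[String]) // ((url, pagerank), outlink_list)

  // Map: send pagerank / |outlinks| to each outlink and re-emit the outlink list.
  def mapper(url: String, pagerank: Double, outlinks: List[String]): List[(String, Value)] =
    outlinks.map(o => (o, Contribution(pagerank / outlinks.size): Value)) :+
      ((url, Outlinks(outlinks): Value))

  // Reduce: sum the contributions and recover the outlink list for the next round.
  def reducer(url: String, values: List[Value]): Record = {
    val pagerank = values.collect { case Contribution(pr) => pr }.sum
    val outlinks = values.collectFirst { case Outlinks(urls) => urls }.getOrElse(Nil)
    ((url, pagerank), outlinks)
  }

  def iterate(input: List[Record]): List[Record] = {
    val intermediate = input.flatMap { case ((url, pr), out) => mapper(url, pr, out) }
    // Shuffle: group the intermediate pairs by key, as the framework would.
    val shuffled = intermediate.groupBy(_._1).toList.sortBy(_._1)
    shuffled.map { case (url, kvs) => reducer(url, kvs.map(_._2)) }
  }

  def main(args: Array[String]): Unit = {
    val input: List[Record] = List(
      (("V1", 0.25), List("V2", "V3", "V4")),
      (("V2", 0.25), List("V3", "V4")),
      (("V3", 0.25), List("V1")),
      (("V4", 0.25), List("V1", "V3")))
    iterate(input).foreach(println)
  }
}
```

Before truncation, the printed ranks are 0.375, 0.0833…, 0.3333… and 0.2083…. Each output record has the same ((url, pagerank), outlink_list) shape as the input, so `iterate` can be called again for the next round.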
Problems with MapReduce for Graph Analytics
Think Like a Vertex
► Each vertex computes its value individually (in parallel).
► Computation typically depends on the neighbors.
► Also known as the graph-parallel processing model.
Data-Parallel vs. Graph-Parallel Computation
Pregel
► Large-scale graph-parallel processing platform developed at Google.
Execution Model (1/2)
Execution Model (2/2)
Example: Max Value
i_val := val
for each message m
  if m > val then val := m
if i_val == val then
  vote_to_halt
else
  for each neighbor v
    send_message(v, val)
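A small single-process simulation traces how this vertex program behaves superstep by superstep. It is a sketch, not Pregel's API, and the object and method names are hypothetical. The graph is an undirected chain whose vertices hold 3, 6, 2 and 1. In superstep 0 every vertex is assumed to send its initial value to its neighbours. Without that step nothing would start: no messages exist yet, so the pseudocode alone would halt every vertex immediately.

```scala
// Single-process simulation of Pregel supersteps for the max-value program.
// Names and the example graph are hypothetical.
object PregelMaxValue {
  // Runs until no messages remain; returns final values and supersteps executed.
  def run(initial: Map[Int, Int], neighbors: Map[Int, Seq[Int]]): (Map[Int, Int], Int) = {
    var vals = initial
    // Superstep 0 (assumption): every vertex sends its value to all neighbours.
    var inbox = deliver(initial.toList.flatMap { case (v, x) =>
      neighbors.getOrElse(v, Nil).map(n => n -> x)
    })
    var supersteps = 1
    // Halted vertices are woken only by incoming messages, so the
    // computation ends once a superstep produces no messages.
    while (inbox.nonEmpty) {
      val outgoing = inbox.toList.flatMap { case (v, msgs) =>
        val iVal = vals(v)                        // i_val := val
        val newVal = (iVal +: msgs).max           // if m > val then val := m
        vals = vals.updated(v, newVal)
        if (newVal == iVal) Seq.empty[(Int, Int)] // vote_to_halt
        else neighbors.getOrElse(v, Nil).map(n => n -> newVal) // send_message(n, val)
      }
      inbox = deliver(outgoing)
      supersteps += 1
    }
    (vals, supersteps)
  }

  // Group (target, message) pairs into per-vertex inboxes for the next superstep.
  private def deliver(msgs: Seq[(Int, Int)]): Map[Int, Seq[Int]] =
    msgs.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2) }

  def main(args: Array[String]): Unit = {
    // Chain 1 - 2 - 3 - 4 holding the values 3, 6, 2, 1.
    val neighbors = Map(1 -> Seq(2), 2 -> Seq(1, 3), 3 -> Seq(2, 4), 4 -> Seq(3))
    val (vals, steps) = run(Map(1 -> 3, 2 -> 6, 3 -> 2, 4 -> 1), neighbors)
    println(s"final values: ${vals.toList.sorted}, supersteps: $steps")
  }
}
```

On this chain every vertex ends with the value 6 after four supersteps (0 to 3).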
Example: PageRank
$R[i] = \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} R[j]$
System Model
► Master-worker model.
► The master
• Coordinates workers.
• Assigns one or more partitions to each worker.
• Instructs each worker to perform a superstep.
► Each worker
• Executes the local computation method on its vertices.
• Maintains the state of its partitions.
• Manages messages to and from other workers.
Pregel Limitations
GraphLab/Turi
GraphLab
► A vertex can read and modify any of the data in its scope (shared memory).
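Example: PageRank (GraphLab)
GraphLab_PageRank(i)
  // compute sum over neighbors
  total = 0
  foreach (j in in_neighbors(i)):
    total = total + R[j] * wji
  // update the PageRank
  R[i] = total
  // trigger out-neighbors to run again if R[i] changed
  if R[i] changed then
    foreach (j in out_neighbors(i)):
      signal(j)
$R[i] = \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} R[j]$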
Consistency (1/5)
Consistency (2/5)
Consistency (3/5)
Consistency (4/5)
Consistency (5/5)
Consistency vs. Parallelism
[Low, Y., GraphLab: A Distributed Abstraction for Large Scale Machine Learning (Doctoral dissertation, University of California), 2013.]
GraphLab2/Turi (PowerGraph)
PowerGraph
► Factorizes the local vertex functions into the Gather, Apply, and Scatter phases.
Programming Model
► Gather-Apply-Scatter (GAS)
Execution Model (1/2)
Execution Model (2/2)
Example: PageRank (PowerGraph)
PowerGraph_PageRank(i):
  Gather(j -> i):
    return wji * R[j]
  sum(a, b):
    return a + b
  // total: Gather and sum
  Apply(i, total):
    R[i] = total
  Scatter(i -> j):
    if R[i] changed then
      activate(j)
$R[i] = \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} R[j]$
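A small synchronous engine can exercise this Gather-Apply-Scatter program. Each step gathers over in-edges, sums the results and applies the total. It then scatters activation to the out-neighbours of every vertex whose rank changed. This is a single-process sketch, not the PowerGraph API. The names `GasPageRank`, `Graph` and `run`, the change tolerance `tol`, and the edge weight $w_{ji} = 1/|\mathrm{out}(j)|$ are all assumptions.

```scala
// Minimal synchronous Gather-Apply-Scatter engine for the PageRank program above.
// Assumption: w_ji = 1 / outDegree(j); "changed" means |new - old| > tol.
object GasPageRank {
  final case class Graph(outEdges: Map[Int, Seq[Int]]) {
    val vertices: Set[Int] = outEdges.keySet ++ outEdges.values.flatten
    // In-neighbours of each vertex, used by the gather phase.
    val inEdges: Map[Int, Seq[Int]] =
      outEdges.toList
        .flatMap { case (j, dsts) => dsts.map(i => i -> j) }
        .groupBy(_._1)
        .map { case (i, pairs) => i -> pairs.map(_._2) }
    def outDegree(j: Int): Int = outEdges.getOrElse(j, Nil).size
  }

  def run(g: Graph, init: Map[Int, Double], maxSteps: Int, tol: Double = 1e-9): Map[Int, Double] = {
    var r = init
    var active = g.vertices
    var step = 0
    while (active.nonEmpty && step < maxSteps) {
      // Gather(j -> i) returns w_ji * R[j]; sum(a, b) = a + b.
      val totals = active.toList.map { i =>
        i -> g.inEdges.getOrElse(i, Nil).map(j => r(j) / g.outDegree(j)).foldLeft(0.0)(_ + _)
      }
      val changed = totals.collect { case (i, t) if math.abs(t - r(i)) > tol => i }
      r = r ++ totals // Apply(i, total): R[i] = total
      // Scatter(i -> j): activate out-neighbours of vertices whose rank changed.
      active = changed.flatMap(i => g.outEdges.getOrElse(i, Nil)).toSet
      step += 1
    }
    r
  }

  def main(args: Array[String]): Unit = {
    val g = Graph(Map(1 -> Seq(2, 3, 4), 2 -> Seq(3, 4), 3 -> Seq(1), 4 -> Seq(1, 3)))
    val init = Map(1 -> 0.25, 2 -> 0.25, 3 -> 0.25, 4 -> 0.25)
    println(run(g, init, maxSteps = 1))    // one synchronous step
    println(run(g, init, maxSteps = 1000)) // until no vertex is active
  }
}
```

With `maxSteps = 1` on the four-vertex example, the result matches the single MapReduce iteration (0.375 for vertex 1). Run to convergence, the ranks approach 12/31, 4/31, 9/31 and 6/31. Only activated vertices recompute, which mirrors the "if R[i] changed then activate(j)" rule.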
Think Like a Table
Data-Parallel vs. Graph-Parallel Computation
Motivation (2/3)
Motivation (3/3)
Think Like a Table
GraphX
The Property Graph Data Model
The Vertex Collection
► VertexRDD: contains the vertex properties keyed by the vertex ID.
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
The Edge Collection
► EdgeRDD: contains the edge properties keyed by the source and destination vertex IDs.
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
The Triplet Collection
► The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members, which contain the source and destination properties, respectively.
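Conceptually, a triplet is an edge joined with the properties of its source and destination vertices. The following sketch models that join with ordinary Scala collections and is not the GraphX API. `Edge` and `Triplet` are local case classes that only mimic the field names of GraphX's `Edge` and `EdgeTriplet` (`srcId`, `dstId`, `attr`, `srcAttr`, `dstAttr`). The example users and relationship are hypothetical.

```scala
// Plain-Scala model of the triplet view (semantics only, not GraphX).
object TripletModel {
  type VertexId = Long
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  final case class Triplet[VD, ED](srcId: VertexId, dstId: VertexId, attr: ED,
                                   srcAttr: VD, dstAttr: VD)

  // Join every edge with the properties of its two endpoints.
  def triplets[VD, ED](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
    edges.map(e => Triplet(e.srcId, e.dstId, e.attr, vertices(e.srcId), vertices(e.dstId)))

  def main(args: Array[String]): Unit = {
    // Hypothetical vertex properties: (name, role).
    val users = Map(1L -> ("alice", "prof"), 2L -> ("bob", "student"))
    val rels = Seq(Edge(1L, 2L, "advisor"))
    triplets(users, rels).foreach { t =>
      println(s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}") // alice is the advisor of bob
    }
  }
}
```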
Building a Property Graph
Graph Operators
Information About The Graph (1/2)
Information About The Graph (2/2)
Property Operators
val newGraph = graph.mapTriplets(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
newGraph.edges.collect.foreach(println)
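Property operators change attributes but leave the graph structure untouched: vertex IDs and edge endpoints are copied as they are. That is what allows GraphX to reuse the structural indices of the original graph. The plain-Scala model of `mapVertices` and `mapTriplets` below makes this explicit over local collections. It reproduces the semantics only, not the GraphX implementation, and the data is hypothetical.

```scala
// Local model of GraphX's property operators (semantics only, not GraphX).
object PropertyOpsModel {
  final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

  // mapVertices: new vertex attributes, same vertex IDs.
  def mapVertices[VD, VD2](vertices: Map[Long, VD])(f: (Long, VD) => VD2): Map[Long, VD2] =
    vertices.map { case (id, attr) => id -> f(id, attr) }

  // mapTriplets: new edge attributes computed from the edge and both endpoints;
  // srcId and dstId are copied unchanged.
  def mapTriplets[VD, ED, ED2](vertices: Map[Long, VD], edges: Seq[Edge[ED]])(
      f: (VD, ED, VD) => ED2): Seq[Edge[ED2]] =
    edges.map(e => Edge(e.srcId, e.dstId, f(vertices(e.srcId), e.attr, vertices(e.dstId))))

  def main(args: Array[String]): Unit = {
    val users = Map(1L -> ("alice", "prof"), 2L -> ("bob", "student"))
    val edges = Seq(Edge(1L, 2L, "advisor"))
    mapTriplets(users, edges)((src, rel, dst) => s"${src._1} is the $rel of ${dst._1}")
      .foreach(println)
  }
}
```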
Structural Operators
► reverse returns a new graph with all the edge directions reversed.
► subgraph takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate and the edges that satisfy the edge predicate and connect two such vertices.
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean):
  Graph[VD, ED]
validGraph.vertices.collect.foreach(println)
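The behaviour of `reverse` and `subgraph` can be mimicked on local collections. In the sketch below, an edge is dropped whenever either of its endpoints is removed, even if the edge itself passes `epred`. This is a semantic model, not GraphX. The simplified `epred` receives (srcAttr, attr, dstAttr) instead of an `EdgeTriplet`, and the example data is hypothetical.

```scala
// Local model of reverse and subgraph (semantics only, not GraphX).
object StructuralOpsModel {
  final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

  // reverse: swap every edge's direction, keep its attribute.
  def reverse[ED](edges: Seq[Edge[ED]]): Seq[Edge[ED]] =
    edges.map(e => Edge(e.dstId, e.srcId, e.attr))

  // subgraph: keep vertices passing vpred, and edges passing epred whose
  // endpoints were both kept (so no dangling edges remain).
  def subgraph[VD, ED](vertices: Map[Long, VD], edges: Seq[Edge[ED]])(
      epred: (VD, ED, VD) => Boolean,
      vpred: (Long, VD) => Boolean): (Map[Long, VD], Seq[Edge[ED]]) = {
    val keptV = vertices.filter { case (id, attr) => vpred(id, attr) }
    val keptE = edges.filter { e =>
      keptV.contains(e.srcId) && keptV.contains(e.dstId) &&
        epred(vertices(e.srcId), e.attr, vertices(e.dstId))
    }
    (keptV, keptE)
  }

  def main(args: Array[String]): Unit = {
    val vs = Map(1L -> "prof", 2L -> "student", 3L -> "Missing")
    val es = Seq(Edge(1L, 2L, "advisor"), Edge(3L, 2L, "colleague"))
    // Drop vertices whose role is "Missing"; the edge from 3 disappears with it.
    println(subgraph(vs, es)((_, _, _) => true, (_, role) => role != "Missing"))
    println(reverse(es))
  }
}
```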
Join Operators
joinedGraph.vertices.collect.foreach(println)
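GraphX offers two vertex joins, `joinVertices` and `outerJoinVertices`. `joinVertices` applies the map function only to vertices that have a match in the joined table, and unmatched vertices keep their value. `outerJoinVertices` applies the function to every vertex, passes the match as an `Option`, and may change the vertex attribute type. The sketch below shows the difference on local collections. It is a semantic model with hypothetical data, not the GraphX implementation.

```scala
// Local model of GraphX's vertex joins (semantics only, not GraphX).
object JoinOpsModel {
  // joinVertices: same attribute type; unmatched vertices keep their value.
  def joinVertices[VD, U](vertices: Map[Long, VD], table: Map[Long, U])(
      f: (Long, VD, U) => VD): Map[Long, VD] =
    vertices.map { case (id, attr) =>
      id -> table.get(id).map(u => f(id, attr, u)).getOrElse(attr)
    }

  // outerJoinVertices: f sees every vertex, with an Option for the match,
  // and may return a new attribute type.
  def outerJoinVertices[VD, U, VD2](vertices: Map[Long, VD], table: Map[Long, U])(
      f: (Long, VD, Option[U]) => VD2): Map[Long, VD2] =
    vertices.map { case (id, attr) => id -> f(id, attr, table.get(id)) }

  def main(args: Array[String]): Unit = {
    val names = Map(1L -> "alice", 2L -> "bob")
    val outDegrees = Map(1L -> 3) // hypothetical table; vertex 2 has no entry
    println(joinVertices(names, outDegrees)((_, n, d) => s"$n:$d"))
    println(outerJoinVertices(names, outDegrees)((_, _, d) => d.getOrElse(0)))
  }
}
```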
Aggregation (1/2)
def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit, // map
    mergeMsg: (Msg, Msg) => Msg,               // reduce
    tripletFields: TripletFields = TripletFields.All):
  VertexRDD[Msg]
Aggregation (2/2)
profs.collect.foreach(println)
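The map/reduce structure of `aggregateMessages` can be modelled on local collections. `sendMsg` runs once per edge and may send to either endpoint. `mergeMsg` then combines the messages per vertex, and vertices that receive nothing are absent from the result. In the sketch below, `EdgeContext` is a simplified local stand-in that mirrors the GraphX field and method names (`srcAttr`, `dstAttr`, `attr`, `sendToSrc`, `sendToDst`). The graph is hypothetical. In GraphX, `tripletFields` only tells the system which fields `sendMsg` reads, so that unused attributes need not be shipped. The model ignores it.

```scala
import scala.collection.mutable.ListBuffer

// Local model of aggregateMessages (semantics only, not GraphX).
object AggregateMessagesModel {
  final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

  // Stand-in for GraphX's EdgeContext: triplet fields plus sendToSrc/sendToDst.
  final class EdgeContext[VD, ED, Msg](val srcId: Long, val dstId: Long,
      val srcAttr: VD, val dstAttr: VD, val attr: ED) {
    private[AggregateMessagesModel] val sent = ListBuffer.empty[(Long, Msg)]
    def sendToSrc(msg: Msg): Unit = sent.append((srcId, msg))
    def sendToDst(msg: Msg): Unit = sent.append((dstId, msg))
  }

  def aggregateMessages[VD, ED, Msg](vertices: Map[Long, VD], edges: Seq[Edge[ED]])(
      sendMsg: EdgeContext[VD, ED, Msg] => Unit, // "map": run once per edge
      mergeMsg: (Msg, Msg) => Msg                // "reduce": combine per vertex
  ): Map[Long, Msg] = {
    val msgs = edges.flatMap { e =>
      val ctx = new EdgeContext[VD, ED, Msg](e.srcId, e.dstId,
        vertices(e.srcId), vertices(e.dstId), e.attr)
      sendMsg(ctx)
      ctx.sent.toList
    }
    // Vertices that received no message are absent from the result.
    msgs.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).reduce(mergeMsg) }
  }

  def main(args: Array[String]): Unit = {
    val vs = Map(1L -> "a", 2L -> "b", 3L -> "c")
    val es = Seq(Edge(1L, 2L, ()), Edge(3L, 2L, ()), Edge(2L, 3L, ()))
    // In-degree: every edge sends 1 to its destination; messages are summed.
    println(aggregateMessages[String, Unit, Int](vs, es)(ctx => ctx.sendToDst(1), _ + _))
  }
}
```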
Iterative Computation (1/6)
Iterative Computation (2/6)
i_val := val
for each message m
  if m > val then val := m
if i_val == val then
  vote_to_halt
else
  for each neighbor v
    send_message(v, val)
Iterative Computation (3/6)
def pregel[A]
    (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A):
  Graph[VD, ED]
Iterative Computation (4/6)
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// sc is the SparkContext (predefined in spark-shell)
val initialMsg = -9999
// (vertexID, (new vertex value, old vertex value))
val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (1, -1)),
  (2L, (2, -1)), (3L, (3, -1)), (6L, (6, -1))))
val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true),
  Edge(2L, 1L, true), Edge(2L, 6L, true), Edge(3L, 6L, true), Edge(6L, 1L, true),
  Edge(6L, 3L, true)))
val graph = Graph(vertices, relationships)
Iterative Computation (5/6)
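The code here omits the bodies of `vprog`, `sendMsg` and `mergeMsg`. The plain-Scala model below sketches what they could look like for this max-value computation, together with a simplified driver that mimics `pregel(initialMsg, maxIter, EdgeDirection.Out)`. The driver first calls `vprog` with `initialMsg` on every vertex. After that, each round applies `vprog` to vertices that received a merged message and runs `sendMsg` only on edges whose source received one. The function bodies and the local `Edge`/`Triplet` case classes are assumptions, not the course's code. The `(new value, old value)` vertex layout and the graph follow the (4/6) code.

```scala
// Local model of the GraphX pregel max-value computation (not the GraphX API).
object PregelMaxModel {
  type VertexId = Long
  final case class Edge(srcId: VertexId, dstId: VertexId)
  final case class Triplet(srcId: VertexId, dstId: VertexId, srcAttr: (Int, Int), dstAttr: (Int, Int))

  val initialMsg = -9999

  // vprog (apply): keep the value on the initial call; otherwise store
  // (max of message and current value, previous value).
  def vprog(id: VertexId, value: (Int, Int), message: Int): (Int, Int) =
    if (message == initialMsg) value else (math.max(message, value._1), value._1)

  // sendMsg (scatter): only a source whose value changed forwards it.
  def sendMsg(t: Triplet): Iterator[(VertexId, Int)] =
    if (t.srcAttr._1 == t.srcAttr._2) Iterator.empty else Iterator((t.dstId, t.srcAttr._1))

  // mergeMsg (gather): keep the largest incoming value.
  def mergeMsg(a: Int, b: Int): Int = math.max(a, b)

  def pregel(vertices: Map[VertexId, (Int, Int)], edges: Seq[Edge],
             maxIter: Int = Int.MaxValue): Map[VertexId, (Int, Int)] = {
    var vs = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    // Run sendMsg on edges whose source is active, then merge per target.
    def messages(active: VertexId => Boolean): Map[VertexId, Int] =
      edges.filter(e => active(e.srcId))
        .flatMap(e => sendMsg(Triplet(e.srcId, e.dstId, vs(e.srcId), vs(e.dstId))))
        .groupBy(_._1)
        .map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
    var msgs = messages(_ => true)
    var i = 0
    while (msgs.nonEmpty && i < maxIter) {
      vs = vs ++ msgs.map { case (id, m) => id -> vprog(id, vs(id), m) }
      val received = msgs.keySet
      msgs = messages(received.contains)
      i += 1
    }
    vs
  }

  def main(args: Array[String]): Unit = {
    val vertices = Map(1L -> (1, -1), 2L -> (2, -1), 3L -> (3, -1), 6L -> (6, -1))
    val edges = Seq(Edge(1L, 2L), Edge(2L, 1L), Edge(2L, 6L), Edge(3L, 6L), Edge(6L, 1L), Edge(6L, 3L))
    pregel(vertices, edges).toList.sortBy(_._1).foreach(println)
  }
}
```

On this graph every vertex's new value converges to 6, after which no source has a changed value and the loop stops.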
Iterative Computation (6/6)
val minGraph = graph.pregel(initialMsg,
                            Int.MaxValue,
                            EdgeDirection.Out)(
  vprog,    // apply
  sendMsg,  // scatter
  mergeMsg) // gather
Summary
References
Questions?