10 Graph Processing

The document discusses large-scale graph processing frameworks such as Pregel, GraphLab, and GraphX, focusing on the challenges of graph algorithms and partitioning. It highlights the difficulties in extracting parallelism from data and computation, and introduces concepts like Edge-Cut and Vertex-Cut graph partitioning. Additionally, it covers the implementation of PageRank using MapReduce and the limitations of MapReduce for iterative graph analytics.


Large Scale Graph Processing - Pregel, GraphLab, and GraphX

Amir H. Payberah
[email protected]
2022-09-29
The Course Web Page

https://id2221kth.github.io

1/
The Questions-Answers Page

https://tinyurl.com/bdenpwc5

2/
Where Are We?

3/
► A flexible abstraction for describing relationships between discrete objects.

4/
Large Graph

5/
Graph Algorithms Challenges

► Difficult to extract parallelism based on partitioning of the data.

► Difficult to express parallelism based on partitioning of computation.

► Graph partitioning is a challenging problem.

6/
Graph Partitioning

► Partition large-scale graphs and distribute the partitions to hosts.

7 / 80
Edge-Cut Graph Partitioning

► Divide the vertices of a graph into disjoint clusters.

► Clusters of nearly equal size (w.r.t. the number of vertices).

► With the minimum number of edges that span separated clusters.

8 / 80
Vertex-Cut Graph Partitioning

► Divide the edges of a graph into disjoint clusters.

► Clusters of nearly equal size (w.r.t. the number of edges).

► With the minimum number of replicated vertices.

9 / 80
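Vertex-cut partitioning can be sketched in a few lines. This is a minimal illustration in plain Python (an assumption, not from the slides): edges are hash-assigned to partitions, and a vertex is replicated once for every partition that holds at least one of its edges. The average number of copies per vertex is the replication factor that vertex-cut algorithms try to minimize.

```python
# Hash-based vertex-cut sketch: edges go to partitions; vertices are
# implicitly replicated on every partition touching one of their edges.
from collections import defaultdict

def vertex_cut(edges, num_partitions):
    """Assign each edge to a partition by hashing the (src, dst) pair."""
    parts = defaultdict(list)
    for (u, v) in edges:
        parts[hash((u, v)) % num_partitions].append((u, v))
    return dict(parts)

def replication_factor(parts):
    """Average number of copies per vertex across partitions."""
    copies = defaultdict(set)
    for pid, edges in parts.items():
        for (u, v) in edges:
            copies[u].add(pid)
            copies[v].add(pid)
    return sum(len(p) for p in copies.values()) / len(copies)
```

Smarter placement (e.g. greedy heuristics, as in PowerGraph) replaces the hash with a rule that keeps each vertex's edges on few partitions.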
Edge-Cut vs. Vertex-Cut Graph Partitioning (1/2)

► Natural graphs: skewed power-law degree distribution.

► Edge-cut algorithms perform poorly on power-law graphs.

10 /
Edge-Cut vs. Vertex-Cut Graph Partitioning (2/2)

11 /
PageRank with MapReduce

12 /
PageRank

R[i] = Σ_{j ∈ Nbrs(i)} w_ji R[j]

13 /
PageRank Example (1/2)

► R[i] = Σ_{j ∈ Nbrs(i)} w_ji R[j]

► Input
V1: [0.25, V2, V3, V4]
V2: [0.25, V3, V4]
V3: [0.25, V1]
V4: [0.25, V1, V3]

► Share the rank among all outgoing links
V1: (V2, 0.25/3), (V3, 0.25/3), (V4, 0.25/3)
V2: (V3, 0.25/2), (V4, 0.25/2)
V3: (V1, 0.25/1)
V4: (V1, 0.25/2), (V3, 0.25/2)

14 /
PageRank Example (2/2)

► R[i] = Σ_{j ∈ Nbrs(i)} w_ji R[j]

V1: (V2, 0.25/3), (V3, 0.25/3), (V4, 0.25/3)
V2: (V3, 0.25/2), (V4, 0.25/2)
V3: (V1, 0.25/1)
V4: (V1, 0.25/2), (V3, 0.25/2)

► Output after one iteration
V1: [0.37, V2, V3, V4]
V2: [0.08, V3, V4]
V3: [0.33, V1]
V4: [0.20, V1, V3]

15 /
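The rank update above can be reproduced in a few lines. This is a minimal plain-Python sketch (an assumption, not part of the slides): one synchronous iteration of R[i] = Σ w_ji R[j], with w_ji = 1 / out-degree(j), on the four-vertex example graph.

```python
# One synchronous PageRank iteration (no damping factor), matching the
# example: each vertex splits its rank evenly among its outgoing links.

def pagerank_step(out_links, ranks):
    new_ranks = {v: 0.0 for v in ranks}
    for j, targets in out_links.items():
        share = ranks[j] / len(targets)   # w_ji * R[j] for each target i
        for i in targets:
            new_ranks[i] += share
    return new_ranks

out_links = {"V1": ["V2", "V3", "V4"], "V2": ["V3", "V4"],
             "V3": ["V1"], "V4": ["V1", "V3"]}
ranks = {v: 0.25 for v in out_links}
ranks = pagerank_step(out_links, ranks)
# ranks is approximately V1: 0.37, V2: 0.08, V3: 0.33, V4: 0.20
```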
PageRank in MapReduce - Map (1/2)

► Map function

map(key: [url, pagerank], value: outlink_list)
  for each outlink in outlink_list:
    emit(key: outlink, value: pagerank / size(outlink_list))

  emit(key: url, value: outlink_list)

► Input (key, value)
((V1, 0.25), [V2, V3, V4])
((V2, 0.25), [V3, V4])
((V3, 0.25), [V1])
((V4, 0.25), [V1, V3])

16 /
PageRank in MapReduce - Map (2/2)

► Map function

map(key: [url, pagerank], value: outlink_list)
  for each outlink in outlink_list:
    emit(key: outlink, value: pagerank / size(outlink_list))

  emit(key: url, value: outlink_list)

► Intermediate (key, value)
(V2, 0.25/3), (V3, 0.25/3), (V4, 0.25/3), (V3, 0.25/2), (V4, 0.25/2), (V1, 0.25/1),
(V1, 0.25/2), (V3, 0.25/2)
(V1, [V2, V3, V4])
(V2, [V3, V4])
(V3, [V1])
(V4, [V1, V3])

17 /
PageRank in MapReduce - Shuffle

► Intermediate (key, value)
(V2, 0.25/3), (V3, 0.25/3), (V4, 0.25/3), (V3, 0.25/2), (V4, 0.25/2), (V1, 0.25/1),
(V1, 0.25/2), (V3, 0.25/2)
(V1, [V2, V3, V4])
(V2, [V3, V4])
(V3, [V1])
(V4, [V1, V3])

► After shuffling
(V1, 0.25/1), (V1, 0.25/2), (V1, [V2, V3, V4])
(V2, 0.25/3), (V2, [V3, V4])
(V3, 0.25/3), (V3, 0.25/2), (V3, 0.25/2), (V3, [V1])
(V4, 0.25/3), (V4, 0.25/2), (V4, [V1, V3])

18 /
PageRank in MapReduce - Reduce (1/2)

► Reduce function

reducer(key: url, value: list_pr_or_urls)
  outlink_list = []
  pagerank = 0

  for each pr_or_urls in list_pr_or_urls:
    if is_list(pr_or_urls):
      outlink_list = pr_or_urls
    else
      pagerank += pr_or_urls

  emit(key: [url, pagerank], value: outlink_list)

► Input of the Reduce function
(V1, 0.25/1), (V1, 0.25/2), (V1, [V2, V3, V4])
(V2, 0.25/3), (V2, [V3, V4])
(V3, 0.25/3), (V3, 0.25/2), (V3, 0.25/2), (V3, [V1])
(V4, 0.25/3), (V4, 0.25/2), (V4, [V1, V3])

19 /
PageRank in MapReduce - Reduce (2/2)

► Reduce function

reducer(key: url, value: list_pr_or_urls)
  outlink_list = []
  pagerank = 0

  for each pr_or_urls in list_pr_or_urls:
    if is_list(pr_or_urls):
      outlink_list = pr_or_urls
    else
      pagerank += pr_or_urls

  emit(key: [url, pagerank], value: outlink_list)

► Output
((V1, 0.37), [V2, V3, V4])
((V2, 0.08), [V3, V4])
((V3, 0.33), [V1])
((V4, 0.20), [V1, V3])

20 /
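The whole map → shuffle → reduce round can be traced end to end. This is a minimal plain-Python stand-in for MapReduce (an assumption, not from the slides): map emits rank shares plus the adjacency list, shuffle groups values by key, and reduce separates topology from rank contributions.

```python
# One PageRank round simulated as map/shuffle/reduce over Python lists.
from collections import defaultdict

def map_phase(records):
    for (url, pagerank), outlinks in records:
        for out in outlinks:
            yield out, pagerank / len(outlinks)   # rank share to outlink
        yield url, outlinks                        # re-emit the topology

def shuffle(pairs):
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups

def reduce_phase(groups):
    out = {}
    for url, values in groups.items():
        outlinks, rank = [], 0.0
        for v in values:
            if isinstance(v, list):   # the adjacency list
                outlinks = v
            else:                     # a rank share
                rank += v
        out[url] = (rank, outlinks)
    return out

records = [(("V1", 0.25), ["V2", "V3", "V4"]),
           (("V2", 0.25), ["V3", "V4"]),
           (("V3", 0.25), ["V1"]),
           (("V4", 0.25), ["V1", "V3"])]
result = reduce_phase(shuffle(map_phase(records)))
```

Note how the adjacency list must be shuffled along with the rank shares every round; this is exactly the invariant-topology overhead the next slide criticizes.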
Problems with MapReduce for Graph Analytics

► MapReduce does not directly support iterative algorithms.
  • Invariant graph-topology data is re-loaded and re-processed at each iteration, wasting I/O, network bandwidth, and CPU.

► Materialization of intermediate results at every MapReduce iteration harms performance.

21 /
Think Like a Vertex

22 /
Think Like a Vertex

► Each vertex computes its value individually (in parallel).

► The computation typically depends on the neighbors.

► Also known as the graph-parallel processing model.

23 /
Data-Parallel vs. Graph-Parallel Computation

24 /
Pregel

25 /
Pregel

► Large-scale graph-parallel processing platform developed at Google.

► Inspired by the bulk synchronous parallel (BSP) model.

26 /
Execution Model (1/2)

► Applications run in a sequence of iterations, called supersteps.

► A vertex in superstep S can:
  • read messages sent to it in superstep S-1.
  • send messages to other vertices, received at superstep S+1.
  • modify its state.

► Vertices communicate directly with one another by sending messages.

27 /
Execution Model (2/2)

► Superstep 0: all vertices are in the active state.

► A vertex deactivates itself by voting to halt: no further work to do.

► A halted vertex becomes active again if it receives a message.

► The whole algorithm terminates when:
  • All vertices are simultaneously inactive.
  • There are no messages in transit.

28 /
Example: Max Value (1/4)

i_val := val

for each message m
  if m > val then val := m

if i_val == val then
  vote_to_halt
else
  for each neighbor v
    send_message(v, val)

29 /
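The max-value vertex program can be simulated superstep by superstep. This is a minimal plain-Python sketch (an assumption, not from the slides), following Pregel's convention that in superstep 0 every vertex is active and sends its value; afterwards a vertex sends only when an incoming message raised its value, otherwise it votes to halt.

```python
# BSP simulation of the max-value vertex program.

def max_value(neighbors, values):
    values = dict(values)
    # Superstep 0: every vertex broadcasts its value to its neighbors.
    inbox = {v: [] for v in values}
    for v in values:
        for n in neighbors[v]:
            inbox[n].append(values[v])
    # Later supersteps: run while messages are in transit.
    while any(inbox.values()):
        outbox = {v: [] for v in values}
        for v, msgs in inbox.items():
            i_val = values[v]
            for m in msgs:
                if m > values[v]:
                    values[v] = m
            if values[v] != i_val:          # value changed: send it on
                for n in neighbors[v]:
                    outbox[n].append(values[v])
            # else: vote_to_halt (send nothing)
        inbox = outbox
    return values
```

On a strongly connected graph, every vertex converges to the global maximum; the number of supersteps is bounded by the graph's diameter.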
Example: PageRank

Pregel_PageRank(i, messages):
  // receive all the messages
  total = 0
  foreach(msg in messages):
    total = total + msg

  // update the rank of this vertex
  R[i] = total

  // send new messages to neighbors
  foreach(j in out_neighbors[i]):
    sendmsg(R[i] * w_ij) to vertex j

R[i] = Σ_{j ∈ Nbrs(i)} w_ji R[j]

34 /
System Model

► Master-worker model.

► The master
  • Coordinates workers.
  • Assigns one or more partitions to each worker.
  • Instructs each worker to perform a superstep.

► Each worker
  • Executes the local computation method on its vertices.
  • Maintains the state of its partitions.
  • Manages messages to and from other workers.

35 /
Pregel Limitations

► Inefficient if different regions of the graph converge at different speeds.

► The runtime of each phase is determined by the slowest machine.

36 /
GraphLab/Turi

37 /
GraphLab

► GraphLab allows asynchronous iterative computation.

► Vertex scope of vertex v: the data stored in v, and in all adjacent vertices and edges.

► A vertex can read and modify any of the data in its scope (shared memory).

38 /
Example: PageRank (GraphLab)

GraphLab_PageRank(i)
  // compute sum over neighbors
  total = 0
  foreach(j in in_neighbors(i)):
    total = total + R[j] * w_ji

  // update the PageRank
  R[i] = total

  // trigger neighbors to run again
  foreach(j in out_neighbors(i)):
    signal vertex-program on j

R[i] = Σ_{j ∈ Nbrs(i)} w_ji R[j]

39 /
Consistency (1/5)

► Overlapped scopes: race condition in the simultaneous execution of two update functions.

40 /
Consistency (2/5)

► Full consistency: during the execution of f(v), no other function reads or modifies data within v's scope.

41 /
Consistency (3/5)

► Edge consistency: during the execution of f(v), no other function reads or modifies any of the data on v or on any of the edges adjacent to v.

42 /
Consistency (4/5)

► Vertex consistency: during the execution of f(v), no other function will be applied to v.

43 /
Consistency (5/5)

Consistency vs. Parallelism

[Low, Y., GraphLab: A Distributed Abstraction for Large Scale Machine Learning (Doctoral dissertation, University of California), 2013.]

44 /
GraphLab2/Turi (PowerGraph)

45 /
PowerGraph

► Factorizes the local vertex functions into the Gather, Apply, and Scatter phases.

46 /
Programming Model

► Gather-Apply-Scatter (GAS)

► Gather: accumulate information from the neighborhood.

► Apply: apply the accumulated value to the center vertex.

► Scatter: update adjacent edges and vertices.

47 /
Execution Model (1/2)

► Initially all vertices are active.

► PowerGraph executes the vertex-program on the active vertices until none remain.
  • Once a vertex-program completes the scatter phase, it becomes inactive until it is reactivated.
  • Vertices can activate themselves and neighboring vertices.

► PowerGraph can execute both synchronously and asynchronously.

48 /
Execution Model (2/2)

► Synchronous scheduling, like Pregel.
  • Executes the gather, apply, and scatter phases in order.
  • Changes made to the vertex/edge data are committed at the end of each step.

► Asynchronous scheduling, like GraphLab.
  • Changes made to the vertex/edge data during the apply and scatter functions are immediately committed to the graph.
  • Visible to subsequent computation on neighboring vertices.

49 /
Example: PageRank (PowerGraph)

PowerGraph_PageRank(i):
  Gather(j -> i):
    return w_ji * R[j]

  sum(a, b):
    return a + b

  // total: result of Gather and sum
  Apply(i, total):
    R[i] = total

  Scatter(i -> j):
    if R[i] changed then activate(j)

R[i] = Σ_{j ∈ Nbrs(i)} w_ji R[j]

50 /
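The GAS decomposition above can be simulated in plain Python. This is a minimal sketch (an assumption, not from the slides) of one synchronous step: gather partial sums w_ji * R[j] over in-edges, apply the combined value, and scatter activations to out-neighbors only when the value changed.

```python
# One synchronous Gather-Apply-Scatter step for PageRank.

def gas_pagerank_step(in_edges, out_edges, ranks, active):
    """Returns the set of vertices activated for the next step."""
    snapshot = dict(ranks)                  # synchronous: read old values
    new_active = set()
    for i in active:
        # Gather + sum: accumulate w_ji * R[j] over in-neighbors j.
        total = sum(w * snapshot[j] for j, w in in_edges.get(i, []))
        # Scatter: activate out-neighbors if Apply will change the value.
        if abs(total - ranks[i]) > 1e-12:
            new_active.update(out_edges.get(i, []))
        # Apply: commit the new value.
        ranks[i] = total
    return new_active

# The four-vertex example graph; edge weight w_ji = 1 / out-degree(j).
in_edges = {"V1": [("V3", 1.0), ("V4", 0.5)],
            "V2": [("V1", 1 / 3)],
            "V3": [("V1", 1 / 3), ("V2", 0.5), ("V4", 0.5)],
            "V4": [("V1", 1 / 3), ("V2", 0.5)]}
out_edges = {"V1": ["V2", "V3", "V4"], "V2": ["V3", "V4"],
             "V3": ["V1"], "V4": ["V1", "V3"]}
ranks = {v: 0.25 for v in in_edges}
active = gas_pagerank_step(in_edges, out_edges, ranks, set(ranks))
```

Splitting gather into a per-edge function plus a commutative, associative sum is what lets PowerGraph distribute the gather of a high-degree vertex across machines.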
Think Like a Table

51 /
Data-Parallel vs. Graph-Parallel Computation

52 /
Motivation (2/3)

► Graph-parallel computation: restricting the types of computation to achieve performance.

► The same restrictions make it difficult and inefficient to express many stages in a typical graph-analytics pipeline.

53 /
Motivation (3/3)

54 /
Think Like a Table

► Unifies data-parallel and graph-parallel systems.

► Tables and graphs are composable views of the same physical data.

55 /
GraphX

56 /
GraphX

► GraphX is the library for graph-parallel processing in Spark.

57 /
The Property Graph Data Model

► Spark represents graph-structured data as a property graph.

► It is logically represented as a pair of vertex and edge property collections.
  • VertexRDD and EdgeRDD

// VD: the type of the vertex attribute
// ED: the type of the edge attribute
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

58 /
The Vertex Collection

► VertexRDD: contains the vertex properties keyed by the vertex ID.

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

// VD: the type of the vertex attribute
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]

59 /
The Edge Collection

► EdgeRDD: contains the edge properties keyed by the source and destination vertex IDs.

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

// ED: the type of the edge attribute
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
abstract class EdgeRDD[ED] extends RDD[Edge[ED]]

60 /
The Triplet Collection

► The triplets collection consists of each edge and its corresponding source and destination vertex properties.

► It logically joins the vertex and edge properties: RDD[EdgeTriplet[VD, ED]].

► The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members, which contain the source and destination properties, respectively.

61 /
Building a Property Graph

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")),
  (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(5L, 1L, "-")))

val defaultUser = ("John Doe", "Missing")

val graph: Graph[(String, String), String] = Graph(users, relationships, defaultUser)

62 /
Graph Operators

► Information about the graph
► Property operators
► Structural operators
► Joins
► Aggregation
► Iterative computation
► ...

63 /
Information About The Graph (1/2)

► Information about the graph

val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]

► Views of the graph as collections

val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]

64 /
Information About The Graph (2/2)

// Constructed from above
val graph: Graph[(String, String), String]

// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count

// Count all the edges where srcId > dstId
graph.edges.filter(e => e.srcId > e.dstId).count

65 /
Property Operators

► Transform vertex and edge attributes.

► Each of these operators yields a new graph with the vertex or edge properties modified by the user-defined map function.

def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

val relations: RDD[String] = graph.triplets.map(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
relations.collect.foreach(println)

val newGraph = graph.mapTriplets(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
newGraph.edges.collect.foreach(println)

66 /
Structural Operators

► reverse returns a new graph with all the edge directions reversed.

► subgraph takes vertex/edge predicates and returns the graph containing only the vertices/edges that satisfy the given predicates.

def reverse: Graph[VD, ED]

def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean):
  Graph[VD, ED]

// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")

validGraph.vertices.collect.foreach(println)

67 /
Join Operators

► joinVertices joins the vertices with the input RDD.
  • Returns a new graph with the vertex properties obtained by applying the user-defined map function to the result of the joined vertices.
  • Vertices without a matching value in the RDD retain their original value.

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]

val rdd: RDD[(VertexId, String)] = sc.parallelize(Array((3L, "phd")))

val joinedGraph = graph.joinVertices(rdd)((id, user, role) => (user._1, role + " " + user._2))

joinedGraph.vertices.collect.foreach(println)

68 /
Aggregation (1/2)

► aggregateMessages applies a user-defined sendMsg function to each edge triplet in the graph, and then uses the mergeMsg function to aggregate those messages at their destination vertex.

def aggregateMessages[Msg: ClassTag](
  sendMsg: EdgeContext[VD, ED, Msg] => Unit,  // map
  mergeMsg: (Msg, Msg) => Msg,                // reduce
  tripletFields: TripletFields = TripletFields.All):
  VertexRDD[Msg]

69 /
Aggregation (2/2)

// count and list the names of friends of each user
val profs: VertexRDD[(Int, String)] = validGraph.aggregateMessages[(Int, String)](
  // map
  triplet => {
    triplet.sendToDst((1, triplet.srcAttr._1))
  },
  // reduce
  (a, b) => (a._1 + b._1, a._2 + " " + b._2)
)

profs.collect.foreach(println)

70 /
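The semantics of aggregateMessages can be illustrated without Spark. This is a minimal plain-Python stand-in (an assumption, not the GraphX API): sendMsg emits messages per edge triplet, and mergeMsg combines the messages arriving at each destination vertex, mirroring the friend-counting example above.

```python
# Plain-Python simulation of the aggregateMessages map/reduce pattern.

def aggregate_messages(triplets, send_msg, merge_msg):
    """Return {vertex_id: aggregated message}, like a VertexRDD[Msg]."""
    msgs = {}
    for t in triplets:
        for dst, m in send_msg(t):
            msgs[dst] = merge_msg(msgs[dst], m) if dst in msgs else m
    return msgs

# Edge triplets as (src_id, src_name, dst_id, dst_name), taken from the
# example property graph.
triplets = [(3, "rxin", 7, "jgonzal"),
            (5, "franklin", 3, "rxin"),
            (2, "istoica", 5, "franklin"),
            (5, "franklin", 7, "jgonzal")]

counts = aggregate_messages(
    triplets,
    send_msg=lambda t: [(t[2], (1, t[1]))],               # to dst: (1, src name)
    merge_msg=lambda a, b: (a[0] + b[0], a[1] + " " + b[1]))
```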
Iterative Computation (1/6)

71 /
Iterative Computation (2/6)

i_val := val

for each message m
  if m > val then val := m

if i_val == val then
  vote_to_halt
else
  for each neighbor v
    send_message(v, val)

72 /
Iterative Computation (3/6)

► pregel takes two argument lists: graph.pregel(list1)(list2).

► The first list contains configuration parameters:
  • The initial message, the maximum number of iterations, and the edge direction in which to send messages.

► The second list contains the user-defined functions:
  • Gather: mergeMsg, Apply: vprog, Scatter: sendMsg

def pregel[A]
  (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out)
  (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
   mergeMsg: (A, A) => A):
  Graph[VD, ED]

73 /
Iterative Computation (4/6)

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val initialMsg = -9999

// (vertex ID, (new vertex value, old vertex value))
val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (1, -1)),
  (2L, (2, -1)), (3L, (3, -1)), (6L, (6, -1))))

val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true),
  Edge(2L, 1L, true), Edge(2L, 6L, true), Edge(3L, 6L, true), Edge(6L, 1L, true),
  Edge(6L, 3L, true)))

val graph = Graph(vertices, relationships)

74 /
Iterative Computation (5/6)

// Gather: the function for combining messages
def mergeMsg(msg1: Int, msg2: Int): Int = math.max(msg1, msg2)

// Apply: the function for receiving messages
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  if (message == initialMsg) // superstep 0
    value
  else // superstep > 0
    (math.max(message, value._1), value._1) // return (newValue, oldValue)
}

// Scatter: the function for computing messages
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val sourceVertex = triplet.srcAttr
  if (sourceVertex._1 == sourceVertex._2) // newValue == oldValue for source vertex?
    Iterator.empty // do nothing
  else
    // propagate the new (updated) value to the destination vertex
    Iterator((triplet.dstId, sourceVertex._1))
}

75 /
Iterative Computation (6/6)

val minGraph = graph.pregel(initialMsg,
  Int.MaxValue,
  EdgeDirection.Out)(
  vprog,    // apply
  sendMsg,  // scatter
  mergeMsg) // gather

minGraph.vertices.collect.foreach {
  case (vertexId, (value, original_value)) => println(value)
}

76 /
Summary

77 /
Summary

► Think like a vertex
  • Pregel: BSP, synchronous parallel model, message passing
  • GraphLab: asynchronous model, shared memory
  • PowerGraph: synchronous/asynchronous model, GAS

► Think like a table
  • GraphX: unifies data-parallel and graph-parallel systems.

78 /
References

► G. Malewicz et al., "Pregel: a system for large-scale graph processing", ACM SIGMOD 2010.

► Y. Low et al., "Distributed GraphLab: a framework for machine learning and data mining in the cloud", VLDB 2012.

► J. Gonzalez et al., "PowerGraph: distributed graph-parallel computation on natural graphs", OSDI 2012.

► J. Gonzalez et al., "GraphX: Graph Processing in a Distributed Dataflow Framework", OSDI 2014.

79 /
Questions?

80 /
