Description
Problem
At the moment we do a left outer join of the non-null messages with all the vertices in the graph:
`val verticesWithMsg = currentVertices.join(newAggMsgDF, Seq(ID), "left_outer")`
On each iteration we then generate all the triplets and try to generate messages for all the vertices, regardless of whether a vertex is active. This leads to a large performance degradation, especially in the first iterations of Pregel and in iterations where the algorithm has almost converged.
Consider, for example, an implementation of Shortest Paths with GraphFrames Pregel: even on an iteration where we have only 1 non-null message, we still try to generate messages for all the triplets!
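To make the cost concrete, here is a minimal sketch in plain Spark (not the actual GraphFrames internals). The toy chain graph, the object name `AllTripletsSketch`, and the column names `id`, `src`, `dst`, `msg` are assumptions made up for illustration, and only the source side of each triplet is considered. It mirrors the left outer join quoted above and counts how many triplets get built versus how many have a source vertex that actually holds a message.

```scala
import org.apache.spark.sql.SparkSession

object AllTripletsSketch {
  /** Returns (triplets generated, triplets whose source vertex holds a message). */
  def tripletCounts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    // Hypothetical toy graph: a chain 1 -> 2 -> 3 -> 4.
    val currentVertices = Seq(1L, 2L, 3L, 4L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L)).toDF("src", "dst")
    // Assume only vertex 2 received a message in the previous iteration.
    val newAggMsgDF = Seq((2L, 1.0)).toDF("id", "msg")

    // Same shape as the join in the problem description:
    // every vertex survives, most of them with msg = null.
    val verticesWithMsg = currentVertices.join(newAggMsgDF, Seq("id"), "left_outer")

    // Triplets are built from all vertices, active or not.
    val triplets = verticesWithMsg
      .withColumnRenamed("id", "src")
      .join(edges, Seq("src"))

    (triplets.count(), triplets.filter($"msg".isNotNull).count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("all-triplets-sketch")
      .getOrCreate()
    val (generated, useful) = tripletCounts(spark)
    println(s"triplets generated: $generated, with a message at the source: $useful")
    spark.stop()
  }
}
```

On this toy input, three triplets are generated but only one of them starts at a vertex that received a message, so the other two are evaluated for nothing. On a real graph the gap would scale with the number of inactive vertices.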
Possible Solution
- Use a left outer join instead of the inner one when updating `currentVertices`: `currentVertices = graph.vertices.join(vertexUpdateColDF, ID)` -> `currentVertices = graph.vertices.join(vertexUpdateColDF, Seq(ID), "left_outer")`.
- Add a flag `isActive = Pregel.msg.isNotNull` to `currentVertices` and use it to select only active vertices when generating triplets and messages: `val tripletsDF = currentVertices.select(...).join(...)` -> `val tripletsDF = currentVertices.filter(col("isActive")).select(...).join(...)`.
In that case we will generate new messages only for triplets whose vertex was updated on the last iteration, instead of re-generating all the messages for all the triplets only to find that they are empty.
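The two steps could be sketched roughly as follows. This is an illustration under assumptions, not a patch against GraphFrames: the helpers `withActiveFlag` and `activeTriplets`, the object name, and the column names `id`, `src`, `dst`, `msg` are hypothetical, and the real Pregel code also carries vertex attributes and the destination side of each triplet, which are left out here.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ActiveVerticesSketch {
  // Step 1: keep every vertex (left outer join) and mark the ones
  // that received a non-null message on the last iteration.
  def withActiveFlag(vertices: DataFrame, aggMsgs: DataFrame): DataFrame =
    vertices
      .join(aggMsgs, Seq("id"), "left_outer")
      .withColumn("isActive", col("msg").isNotNull)

  // Step 2: build triplets only from active source vertices.
  def activeTriplets(flagged: DataFrame, edges: DataFrame): DataFrame =
    flagged
      .filter(col("isActive"))
      .withColumnRenamed("id", "src")
      .join(edges, Seq("src"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("active-vertices-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical toy graph: a chain 1 -> 2 -> 3 -> 4, with one message at vertex 2.
    val vertices = Seq(1L, 2L, 3L, 4L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L)).toDF("src", "dst")
    val aggMsgs = Seq((2L, 1.0)).toDF("id", "msg")

    val flagged = withActiveFlag(vertices, aggMsgs)
    val triplets = activeTriplets(flagged, edges)
    println(s"vertices kept: ${flagged.count()}, triplets generated: ${triplets.count()}")
    spark.stop()
  }
}
```

All four vertices stay in the vertex DataFrame, so no state is lost, but only the single triplet leaving the active vertex is generated. Whether the filter should also consider the destination side depends on which direction the algorithm sends messages, which this sketch does not try to settle.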