Description
The current implementation of AggregateMessages stores the columns sent to the source and destination vertices as Option[Column] values. As a result, if I call sendToSrc or sendToDst multiple times on the same AggregateMessages instance, only the column from the last call is kept and all the previous ones are silently dropped. For example:
g.aggregateMessages
.sendToSrc(AM.dst("age"))
.sendToSrc(AM.dst("height"))
.agg(sum(AM.msg))
In this example the aggregation is applied to the "height" column and never to "age". If I wanted to aggregate both columns, I would have to create a second AggregateMessages instance and apply the aggregation to the other column. My understanding is that this hurts performance, because every AggregateMessages instance is translated into a couple of DataFrame join and group-by operations, which would be executed again for each additional instance.
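For reference, the two-instance workaround described above could look roughly like the sketch below. It assumes Spark and GraphFrames are on the classpath and that the vertices have numeric "age" and "height" attributes. The object name, the method name, and the output column names sumAge and sumHeight are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.sum
import org.graphframes.GraphFrame
import org.graphframes.lib.AggregateMessages

// Hypothetical helper illustrating the workaround; not part of GraphFrames.
object TwoPassWorkaround {
  private val AM = AggregateMessages

  def sumAgeAndHeight(g: GraphFrame): DataFrame = {
    // First pass: one AggregateMessages instance for "age".
    val ageSums = g.aggregateMessages
      .sendToSrc(AM.dst("age"))
      .agg(sum(AM.msg).as("sumAge"))

    // Second pass: another instance for "height", which repeats the
    // underlying message construction and group-by work.
    val heightSums = g.aggregateMessages
      .sendToSrc(AM.dst("height"))
      .agg(sum(AM.msg).as("sumHeight"))

    // Both results are keyed by the vertex "id" column. A full outer join
    // keeps vertices that received a message in only one of the passes.
    ageSums.join(heightSums, Seq("id"), "outer")
  }
}
```

Each pass builds and aggregates its own message DataFrame, and the final join adds one more shuffle, which is the overhead this issue is about.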
My proposal is to allow multiple calls to sendToSrc and sendToDst on the same AggregateMessages instance. The resulting DataFrame would have the vertex ID column plus one column for each aggregation defined in the agg method.
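To make the difference concrete, here is a small Spark-free model of the two behaviours. It is only a sketch: Col, SingleSlot and MultiSlot are hypothetical stand-ins, not GraphFrames classes, and the real builder may be structured differently. SingleSlot mimics the Option[Column] storage described above, and MultiSlot mimics the proposed accumulation.

```scala
object MessageSlotModel {
  // Hypothetical stand-in for a Spark Column, identified by name only.
  final case class Col(name: String)

  // Models the behaviour described in this issue: a single Option slot,
  // so each sendToSrc call overwrites the previous one.
  final case class SingleSlot(toSrc: Option[Col] = None) {
    def sendToSrc(c: Col): SingleSlot = copy(toSrc = Some(c))
    def messages: List[Col] = toSrc.toList
  }

  // Models the proposal: every sendToSrc call is appended and retained,
  // so each message column can later get its own aggregation.
  final case class MultiSlot(toSrc: Vector[Col] = Vector.empty) {
    def sendToSrc(c: Col): MultiSlot = copy(toSrc = toSrc :+ c)
    def messages: List[Col] = toSrc.toList
  }
}
```

With this model, chaining sendToSrc(Col("age")) and sendToSrc(Col("height")) on SingleSlot() leaves only the "height" message, while the same chain on MultiSlot() keeps both in call order.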
If there is consensus for this change, I can submit a PR with this enhancement.
Thanks
Esteban