[#159] Fix non-deterministic ID assignment #189
Codecov Report
@@            Coverage Diff            @@
##           master    #189     +/-    ##
=========================================
+ Coverage   88.27%   88.5%   +0.22%
  Files          17      17
  Lines         725     722      -3
  Branches       54      58      +4
=========================================
- Hits          640     639      -1
+ Misses         85      83      -2
|
|
@mengxr - I'll try to test this tomorrow or early next week. After checking out this branch and building per the README instructions, how do I get this loaded on our YARN cluster and referenced by I don't see #159 when running locally, so I need to test this on our cluster. |
|
You can try a local standalone cluster, which should give the same result as on YARN. Or you can build the graphframes jar by |
val indexedVertices = zipWithUniqueId(vertices)
indexedVertices.select(col("uniq_id").as(LONG_ID), col("row." + ID).as(ID), col("row").as(ATTR))
val withLongIds = vertices.select(ID)
  .distinct()
Does distinct() do sorting or hashing already? Could you hash partition and then implement distinct() within each partition? I'm not sure how smart Catalyst is here.
Yes, but it doesn't specify which method it uses, and it doesn't let users set the number of partitions. Based on the optimized plan, it shuffles twice. I just wanted to put in a quick fix so that @nchammas can test whether this is the root cause.
indexedVertices.select(col("uniq_id").as(LONG_ID), col("row." + ID).as(ID), col("row").as(ATTR))
val withLongIds = vertices.select(ID)
  .distinct()
  .repartition(1024, col(ID))
Can numPartitions be set according to the original DataFrame numPartitions?
That would be very expensive if the original DataFrame has few partitions. Again, this is just a quick patch to see whether it fixes the problem. We need more work after confirmation.
|
@nchammas Did you get a chance to test the patch? |
|
@mengxr - Will try today. |
|
@mengxr - I removed I built GraphFrames using |
|
Did you use python or scala? |
|
My app is Python. |
|
Then you also need to include the python files, which should be included in the jar. Could you test it in Python instead? |
|
So I need to include
I presume you meant Scala here, but my app is in Python and I'd like to test this with my app to be sure. |
|
Yes, you need the python file, which should be bundled in the assembly jar. But you need to put the jar on the |
|
Not necessary, it is just an optimization. Thanks for confirming the fix! I will update the PR to make it look better :) And we will probably make a bug fix release afterwards, since this is critical.
|
@phi-dbq will take over the follow-up tasks. |

This is a WIP PR that tries to fix nondeterministic vertex ID assignment in GraphFrames (#159). We used to convert the vertex DataFrame to an RDD and call zipWithUniqueId on it. However, if the input DataFrame is not deterministic, the outcome is still nondeterministic. This PR uses hash partitioning + distinct + sort within each partition for the ID assignment. This is clearly more expensive, but hopefully it leads to the correct result.
This is the generated plan. It seems correct (not pushing mono_id down).
The PR is still WIP. Once we confirm this is the right fix, I will clean up the code and add tests.
@nchammas Could you help test this patch? I don't have data to reproduce the issue. Thanks a lot!
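
For readers following along, the idea in the description (hash partition + distinct + sort within partition) can be illustrated without a cluster. The sketch below is a plain-Python simulation, not the GraphFrames or Spark code from this PR. The function names, the crc32 hash, and the contiguous-chunk stand-in for the input partitioning are all assumptions made for illustration. The position-based variant follows the documented zipWithUniqueId scheme (item i of partition k gets i * n + k), and the content-based variant packs the partition index above a 33-bit row index, mirroring the documented layout of Spark's monotonically_increasing_id.

```python
import zlib


def stable_partition(key, num_partitions):
    """Map a vertex key to a partition with a hash that is stable across runs."""
    # Python's built-in hash() is salted per process for str, so use crc32.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def ids_by_position(rows, num_partitions):
    """Position-based IDs: item i of partition k gets i * n + k.

    Rows are split into contiguous chunks in arrival order, a stand-in
    (assumption) for whatever partitioning the input happens to have.
    """
    size = max(1, -(-len(rows) // num_partitions))  # ceiling division
    chunks = [rows[s:s + size] for s in range(0, len(rows), size)]
    return {
        key: i * num_partitions + k
        for k, chunk in enumerate(chunks)
        for i, key in enumerate(chunk)
    }


def ids_by_content(rows, num_partitions):
    """Content-based IDs: hash partition, de-duplicate, sort, then number.

    The ID depends only on the key and the partition count, never on the
    order in which rows arrive.
    """
    parts = [set() for _ in range(num_partitions)]  # a set gives distinct keys
    for key in rows:
        parts[stable_partition(key, num_partitions)].add(key)
    return {
        key: (k << 33) | i  # partition index in the high bits, row index below
        for k, part in enumerate(parts)
        for i, key in enumerate(sorted(part))
    }


vertices = ["a", "b", "c", "d", "e", "f", "g"]
reordered = list(reversed(vertices))  # same data, different arrival order

print(ids_by_content(vertices, 4) == ids_by_content(reordered, 4))    # True
print(ids_by_position(vertices, 4) == ids_by_position(reordered, 4))  # False
```

The trade-off matches the one noted in the description: the content-based scheme needs a shuffle and a sort, but recomputing the input in a different order no longer changes the assigned IDs.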