
Conversation

mengxr (Contributor) commented May 4, 2017

This is a WIP PR that tries to fix nondeterministic vertex ID assignment in GraphFrames (#159). We used to convert the vertex DataFrame to an RDD and call zipWithUniqueId there. However, if the input DataFrame is not deterministic, the outcome is still nondeterministic. This PR uses hash partitioning + distinct + sort within partitions for the ID assignment. This is obviously more expensive, but hopefully it leads to correct results.

This is the generated plan. It looks correct (monotonically_increasing_id is not pushed down below the sort):

scala> spark.range(1000).repartition('id).distinct().sortWithinPartitions('id).withColumn("long_id", monotonically_increasing_id()).explain()

== Physical Plan ==
*Project [id#7L, monotonically_increasing_id() AS long_id#13L]
+- *Sort [id#7L ASC NULLS FIRST], false, 0
   +- *HashAggregate(keys=[id#7L], functions=[])
      +- *HashAggregate(keys=[id#7L], functions=[])
         +- Exchange hashpartitioning(id#7L, 200)
            +- *Range (0, 1000, step=1, splits=Some(4))
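
As a sketch, the same recipe applied to a vertex DataFrame (assuming an id column; names like long_id are illustrative here, while the actual PR uses constants such as LONG_ID in GraphFrame.scala):

import org.apache.spark.sql.functions._

val withLongIds = vertices
  .select(col("id"))
  .distinct()                          // deduplicate vertex IDs
  .repartition(1024, col("id"))        // deterministic hash partitioning
  .sortWithinPartitions(col("id"))     // deterministic order within each partition
  .withColumn("long_id", monotonically_increasing_id())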

The PR is still WIP. Once we confirm this is the right fix, I will clean up the code and add tests.

@nchammas Could you help test this patch? I don't have data to reproduce the issue. Thanks a lot!

codecov-io commented May 4, 2017

Codecov Report

Merging #189 into master will increase coverage by 0.22%.
The diff coverage is 100%.


@@            Coverage Diff            @@
##           master    #189      +/-   ##
=========================================
+ Coverage   88.27%   88.5%   +0.22%     
=========================================
  Files          17      17              
  Lines         725     722       -3     
  Branches       54      58       +4     
=========================================
- Hits          640     639       -1     
+ Misses         85      83       -2
Impacted Files Coverage Δ
src/main/scala/org/graphframes/GraphFrame.scala 86.63% <100%> (+0.4%) ⬆️
...in/spark-2.x/org/apache/spark/sql/SQLHelpers.scala 36.36% <0%> (-63.64%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 490f51a...398d8c6.

nchammas (Contributor) commented May 4, 2017

@mengxr - I'll try to test this tomorrow or early next week.

After checking out this branch and building per the README instructions, how do I get this loaded on our YARN cluster and referenced by spark-submit for testing? Do I just copy the JAR to some spot on HDFS and reference it by path in the call to spark-submit --packages ...?

I can't reproduce #159 when running locally, so I need to test this on our cluster.

mengxr (Contributor Author) commented May 4, 2017

You can try a local standalone cluster, which should give the same result as YARN. Or you can build the graphframes jar with sbt "set test in assembly := {}" assembly and then use --jars to include it in your job. --packages only fetches official releases.

val indexedVertices = zipWithUniqueId(vertices)
indexedVertices.select(col("uniq_id").as(LONG_ID), col("row." + ID).as(ID), col("row").as(ATTR))
val withLongIds = vertices.select(ID)
.distinct()
Member commented:

Does distinct() do sorting or hashing already? Could you hash partition and then implement distinct() within each partition? I'm not sure how smart Catalyst is here.

mengxr (Contributor Author) replied:

Yes, but it doesn't specify which method it uses, and it doesn't let users set the number of partitions. Based on the optimized plan, it shuffles twice. I just wanted to put up a quick fix for @nchammas to test whether this is the root cause.
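
As a quick check (a sketch, assuming a spark-shell session), the plan for distinct() followed by repartition() should show two Exchange operators, one per shuffle:

scala> spark.range(1000).distinct().repartition(8, 'id).explain()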

indexedVertices.select(col("uniq_id").as(LONG_ID), col("row." + ID).as(ID), col("row").as(ATTR))
val withLongIds = vertices.select(ID)
.distinct()
.repartition(1024, col(ID))
Member commented:

Can numPartitions be set according to the number of partitions of the original DataFrame?

mengxr (Contributor Author) replied:

That would be very expensive if the original DataFrame has only a few partitions. Again, this is just a quick patch to see whether it fixes the problem. We will need more work after confirmation.
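
A hypothetical way to tie the shuffle width to the input rather than a fixed 1024 (a sketch only, not part of this patch; ID is the column-name constant from GraphFrame.scala):

import org.apache.spark.sql.functions.col

// Scale the shuffle width with the input, with a floor at the cluster's
// default parallelism so small inputs still fan out.
val numParts = math.max(
  vertices.rdd.getNumPartitions,
  vertices.sparkSession.sparkContext.defaultParallelism)
val withLongIds = vertices.select(ID)
  .distinct()
  .repartition(numParts, col(ID))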

mengxr (Contributor Author) commented May 8, 2017

@nchammas Did you get a chance to test the patch?

mengxr (Contributor Author) commented May 8, 2017

It is hard to reproduce the nondeterministic behavior locally, but it is fairly easy to show that distinct doesn't guarantee ordering on a multi-node cluster:

[screenshot: spark-shell output showing distinct() returning rows in a different order across runs]
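
The check is along these lines (a sketch; on a single-node setup both runs may happen to match):

scala> val a = spark.range(1000).distinct().collect().toSeq
scala> val b = spark.range(1000).distinct().collect().toSeq
scala> a == b  // may be false on a multi-node cluster: no ordering guarantee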

nchammas (Contributor) commented May 8, 2017

@mengxr - Will try today.

nchammas (Contributor) commented May 8, 2017

@mengxr - I removed --packages and instead included the locally built graphframes-assembly-0.4.0-SNAPSHOT-spark2.1.jar via --jars. However, Spark can't find GraphFrames:

ImportError: No module named 'graphframes'

I built GraphFrames using sbt "set test in assembly := {}" assembly from your comment above. Is there anything else I need to do?

mengxr (Contributor Author) commented May 8, 2017

Did you use Python or Scala?

nchammas (Contributor) commented May 8, 2017

My app is Python.

mengxr (Contributor Author) commented May 8, 2017

Then you also need to include the Python files, which should be bundled in the jar. Could you test it in Python instead?

nchammas (Contributor) commented May 8, 2017

So I need to include python/graphframes/graphframe.py as well, I guess? Or should I zip up all of python/graphframes and include that? Probably the latter.

> Could you test it in Python instead?

I presume you meant Scala here, but my app is in Python and I'd like to test this with my app to be sure.

mengxr (Contributor Author) commented May 8, 2017

Yes, you need the Python files, which should be bundled in the assembly jar. But you need to put the jar on the PYTHONPATH, so please add it to both --jars and --py-files:

--jars target/scala-2.11/graphframes-assembly-0.4.0-SNAPSHOT-spark2.1.jar --py-files target/scala-2.11/graphframes-assembly-0.4.0-SNAPSHOT-spark2.1.jar

nchammas (Contributor) commented May 8, 2017

OK, adding the JAR to --py-files worked, thanks.

Looks like GraphFrames @ 2c89c6d fixes the problem for me. 👏 Do I need to test again @ 398d8c6?

mengxr (Contributor Author) commented May 8, 2017

Not necessary; that change is just an optimization. Thanks for confirming the fix! I will update the PR to make it look better :) We will probably make a bug-fix release afterward, since this is critical.

mengxr (Contributor Author) commented May 8, 2017

@phi-dbq will take over the follow-up tasks.

mengxr (Contributor Author) commented May 11, 2017

I'm closing this PR for now. @phi-dbq will take the follow-up work in #195.

mengxr closed this May 11, 2017

nchammas (Contributor) commented:

> @phi-dbq will take the follow-up work in #159.

I think you mean #195.

mengxr changed the title from "[WIP] [#159] Fix non-deterministic ID assignment" to "[#159] Fix non-deterministic ID assignment" on Oct 15, 2018