Description
I am seeing some surprising and very worrying behavior from connectedComponents(). I am running it on a graph with ~300K vertices and ~275K edges. When I run it locally, I get consistent, correct results. When I run it on our cluster, I get inconsistent, incorrect results.
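For context, the call itself is just the standard GraphFrame construction plus connectedComponents(). A rough sketch of what I'm doing, with placeholder DataFrame names (vertices_df / edges_df are not my real variables):

```python
from graphframes import GraphFrame

# Placeholder names: vertices_df needs an "id" column, edges_df needs "src" and "dst".
# (spark is the SparkSession from the pyspark shell.)
spark.sparkContext.setCheckpointDir("/tmp/graphframes-cc")  # connectedComponents() in 0.3.0 wants a checkpoint dir, if I recall correctly

g = GraphFrame(vertices_df, edges_df)
result = g.connectedComponents()  # returns the vertices with an added "component" column
```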
I can't share a repro at the moment, so let me elaborate a bit on how the results are incorrect when I run connectedComponents() on our cluster.
When I run connectedComponents() on the cluster, roughly 20-30% of the vertices (~60K-100K of them) are assigned the same component ID, and that shared ID is usually a low number between 1 and 10. This is clearly wrong: from running the same code locally and inspecting the results, I know that at most 200 vertices should share the same component ID.
Additionally, when I re-run the same job on our cluster I get different counts every time: one run it may be ~60K vertices assigned a component ID of 0; another run ~80K vertices assigned a component ID of 7; and so on. I never see this behavior locally. Running the same job locally, I get consistent counts: the most frequently occurring component ID is always shared by exactly 171 vertices.
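The way I'm checking this is just counting vertices per component ID, along these lines (result being the DataFrame returned by connectedComponents()):

```python
from pyspark.sql import functions as F

# How many vertices share each component ID, largest components first.
component_counts = (
    result.groupBy("component")
          .count()
          .orderBy(F.desc("count"))
)
component_counts.show(10)
# Cluster: the top row is ~60K-100K vertices for a single low-valued ID, and it changes per run.
# Local: the top count is always exactly 171.
```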
I don't have any hints about what may be going wrong on the cluster, but occasionally I do see errors like this pop up right around when connectedComponents() runs:
WARN TransportChannelHandler: Exception in connection from /x.x.x.x:55250
ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /x.x.x.x:55250 is closed
WARN BlockManagerMaster: Failed to remove broadcast 56 with removeFromMaster = true - Connection reset by peer
I don't see them every time though, so they may not be related to the incorrect behavior I'm seeing.
I did figure out a way to work around this problem. If I persist() the vertices and edges before calling connectedComponents(), I get correct, consistent results like I do when running locally. So this smells like an optimizer bug in Spark, loosely related to others like SPARK-18589 and SPARK-18492, where adding a persist() at the right place works around the issue.
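Concretely, the workaround is just this (placeholder names again):

```python
from graphframes import GraphFrame

# Persisting the input DataFrames before building the GraphFrame is enough to get
# correct, consistent results on the cluster, matching what I see locally.
vertices_df = vertices_df.persist()  # default storage level
edges_df = edges_df.persist()

g = GraphFrame(vertices_df, edges_df)
result = g.connectedComponents()
```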
If someone is interested in tracking this down, I would be happy to work with them to shed more light on what's happening and perhaps boil things down to a reproducible example.
Cluster Environment:
- GraphFrames 0.3.0
- Spark 2.0.2 or Spark 2.1.0
- Scala 2.11
- Java 7
- Python 3.5
- YARN 2.6.0, CDH 5.5.6
Local Environment (same as cluster except for the following):
- Spark Standalone (no YARN)
- Java 7 or Java 8