Do not orphan out of scope persisted dataframes in ConnectedComponents.run function #459
Conversation
@rjurney can you take a look?

@james-willis thanks for this, will look it over and test it

@SemyonSinchenko @bjornjorgensen could one of you please take a look at this? Is this related to the recent issue @SauronShepherd investigated in Spark?
```diff
  vv.join(ee, vv(ID) === ee(DST), "left_outer")
    .select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT))
-   .select(col(s"$ATTR.*"), col(COMPONENT))
+   .select(col(s"$ATTR.*"), col(COMPONENT)).persist(intermediateStorageLevel)
```
I think this is a user-facing change, because from now on the output of ConnectedComponents.run() is a persisted DataFrame. That may create problems in some cases, for example if users call it multiple times or on many small subgraphs: the persisted DataFrames will stay in memory forever. At minimum we should update the function's documentation to explicitly warn end users that the output is now persisted. As a better solution, I would move this to one of the later releases with breaking changes (like 1.0) and disable persisting of the output for now.
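A minimal sketch of the concern, using the public GraphFrames API (the function name, `subgraphs`, and the cleanup pass are illustrative, not from this PR):

```scala
import org.apache.spark.sql.DataFrame
import org.graphframes.GraphFrame

// Illustrative only: run connected components over many small subgraphs.
// If run() persists its output, each call pins another DataFrame in the
// cache until the caller releases it explicitly.
def componentsForAll(subgraphs: Seq[GraphFrame]): Seq[DataFrame] = {
  val results = subgraphs.map(_.connectedComponents.run())
  // Without a cleanup pass like this, every result stays cached forever:
  // results.foreach(_.unpersist())
  results
}
```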
Can we maybe only unpersist the temporary cache to fix the bug, and make persisting of the output optional or disabled until the next big release?
> That may create problems in some cases, for example if users call it multiple times or on many small subgraphs.

This is already the kind of issue users are encountering today because of the orphaned intermediate dataframes. At least with this change, the DataFrame that is persisted is one available in the user's scope, so the user can unpersist what graphframes persisted without nuking their entire cache. I view this as strictly an improvement with regard to the memory leak caused by this caching bug.

> Can we maybe only unpersist the temporary cache to fix the bug, and make persisting of the output optional or disabled until the next big release?

If you unpersist the temporary cache before you cache the output, you will get cache misses when you materialize the output DataFrame. Because of the checkpointing code this isn't as bad as having no persisting at all, but it can be quite expensive on larger graphs. TBH I don't have a good sense of how expensive your proposed solution would be in terms of execution time.
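To make the "user's scope" point concrete, a hedged usage sketch (the function name and `graph` are illustrative; `connectedComponents.run()` and `unpersist()` are the standard APIs):

```scala
import org.apache.spark.sql.DataFrame
import org.graphframes.GraphFrame

def useComponents(graph: GraphFrame): DataFrame = {
  // With this change, run() returns the persisted DataFrame itself, so
  // the caller can release exactly what GraphFrames cached:
  val components = graph.connectedComponents.run()
  println(components.count()) // use the result
  components.unpersist()      // frees only this DataFrame, not the whole cache
}
```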
We actually discovered this bug in Sedona when our unit tests were going OOM in a feature that uses GraphFrames' connected components. See apache/sedona#1589.
I wrote an article discussing this issue. The method tracks every persisted DataFrame, so I don't see how a memory leak could occur because of that. The OOM issue I've been analyzing is not directly related to persisting DataFrames, but rather to how Spark constantly generates string representations of execution plans. That behavior is independent of unpersisting DataFrames and is not resolved by doing so. If the bug in Sedona gets fixed by disabling AQE, you're most probably dealing with the same global issue, and it's not a bug in GraphFrames but a bug in Spark: SPARK-50992. I've even created a PR for it.
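Not from this thread, but Spark does ship a knob for exactly this kind of plan-string growth, `spark.sql.maxPlanStringLength`; whether it mitigates the issue described above is an assumption. A hedged sketch (the app name and value are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("plan-string-cap").getOrCreate()

// Assumption, not verified against this bug: truncating the string form
// of query plans bounds the memory Spark spends generating them.
spark.conf.set("spark.sql.maxPlanStringLength", (64 * 1024).toString)
```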
Setting the Spark master to local[4] (instead of utilizing all 16 cores on my laptop) increases the maximum heap size to 6GB, but the test still passes. However, I noticed that the number of SparkSession instances reached 117. Each time a DF is persisted a new SparkSession is created, which is then destroyed when the DF is unpersisted.

The memory leak in GraphFrames caused by not unpersisting the last round of DFs isn't that big. However, I realised that Sedona is using graphframes-0.8.3-spark3.4-s_2.12, which lacks the unpersist loop. That was addressed in a fix introduced in 2024, while the ConnectedComponents class used in Sedona dates back to 2023.
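For context, a minimal sketch of the kind of unpersist loop being referred to (the names and structure are illustrative, not the actual GraphFrames source):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame

// Illustrative pattern: track every DataFrame the iterative algorithm
// persists, then release them all once the final result is materialized.
val persistedDfs = ArrayBuffer.empty[DataFrame]

def persistTracked(df: DataFrame): DataFrame = {
  persistedDfs += df.persist()
  df
}

// ... each round of the algorithm calls persistTracked(...) ...

// The cleanup loop missing from the older release mentioned above:
persistedDfs.foreach(_.unpersist(blocking = false))
```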
> Heap dumps also showed that memory usage was dominated by UnsafeHashedRelation instances originating from BroadcastExchange. We worked around this problem by disabling broadcast, but we'd still like to know the root cause.

Thanks to your example, I've been able to reproduce the issue, but only for Spark versions 3.5.x. In Spark 3.4.4, the number of UnsafeHashedRelation instances seems to increase by only 1 per iteration.

Also, in Spark 3.5.x, disabling AQE also fixes the problem. It looks like something changed from 3.4 to 3.5, and now, when a persisted DF with a broadcast is not unpersisted, things somehow get out of control...
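The two workarounds mentioned above correspond to standard Spark SQL settings; a short sketch (the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cc-workarounds").getOrCreate()

// Disable Adaptive Query Execution (the fix observed above on Spark 3.5.x):
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Disable broadcast joins (the workaround used in Sedona):
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```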
@SauronShepherd so what do we do about this issue?
It should be fixed with the latest GraphFrames changes (temporarily disabling AQE). Also, I'd like to submit my ConnectedComponents changes (a new experimental implementation) soon.
Okay, sounds good.
```diff
  .select(col(s"$ATTR.*"), col(COMPONENT)).persist(intermediateStorageLevel)

+ // materialize the output DataFrame
+ output.count()
```
Nit: can we just persist with eager=true instead of calling an action? I think a lot of this code was written against older Spark versions, and we can simplify it later.
Hi Peiyuan! I think this PR isn't getting merged, so I'm going to abandon it.
@SauronShepherd can you verify this doesn't need merging?
I don't think an action is needed in any case, including both the existing code and the newly added count(). Apart from that, I believe it's good practice to unpersist DataFrames that are no longer accessible once the method ends, so it would be a good idea to proceed with the merge once the count() is removed.

For my new experimental ConnectedComponents version, I'd like to explore a different way to address this issue (if that's possible).
Since we intend to merge one of these PRs, I've copied my comment over from the other PR:

I believe persist is lazy and does not offer an eager flag. Will this code actually wind up using the cached dataframes if we don't cache the output DF before we unpersist the child dataframes?
Tried to add a test to show this.
> I believe persist is lazy and does not offer an eager flag. Will this code actually wind up using the cached dataframes if we don't cache the output DF before we unpersist the child dataframes?

Yeah, the checkpoint function has an eager option, but persist does not.
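A short sketch of the semantics being debated, assuming standard Spark 3.x Dataset APIs (the function and DataFrame names are illustrative):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

def materializeThenCleanup(intermediate: DataFrame, output: DataFrame): DataFrame = {
  // persist is lazy: it only marks the plan for caching.
  val persisted = output.persist(StorageLevel.MEMORY_AND_DISK)
  // An action is what actually populates the cache, so that unpersisting
  // the intermediate DF afterwards cannot force a recompute.
  persisted.count()
  intermediate.unpersist(blocking = false)
  persisted
}

// By contrast, Dataset.checkpoint does take an eager flag
// (it needs a checkpoint directory to be configured first):
// val cp = output.checkpoint(eager = true)
```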
Closed because #552 was merged.
Fix for #458