
Conversation

@james-willis (Collaborator)

…s.run function

Fix for #458

@james-willis (Collaborator, Author)

@rjurney can you take a look?

@rjurney (Collaborator) commented Jan 24, 2025

@james-willis thanks for this, will look it over and test it

@rjurney (Collaborator) commented Feb 11, 2025

@SemyonSinchenko @bjornjorgensen could one of you please take a look at this? Is this related to the recent issue @SauronShepherd investigated in Spark?

vv.join(ee, vv(ID) === ee(DST), "left_outer")
.select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT))
.select(col(s"$ATTR.*"), col(COMPONENT))
.select(col(s"$ATTR.*"), col(COMPONENT)).persist(intermediateStorageLevel)
Collaborator

I think this is a user-facing change, because from now on the output of ConnectedComponents.run() is a persisted DataFrame. It may create problems in some cases, for example if users call it multiple times or for many small subgraphs; in that case the persisted DataFrames will stay in memory forever. I think we should at least change the documentation of the function to explicitly warn end users that the output is now persisted. As a better solution, I would like to move this to one of the later releases with breaking changes (like 1.0) and disable persisting of the output for now.

Collaborator

Could we maybe only unpersist the temporary cache to fix the bug, and make persisting of the output optional (or disabled) until the next big release?

Collaborator (Author)

It may create problems in some cases, for example if users call it multiple times or for many small subgraphs.

This is already the kind of issue users run into today because of the orphaned intermediate DataFrames. At least with this change, the DataFrame that gets persisted is one that is available in the user's scope, so the user can unpersist exactly what GraphFrames persisted without nuking their entire cache.

I view this purely as an improvement with regard to the memory-leak issue caused by this caching bug.
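For illustration, a minimal sketch of that usage with toy vertex/edge data, assuming the output of run() is persisted as proposed in this PR (the checkpoint directory path is just a placeholder):

import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// connectedComponents requires a checkpoint directory
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")

val vertices = spark.createDataFrame(Seq((0L, "a"), (1L, "b"), (2L, "c"))).toDF("id", "name")
val edges = spark.createDataFrame(Seq((0L, 1L))).toDF("src", "dst")

val components = GraphFrame(vertices, edges).connectedComponents.run()
components.count()      // use the result

// With the persisted output in the caller's scope, only this DataFrame needs to be released...
components.unpersist()
// ...instead of spark.catalog.clearCache(), which would also drop the user's own cached data.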

Could we maybe only unpersist the temporary cache to fix the bug, and make persisting of the output optional (or disabled) until the next big release?

If you unpersist the temporary cache before you cache the output, you will get cache misses when you materialize the output DataFrame. Because of the checkpointing code this isn't as bad as having no persisting at all, but it can be quite expensive on larger graphs. TBH I don't have a good sense of how expensive your proposed solution would be in terms of execution time.
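For reference, a minimal standalone sketch of that ordering concern (toy DataFrames, not the GraphFrames internals): the output has to be materialized while the intermediate cache is still alive, otherwise the first action on the output recomputes the lineage.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val intermediate = (1 to 1000).toDF("id").persist(StorageLevel.MEMORY_AND_DISK)
intermediate.count()                              // the intermediate is now cached

val output = intermediate.filter($"id" % 2 === 0).persist(StorageLevel.MEMORY_AND_DISK)
output.count()                                    // materialized from the cached intermediate (cache hit)
intermediate.unpersist()                          // only now is it safe to drop the intermediate

// If intermediate.unpersist() ran before output.count(), that count would rebuild the
// intermediate from its lineage (or from the last checkpoint) instead of hitting the cache.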

Collaborator (Author)

We actually discovered this bug in Sedona when our unit tests were going OOM in a feature that uses GraphFrames' connected components.

See here: apache/sedona#1589

@SauronShepherd (Contributor) commented Feb 25, 2025

I wrote an article discussing this issue. The method tracks every persisted DataFrame, so I don't see how a memory leak could occur because of that. The OOM issue I've been analyzing is not directly related to persisting DataFrames, but rather to how Spark constantly generates string representations of execution plans. That behavior is independent of unpersisting DataFrames and is not resolved by doing so. If the bug in Sedona gets fixed by disabling AQE, most probably you're dealing with the same global issue, and it's not a bug in GraphFrames but a bug in Spark (SPARK-50992). I've even created a PR for it.
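As a hedged sketch of the workaround implied here (standard Spark configuration keys, not a GraphFrames API), disabling AQE for the session around the call could look like this:

val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

val previous = spark.conf.get("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.enabled", "false")    // work around the Spark-side issue (SPARK-50992)
try {
  // ... run connected components here ...
} finally {
  spark.conf.set("spark.sql.adaptive.enabled", previous) // restore the original setting
}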

@SauronShepherd (Contributor) commented Feb 26, 2025

Setting the Spark master to local[4] (instead of utilizing all 16 cores on my laptop) increases the maximum heap size to 6GB, but the test still passes. However, I noticed that the number of SparkSession instances reached 117. Each time a DF is persisted, a new SparkSession is created, which is then destroyed when the DF is unpersisted.

The memory leak in GraphFrames, caused by not unpersisting the last round of DFs, isn't that big. However, I realised that Sedona is using graphframes-0.8.3-spark3.4-s_2.12, which lacks the unpersist loop. This issue was addressed in a fix introduced in 2024, while the ConnectedComponents class used in Sedona dates back to 2023.
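For readers following along, an illustrative sketch of what such an unpersist loop can look like (hypothetical helper names, not the actual ConnectedComponents source): each round's persisted DataFrame is tracked so everything can be released once the final result is materialized.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

val persistedRounds = ArrayBuffer.empty[DataFrame]

def persistTracked(df: DataFrame): DataFrame = {
  val cached = df.persist(StorageLevel.MEMORY_AND_DISK)
  persistedRounds += cached
  cached
}

// ... iterative work calls persistTracked(...) each round and materializes the final result ...

// the unpersist loop: release every tracked intermediate once the output no longer depends on the cache
persistedRounds.foreach(_.unpersist())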

Contributor

Heap dump also showed that the memory usage was dominated by UnsafeHashedRelation instances originating from BroadcastExchange. We worked around this problem by disabling broadcast, but we'd still like to know the root cause.

Thanks to your example, I've been able to reproduce the issue, but only for Spark versions 3.5.x. In Spark 3.4.4, the number of UnsafeHashedRelation instances seems to increase only by 1 per iteration.

Also, in Spark 3.5.x, disabling AQE fixes the problem. It looks like something changed from 3.4 to 3.5, and now, when a persisted DF with a broadcast is not unpersisted, things somehow get out of control ...
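For completeness, a sketch of the two session-level workarounds mentioned in this thread, using standard Spark configuration keys (how the Sedona side applied them is an assumption):

val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // disable automatic broadcast joins
spark.conf.set("spark.sql.adaptive.enabled", "false")         // or disable AQE on Spark 3.5.x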

Collaborator

@SauronShepherd so what do we do about this issue?

Contributor

It should be fixed with the latest GraphFrames changes (temporarily disabling AQE). Also, I'd like to submit my ConnectedComponents changes (a new experimental implementation) soon.

Collaborator

Okay, sounds good.

.select(col(s"$ATTR.*"), col(COMPONENT)).persist(intermediateStorageLevel)

// materialize the output DataFrame
output.count()
@ericsun95 commented Mar 19, 2025

Nit: can we just persist with eager=true instead of calling an action? I think a lot of this code was written against older Spark, and we could simplify a lot later.

Collaborator (Author)

Hi Peiyuan! I think this PR isn't getting merged, so I'm going to abandon it.

Collaborator

@SauronShepherd verify this doesn't need merging?

Contributor

I don't think an action is needed in any case, including both the existing code and the newly added count(). Apart from that, I believe it's a good practice to unpersist DataFrames that are no longer accessible once the method ends. So, it would be a good idea to proceed with the merge once the count() is removed.

For my new experimental ConnectedComponents version, I'd like to explore a different way to address this issue (if that's possible).

Collaborator (Author)

Since we intend to merge one of these PRs, I've copied my comment over from the other PR:

I believe persist is lazy and does not offer an eager flag. Will this code actually wind up using the cached DataFrames if we don't cache the output DF before we unpersist the child DataFrames?

Collaborator (Author)

Tried to add a test to show this.


Since we intend to merge one of these PRs, I've copied my comment over from the other PR:

I believe persist is lazy and does not offer an eager flag. Will this code actually wind up using the cached DataFrames if we don't cache the output DF before we unpersist the child DataFrames?

Yeah, the checkpoint function has an eager option, but persist doesn't.
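A short, self-contained sketch of that distinction (the directory path is just a placeholder): Dataset.persist is lazy and needs an action to fill the cache, while Dataset.checkpoint takes an eager parameter.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")   // checkpoint() requires a checkpoint dir

val df = spark.range(1000).toDF("id")

val cached = df.persist(StorageLevel.MEMORY_AND_DISK)      // lazy: nothing is computed yet
cached.count()                                              // an action is needed to materialize the cache

val checkpointed = df.checkpoint(eager = true)              // eager: computed and written immediately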

@SauronShepherd (Contributor)

I've just opened a related PR (#552) with the counts removed. Could we close this PR and, if nobody has any objections, approve the other one (#552)?

@SemyonSinchenko (Collaborator)

Closed because #552 was merged.
