[ConnectedComponents] Memory leak with unpersisted DataFrames in the last round #552
Conversation
JFYI, you can use …
I didn't change a line, only removed the counts. The code-style error was introduced previously in the #459 PR.
SemyonSinchenko
left a comment
Could you please update the docstring with the information that the output is a persisted DataFrame? Because it is a user-facing change.
Technically speaking, the last round's DataFrame was already persisted before this PR, so the only difference now is that only one DataFrame (the last one) will be persisted. Additionally, this PR is just a small part of the changes I plan to introduce in ConnectedComponents. I think it's fine to update the documentation, but do you mind if I update it after completing all the changes I need to make?
I'm absolutely fine with it!
```scala
.join(ee, vv(ID) === ee(DST), "left_outer")
.select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT))
.select(col(s"$ATTR.*"), col(COMPONENT))
.persist(intermediateStorageLevel)
```
I believe persist is lazy and does not offer an eager flag. Will this code actually wind up using the cached DataFrames if we don't cache the output DataFrame before we unpersist the child DataFrames?
A DataFrame only needs to be persisted when an action is executed, in order to reuse those previous calculations.

The calculations are performed in the last round; once the loop ends, another transformation is applied to that DataFrame and the result is cached (no new calculations are performed because there's no action involved).

Nothing changes, except that the persisted DataFrame is now the one the method returns: the last transformations are applied and the resulting DataFrame is returned to the user, so the user can unpersist it.
Here is the diff with and without the count call; removing the count call causes a cache miss: https://www.diffchecker.com/i57B411V/
Where do you see a cache miss? I'm debugging the "single vertex" unit test and there's one DataFrame cached and an InMemoryTableScan in the plan:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [id#1136L, vattr#1137, gender#1138, component#1133L]
   +- InMemoryRelation [id#1136L, vattr#1137, gender#1138, component#1133L], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [id#1136L, vattr#1137, gender#1138, component#1133L]
```
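To reproduce this kind of plan inspection, here is a minimal sketch (assuming any cached DataFrame `df`; a cache hit shows up as an InMemoryTableScan node backed by an InMemoryRelation):

```scala
// Mark for caching, materialize the cache with an action,
// then print the physical plan to look for InMemoryTableScan.
df.persist()
df.count()
df.explain()   // or: println(df.queryExecution.executedPlan)
```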
Sorry if I am just struggling to understand, but I think the count is necessary.

> Only when an action is executed the dataframe needs to be persisted, in order to reuse those previous calculations.

If you want the output DataFrame to leverage the persisted child DataFrames in its query plan, you need to call an action on the output DataFrame before unpersist is called on those children. Without the count call, you will not utilize the cached versions of the child DataFrames when caching the output DataFrame.

> another transformation is applied and then cached

I don't agree. cache and unpersist are lazy in Spark, so the DataFrame is only marked for caching; it is not actually cached until some action is called. Without the count call, the action will always come after the children's query plans have been unpersisted, so they will be recalculated by the engine. This defeats the purpose of those persist calls.
I tried to add a test for this in my PR: `test("uses intermediate caches")`
> I'm debugging the "single vertex" unit test and there's one DataFrame cached and an InMemoryTableScan in the plan

I believe this is an edge case: Spark optimizes away the second child of the join because ee is an empty LocalRelation.

I believe the chain graph test is more representative because there are edges in the table. There you will see only the top-level InMemoryRelation when the count call is removed, and 16 when it is in place.
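One rough way to count cached relations like this is to walk the plan directly. This is only a sketch, assuming Spark 3.x, where the optimized plan already reflects the cache manager's substitutions; `countCachedRelations` is a hypothetical helper name:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Count InMemoryRelation nodes in a DataFrame's optimized plan;
// these are the relations that would be served from cache.
def countCachedRelations(df: DataFrame): Int =
  df.queryExecution.optimizedPlan.collect {
    case r: InMemoryRelation => r
  }.size
```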
+1 to the count being necessary. It might be the case that the counts inside the loop aren't needed, since other actions like _calcMinNbrSum will trigger the DataFrame to cache. But in this case, at the end, since everything is being unpersisted, the output will be completely recalculated from the last checkpoint when the user does something with it, with none of the intermediate caching.
Well, it's simple ... let's test it with a large dataset and then see whether it takes longer or not.
I totally understand your doubts, because it's not easy to explain, don't worry. I'm not 100% sure either, but this is how I think it works: the algorithm persists the intermediate results of iteration i, executes the … Once the loop has ended and the last action is performed in … As you said, given that persist and unpersist are lazy operations, once you have persisted a new DataFrame, you can perfectly unpersist the previous DataFrames that generated it. So I don't think an action is needed every time we persist a DataFrame in order to unpersist it.
This is a super interesting topic, btw. I have in mind to write a couple of articles about how caching works in Spark.
This is not what I am saying. Given that persist and unpersist are lazy operations, once you have persisted a new DataFrame, you can NOT "perfectly unpersist" the previous ones, because persisting the DataFrame does not place it into the cache! When query execution occurs, the cache lookup will not return the unpersisted DataFrame and the results are calculated with 0 cache hits.
I think the test that I added to my PR and the executed plans do prove you wrong, but I have gone ahead and created a minimal example showing how caching works in Spark. You can add this to the ConnectedComponents test suite and play with it yourself:

```scala
test("query planning and cache") {
  spark.conf.set("spark.sql.adaptive.enabled", "false")
  spark.catalog.clearCache()

  val df1 = spark.sql("SELECT 1 * 10 AS col1").cache()
  df1.count()
  val df2 = df1.withColumn("col2", expr("col1 * 1000"))
  df2.persist()
  df1.unpersist()
  df2.collect()
  val noCountExecutedPlan = df2.queryExecution.executedPlan

  spark.catalog.clearCache()
  val df3 = spark.sql("SELECT 1 * 10 AS col1").cache()
  df3.count()
  val df4 = df3.withColumn("col2", expr("col1 * 1000"))
  df4.persist().count()
  df3.unpersist()
  df4.collect()
  val countExecutedPlan = df4.queryExecution.executedPlan

  println("---Plan without Count---")
  println(noCountExecutedPlan)
  println("---Plan with Count---")
  println(countExecutedPlan)
}
```

This gives the printed output: …

Edit: updated the example to disable AQE to make it easier to understand.
Have you had a look at the comments in the …? So, theoretically, if we build another Dataset based on the persisted one and persist it, we could perfectly unpersist the previous one.
However ... I think you're right and I was wrong. It seems that while … In the following test, if persisting/unpersisting DataFrames worked as I thought, only one message should be printed; instead we get two. Not only that but, as you also showed in your test, the explain plan changes after unpersisting the first DataFrame. When an action is performed on the second DataFrame before unpersisting the first one, the plan doesn't change. I think the comments of these methods are wrong or, at least, misleading. Btw, are we sure performing a …
Before merging this PR, I'd like to ensure that we're persisting everything necessary and that Spark is not recalculating anything.
I'm going to speak from experience rather than any inspection of source code in this comment.

A DataFrame that has had cache or persist called is not built yet; it is only built when actions are called on it. Certain actions only build part of the DataFrame (show, for example, might not cache all of the df).

When I have checked, count has always cached 100% of the data for me, as long as the storage is available (and of course there are no executor failures).
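One way to check whether a DataFrame is at least registered with the cache is to ask the cache manager directly. This is a sketch using Spark internals (`sharedState.cacheManager` is not a stable API, so treat it as a debugging aid; `isMarkedCached` is a hypothetical helper name):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Returns true if Spark's cache manager has an entry matching this
// DataFrame's plan, i.e. a cache lookup during execution would hit it.
def isMarkedCached(spark: SparkSession, df: DataFrame): Boolean =
  spark.sharedState.cacheManager.lookupCachedData(df).isDefined
```

Note that a hit here only means the plan is registered for caching; the data may still be unmaterialized until an action runs.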
There's still something weird ... when I replace the … Not only that, but the same behaviour is observed in the ConnectedComponents tests "single vertex" and "intermediate storage level": there are no repeated messages per … How can we explain that?
As I said earlier, the single vertex case is not representative; I prefer the chain graph case.

You ask … You are returning the same DataFrame in your lambda, so the print isn't part of the query plan. The PySpark docs are more helpful than the Scala docs at describing what the transform method does: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html

Try this instead:

```scala
val myUdf = udf((x: Int) => {
  println("&tk")
  x
})
ee = ee.withColumn(SRC, myUdf(col(SRC)))
```

then count the …
I got mixed up and thought I was using the transform UDF (I didn't know the DataFrame had that method too). Sorry.

I've discovered how to determine whether the cache buffers of a DataFrame have been loaded or not: when no action is performed on … We only need to execute …

Btw, what about logging a warning message to inform the user that the returned DataFrame is persisted? Something like ...
Codecov Report

All modified and coverable lines are covered by tests ✅

```
@@           Coverage Diff            @@
##           master    #552     +/-  ##
==========================================
- Coverage   91.43%   89.85%   -1.58%
==========================================
  Files          18       18
  Lines         829      907     +78
  Branches       52      118     +66
==========================================
+ Hits          758      815     +57
- Misses         71       92     +21
```

☔ View full report in Codecov by Sentry.
I'm trying to see how we can be confident that each of these DataFrames is 100% cached (rather than partially). Edit: it does, as seen in InMemoryRelation.isCachedRDDLoaded.
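A sketch of how such a check might look, assuming Spark 3.x internals where InMemoryRelation carries a CachedRDDBuilder (these are internal fields and may differ between versions; `allCachesLoaded` is a hypothetical helper name):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// True when every InMemoryRelation referenced by the plan has fully
// materialized its cached RDD, not merely been marked for caching.
def allCachesLoaded(df: DataFrame): Boolean =
  df.queryExecution.optimizedPlan.collect {
    case r: InMemoryRelation => r
  }.forall(_.cacheBuilder.isCachedRDDLoaded)
```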
Well, as you mentioned, the Spark UI shows that 100% is cached. Besides, because the DataFrames have only a few columns, there shouldn't be any problem. We could replace the counts with …
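The proposed replacement was elided here, but one common lightweight alternative to count (my assumption, not necessarily what was proposed) is a no-op foreach, which scans every partition without bringing results to the driver:

```scala
// Materializes all partitions (and therefore the cache) without
// collecting anything to the driver; an alternative to count().
df.foreach(_ => ())
```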
@james-willis you have permissions to approve? Can you squash and merge? Can you cut a release?
I think only you, @rjurney, have a "Collaborator" badge and write access. All of us have only "Contributor" badges.
I am not even a contributor to this repo yet! haha
I liked the "yet" part 😂. I had a look at Sedona too, to contribute somehow, but ... maybe it's too high-level for me ... 😂
It seems that @SemyonSinchenko is a Collaborator now. Perhaps we can finally get this fix merged after 6 months.
@SauronShepherd Are you going to do anything else here? Or can it be merged?
I have in mind adding a few more changes, but I'll open one (or probably more) PRs. Please go ahead with the merge. Thanks.

This PR is related to #459 (maybe I shouldn't have opened a new one). We should close both once the merge is carried out.
What changes were proposed in this pull request?
This PR is based on #459. The counts used to materialize the persisted DataFrames have been removed until proven strictly necessary.
Why are the changes needed?
There's a memory leak in GraphFrames, caused by not unpersisting the last round of DataFrames.