
Conversation

@SauronShepherd (Contributor)

What changes were proposed in this pull request?

This PR is based on #459.
The counts used to materialize persisted DataFrames have been removed until proven strictly necessary.

Why are the changes needed?

There's a memory leak in GraphFrames caused by not unpersisting the last round of DataFrames.

@SauronShepherd SauronShepherd changed the title Cc caching [ConnectedComponents] Memory leak with unpersisted DataFrames in the last round Mar 21, 2025
@SemyonSinchenko (Collaborator)

@SauronShepherd (Contributor Author)

JFYI, you can use pre-commit to avoid code-style errors: https://github.com/graphframes/graphframes/blob/master/CONTRIBUTING.md#styleguides

I didn't change a line; I only removed the counts. The code-style error was introduced earlier, in PR #459.

@SemyonSinchenko (Collaborator) left a comment

Could you please update the docstring with information that the output is a persisted DataFrame? Because it is a user-facing change.

@SauronShepherd (Contributor Author)

Technically speaking, the last round of DataFrames was already persisted before this PR, so the only difference now is that only one DataFrame (the last one) will be persisted. Additionally, this PR is just a small part of the changes I plan to introduce in ConnectedComponents. I think it's fine to update the documentation, but do you mind if I update it after completing all the changes I need to make?

@SemyonSinchenko (Collaborator)

I think it's fine to update the documentation, but do you mind if I update it after completing all the changes I need to make?

I'm absolutely fine with it!

.join(ee, vv(ID) === ee(DST), "left_outer")
.select(vv(ATTR), when(ee(SRC).isNull, vv(ID)).otherwise(ee(SRC)).as(COMPONENT))
.select(col(s"$ATTR.*"), col(COMPONENT))
.persist(intermediateStorageLevel)
Collaborator

I believe persist is lazy and does not offer an eager flag. Will this code actually wind up using the cached DataFrames if we don't cache the output DF before we unpersist the child DataFrames?

Contributor Author

Only when an action is executed does the DataFrame need to be persisted, in order to reuse those previous calculations.

Calculations are performed in the last round; once the loop ends, another transformation is applied to that DataFrame and then cached (but no new calculations have been performed, because there's no action involved).

Nothing changes except that the persisted DataFrame is now the one the method returns, instead of persisting the previous DataFrame, applying the last transformations, and then returning the resulting DataFrame to the user. So the user can unpersist the DataFrame.

@james-willis (Collaborator) commented Mar 23, 2025

Here is the diff with and without the count call. removing the count call causes a cache miss: https://www.diffchecker.com/i57B411V/

Contributor Author

Where do you see a cache miss? Because I'm debugging the "single vertex" unit test and there's one DataFrame cached and an InMemoryTableScan in the plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [id#1136L, vattr#1137, gender#1138, component#1133L]
      +- InMemoryRelation [id#1136L, vattr#1137, gender#1138, component#1133L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#1136L, vattr#1137, gender#1138, component#1133L]

Collaborator

Sorry if I am just struggling to understand, but I think the count is necessary.

Only when an action is executed does the DataFrame need to be persisted, in order to reuse those previous calculations.

If you want the output DataFrame to leverage the persisted child DataFrames in its query plan, you need to call an action on the output DataFrame before unpersist is called on those children. Without the count call, you will not use the cached versions of the child DataFrames when caching the output DataFrame.

another transformation is applied and then cached

I don't agree. cache and persist are lazy in Spark, so the DataFrame is only marked for caching. It is not actually cached until some action is called. Without the count call, the action will always come after the child query plans have been unpersisted, so they will be recalculated by the engine. This defeats the purpose of those persist calls.

I tried to add a test for this in my PR:
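The sequencing issue debated in this thread can be sketched with a toy model in plain Scala (no Spark involved; `Frame`, `collect`, and the single global cache are illustrative inventions, not Spark APIs): persist only sets a mark, the cache entry is written the first time an action runs, and unpersist removes the entry immediately.

```scala
import scala.collection.mutable

// Toy model of lazy persist / eager unpersist, NOT Spark itself.
object LazyCacheSketch {
  private val cache = mutable.Map.empty[String, Seq[Int]]
  private var childRuns = 0

  final class Frame(name: String, compute: () => Seq[Int]) {
    private var marked = false
    def persist(): this.type = { marked = true; this }              // lazy: just a mark
    def unpersist(): this.type = { marked = false; cache.remove(name); this } // eager removal
    // An "action": serve from the cache if present, otherwise recompute
    // (and write the cache entry only now, if the frame was marked).
    def collect(): Seq[Int] = cache.getOrElse(name, {
      val rows = compute()
      if (marked) cache(name) = rows
      rows
    })
  }

  // Returns the number of child recomputations in each scenario.
  def demo(): (Int, Int) = {
    // Case 1: unpersist the child BEFORE any action on the persisted parent.
    cache.clear(); childRuns = 0
    val child1 = new Frame("child", () => { childRuns += 1; Seq(1, 2, 3) }).persist()
    child1.collect()                          // child cached; childRuns == 1
    val p1 = new Frame("p1", () => child1.collect().map(_ * 10)).persist()
    child1.unpersist()
    p1.collect()                              // child recomputed; childRuns == 2
    val case1 = childRuns

    // Case 2: run an action on the parent BEFORE unpersisting the child.
    cache.clear(); childRuns = 0
    val child2 = new Frame("child", () => { childRuns += 1; Seq(1, 2, 3) }).persist()
    child2.collect()                          // child cached; childRuns == 1
    val p2 = new Frame("p2", () => child2.collect().map(_ * 10)).persist()
    p2.collect()                              // fills p2's cache from the cached child
    child2.unpersist()
    p2.collect()                              // served from p2's cache; no recompute
    val case2 = childRuns                     // still 1

    (case1, case2)
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = demo()
    println(s"child recomputations, unpersist-first: $a, action-first: $b")
  }
}
```

In case 1 the child is recomputed because the parent was never materialized while the child's cache entry still existed; in case 2 a single action on the parent before the unpersist keeps the child at one computation.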

Collaborator

I'm debugging the "single vertex" unit test and there's one DataFrame cached and a InMemoryTableScan in the plan

I believe this is an edge case, because Spark is optimizing away the second child of the join since ee is an empty LocalRelation.

I believe the chain graph test is more representative because there are edges in the table. There you will see only the top-level InMemoryRelation when the count call is removed and 16 when it is in place.

Contributor

+1 to the count being necessary. It may be that the counts inside the loop aren't needed, since other actions like _calcMinNbrSum will trigger the DataFrames to cache. But in this case, at the end, since everything is being unpersisted, the output will be completely recalculated from the last checkpoint when the user does something with it, with none of the intermediate caching.

Contributor Author

Well, it's simple ... let's prove it with a large dataset and then see whether it takes longer or not.


@SauronShepherd (Contributor Author)

I totally understand your doubts; it's not easy to explain, don't worry. I'm not 100% sure either, but this is how I think it works:

This algorithm persists the intermediate results of iteration i, executes _calcMinNbrSum (which contains an action) to check whether the algorithm has converged, and then unpersists the intermediate results of iteration i-1 (or, in the first iteration, the ones computed before the loop).

Once the loop has ended and the last action has been performed in _calcMinNbrSum, the final resulting DataFrame is obtained and persisted, and only then are the previous ones unpersisted.

As you said, given that persist and unpersist are lazy operations, once you have persisted a new dataframe, you can perfectly unpersist the previous dataframes that generated it. So, I don't think an action is needed every time we persist a dataframe in order to unpersist it.
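The loop structure described above can be condensed into a pseudocode sketch (the names `initDF`, `step`, `checkConvergence`, and `storageLevel` are illustrative placeholders, not the actual identifiers in ConnectedComponents.scala):

```scala
// Pseudocode sketch of the iteration pattern; not runnable as-is.
var prev = initDF.persist(storageLevel)          // results computed before the loop
var converged = false
while (!converged) {
  val next = step(prev).persist(storageLevel)    // persist iteration i (lazy mark)
  converged = checkConvergence(next)             // _calcMinNbrSum-style action:
                                                 // materializes iteration i's cache
  prev.unpersist()                               // iteration i-1 no longer needed
  prev = next
}
// After the loop the returned DataFrame is persisted,
// and the caller is responsible for unpersisting it.
```

The point under debate is the final hand-off: whether an action must run on the returned DataFrame before the last unpersist call, so that its cache entry is filled from the still-cached intermediates.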

@SauronShepherd (Contributor Author)

This is a super interesting topic, btw. I have in mind writing a couple of articles about how caching works in Spark.
I think I'm right, but it would be great if you could prove me wrong ... because then I'd learn something new about how persist really works in Spark.

@james-willis (Collaborator) commented Mar 24, 2025

As you said, given that persist and unpersist are lazy operations, once you have persisted a new dataframe, you can perfectly unpersist the previous dataframes that generated it.

This is not what I am saying. I say "given that persist and unpersist are lazy operations, once you have persisted a new dataframe, you can" NOT "perfectly unpersist", because persisting the DataFrame does not place it into the cache! When query execution occurs, the cache lookup will not return the unpersisted DataFrame and the results are calculated with 0 cache hits.

it would be great if you can prove me wrong

I think the test that I added to my PR, and the executed plans, do prove you wrong, but I have gone ahead and created a minimal example showing how caching works in Spark. You can add this to the ConnectedComponents test suite and play with it yourself:

  test("query planning and cache") {
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    spark.catalog.clearCache()
    val df1 = spark.sql("SELECT 1 * 10 AS col1").cache()
    df1.count()

    val df2 = df1.withColumn("col2", expr("col1 * 1000"))

    df2.persist()
    df1.unpersist()
    df2.collect()
    val noCountExecutedPlan = df2.queryExecution.executedPlan

    spark.catalog.clearCache()
    val df3 = spark.sql("SELECT 1 * 10 AS col1").cache()
    df3.count()

    val df4 = df3.withColumn("col2", expr("col1 * 1000"))

    df4.persist().count()
    df3.unpersist()
    df4.collect()
    val countExecutedPlan = df4.queryExecution.executedPlan

    println("---Plan without Count---")
    println(noCountExecutedPlan)
    println("---Plan with Count---")
    println(countExecutedPlan)

  }

gives the printed output:

---Plan without Count---
InMemoryTableScan [col1#0, col2#38]
   +- InMemoryRelation [col1#0, col2#38], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [10 AS col1#0, 10000 AS col2#38]
            +- *(1) Scan OneRowRelation[]

---Plan with Count---
InMemoryTableScan [col1#119, col2#157]
   +- InMemoryRelation [col1#119, col2#157], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [col1#119, (col1#119 * 1000) AS col2#157]
            +- InMemoryTableScan [col1#119]
                  +- InMemoryRelation [col1#119], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Project [10 AS col1#119]
                           +- *(1) Scan OneRowRelation[]

Edit: updated example to disable AQE to be easier to understand

@SauronShepherd (Contributor Author)

Have you had a look at the comments in the unpersist method?

   * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
   * This will not un-persist any cached data that is built upon this Dataset.

So, theoretically, if we build another Dataset based on the persisted one and persist it, we could perfectly unpersist the previous one.

@SauronShepherd (Contributor Author) commented Mar 24, 2025

However ... I think you're right and I was wrong. It seems that while persist is lazy, unpersist is not.

In the following test, if persisting/unpersisting DataFrames worked as I thought, only one message should be printed; instead we get two. Not only that but, as you also showed in your test, the explain plan changes after unpersisting the first DataFrame. When an action is performed on the second DataFrame before unpersisting the first one, the plan doesn't change.
THANKS FOR PROVING ME WRONG!

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

object SparkCacheExample2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Create the first DataFrame with one row
    var firstDF = spark.range(0,1)

    firstDF = firstDF.map(x => {
      print("I'm the first DataFrame!!");
      x
    })

    // Cache and show the first DataFrame
    firstDF.cache().show()

    // Create a second DataFrame based on the first
    val secondDF = firstDF.select(firstDF("value").cast(StringType))

    // Cache the second DataFrame, unpersist the first DataFrame and show the second DataFrame
    secondDF.cache()
    secondDF.explain()
    firstDF.unpersist()
    secondDF.explain()
    secondDF.show()

    // Stop Spark session
    spark.stop()
  }
}

I think the comments of these methods are wrong or, at least, misleading.

Btw, are we sure performing a count is caching everything from the dataframes?

@SauronShepherd (Contributor Author)

Before merging this PR, I'd like to ensure that we're persisting everything necessary and that Spark is not recalculating anything.

@james-willis (Collaborator)

I'm going to speak from experience rather than from any inspection of the source code in this comment.

  • This will not un-persist any cached data that is built upon this Dataset.

A DataFrame that has had cache or persist called on it is not built yet; it is only built when actions are called on it. Certain actions only build part of the DataFrame (show, for example, might not cache all of the DF).

are we sure performing a count is caching everything from the dataframes?

When I have checked, count has always cached 100% of the data for me, as long as the storage is available (and, of course, there are no executor failures).

@SauronShepherd (Contributor Author) commented Mar 24, 2025

There's still something weird ... when I replace the map transformation with transform in my test (also replacing the "value" reference with "id"), the message gets printed only once.

Not only that, but the same behaviour is observed in the ConnectedComponents tests "single vertex" and "intermediate storage level": there are no repeated messages - per run execution - when I add the following code just after the checkpoint block inside the loop:

          System.gc() // hint Spark to clean shuffle directories
        }
        ee = ee.transform(x => {
          println(s"Iteration: $iteration")
          x
        })

        ee.persist(intermediateStorageLevel)

How can we explain that?

@james-willis (Collaborator) commented Mar 24, 2025

As I said earlier, single vertex case is not representative. I prefer the chain graph case.

I believe this is an edge case because spark is optimizing away the second child of the join because ee is an empty LocalRelation.

You ask

How can we explain that?

You are returning the same DataFrame in your lambda, so the print isn't part of the query plan. The PySpark docs are more helpful than the Scala docs at describing what the transform method does: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html

Try this instead:

        val myUdf = udf((x: Int) => {
          println("&tk")
          x
        })
        ee = ee.withColumn(SRC, myUdf(col(SRC)))

Then count the &tks that come out. It is 12 with the count and 28 without it when using the chain graph case.

@SauronShepherd (Contributor Author)

I got mixed up and thought I was using the transform UDF (I didn't know the DataFrame had that method too). Sorry.

I've discovered how to determine whether the cache buffers of a DataFrame have been loaded or not:

spark.sharedState.cacheManager.lookupCachedData(df).get.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded

When no action is performed on df, this returns false. After performing some action on the DataFrame, it returns true.

We only need to execute count on the output DataFrame to achieve both goals:

  • A persisted and loaded in-scope DataFrame for the caller
  • A significant reduction in the number of DataFrames with redundant actions performed

Btw, what about logging a warning message to inform the user that the returned DataFrame is persisted? Something like ...

logWarning("The DataFrame returned by ConnectedComponents is persisted and loaded.")

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 89.85%. Comparing base (bc487ef) to head (788378a).
Report is 17 commits behind head on master.


Additional details and impacted files
@@            Coverage Diff             @@
##           master     #552      +/-   ##
==========================================
- Coverage   91.43%   89.85%   -1.58%     
==========================================
  Files          18       18              
  Lines         829      907      +78     
  Branches       52      118      +66     
==========================================
+ Hits          758      815      +57     
- Misses         71       92      +21     


@james-willis (Collaborator) commented Mar 25, 2025

I'm trying to see how we can be confident that each of these DataFrames is 100% cached (rather than partially).

Edit: it does, as seen in InMemoryRelation.isCachedRDDLoaded

@SauronShepherd (Contributor Author)

Well, as you mentioned, the Spark UI shows 100% is cached. Besides, since the DataFrames have only a few columns, there shouldn't be any problem.

We could replace the counts with write.format("noop").mode("overwrite").save(), which would guarantee the 100%.

@rjurney (Collaborator) commented Mar 25, 2025

@james-willis you have permissions to approve? Can you squash and merge? Can you cut a release?

@SemyonSinchenko (Collaborator)

@james-willis you have permissions to approve? Can you squash and merge? Can you cut a release?

I think only you, @rjurney, have a "Collaborator" badge and write access. All of us have only "Contributor" badges.

@james-willis (Collaborator)

I am not even a contributor to this repo yet! haha

@SauronShepherd (Contributor Author)

I am not even a contributor to this repo yet! haha

I liked the "yet" part 😂. I had a look at Sedona too, to contribute somehow, but ... maybe it's too high-level for me ... 😂

@james-willis (Collaborator)

It seems that @SemyonSinchenko is a Collaborator now. Perhaps we can finally get this fix merged after 6 months

@SemyonSinchenko (Collaborator)

@SauronShepherd Are you going to do anything else here? Or can it be merged?

@SauronShepherd (Contributor Author) commented Mar 28, 2025

I have in mind adding a few more changes, but I'll open one (or probably more) PRs. Please, go ahead with the merge. Thanks

This PR is related to #459 (maybe I shouldn't have opened a new one). We should close both once the merge is carried out.

@SemyonSinchenko SemyonSinchenko merged commit 8a719fb into graphframes:master Mar 28, 2025
5 checks passed