Conversation

@karuppayya (Contributor)

What changes were proposed in this pull request?

This change enables shuffle cleanup mode configuration in regular Spark SQL execution.

Why are the changes needed?

Currently, the shuffle cleanup mode configuration only works in Spark Connect but is ignored in regular SQL execution.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

@HyukjinKwon HyukjinKwon changed the title [SPARK-52777] Enable shuffle cleanup mode configuration in Spark SQL [SPARK-52777][SQL] Enable shuffle cleanup mode configuration in Spark SQL Jul 14, 2025
@karuppayya (Contributor Author)

@dongjoon-hyun @sunchao Could you help review this change?

@dongjoon-hyun (Member) left a comment

Could you make the CI happy, please, @karuppayya?

cc @peter-toth

@karuppayya (Contributor Author) commented Jul 17, 2025

[info] - SPARK-33551: Do not use AQE shuffle read for repartition *** FAILED *** (78 milliseconds)
[info]   scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false (AdaptiveQueryExecSuite.scala:205)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$checkNumLocalShuffleReads$1(AdaptiveQueryExecSuite.scala:205)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkNumLocalShuffleReads(AdaptiveQueryExecSuite.scala:202)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkBHJ$1(AdaptiveQueryExecSuite.scala:1828)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$new$191(AdaptiveQueryExecSuite.scala:1888)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(AdaptiveQueryExecSuite.scala:60)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.withSQLConf(AdaptiveQueryExecSuite.scala:60)
[info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$new$190(AdaptiveQueryExecSuite.scala:1884)

The failure seems unrelated (and the test runs fine locally), but let me rebase onto master and retrigger the CI.

@karuppayya (Contributor Author)

cc @bozhang2820 and @cloud-fan, since this was introduced in #45930

@karuppayya (Contributor Author) commented Jul 18, 2025

The test failure was from my change.
It looks like the default for SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED and SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED is Utils.isTesting,
which was forcing cleanup before the test assertions ran.
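
A minimal sketch of how a suite could pin these flags off so that shuffle files survive until the assertions run (conf names as discussed in this thread; withSQLConf is the test helper visible in the stack trace above):

    // Sketch: keep shuffle files around for the scope of a test so that
    // assertions can inspect shuffle state before any eager cleanup runs.
    withSQLConf(
      SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false",
      SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED.key -> "false") {
      // ... run the query and assert on shuffle reads/locations here ...
    }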

@karuppayya (Contributor Author) commented Jul 21, 2025

As of today, my change affects only tests that have adaptive execution enabled. The following test failed in CI (specific to adaptive):

SPARK-35695: get observable metrics with adaptive execution by callback

But I have another PR that does the shuffle cleanup for non-adaptive paths.
So the conf has to be set at the top level, not specifically for adaptive execution, which is why I added it to beforeAll.

Why did the test fail?
org.apache.spark.sql.util.DataFrameCallbackSuite#validateObservedMetrics calls collect() on the same df twice.
With shuffle cleanup (enabled by default in tests), this results in recomputation, which double-counts the metrics (via CollectMetricsExec), and the assertions fail.
(This is potentially an issue with org.apache.spark.sql.execution.CollectMetricsExec#collectedMetrics? The accumulator needs to be reset in this case; otherwise the metrics will be inconsistent depending on the expression used (e.g. sum). cc @dongjoon-hyun for your thoughts, which I can take up in a subsequent PR if needed.)
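
To make the failure mode concrete, a hedged repro sketch (values illustrative; assumes the observed metrics are read back through the usual listener path):

    // Sketch: collect() twice on an observed DataFrame whose plan contains
    // a shuffle. If the shuffle files are cleaned up after the first
    // collect(), the second collect() recomputes the stages below the
    // shuffle, re-running CollectMetricsExec and doubling the accumulator.
    import org.apache.spark.sql.functions.sum
    import spark.implicits._

    val observed = spark.range(100)
      .observe("metrics", sum($"id").as("total")) // CollectMetricsExec node
      .repartition(4)                             // introduces a shuffle
    observed.collect() // first run: total = 4950
    observed.collect() // after eager cleanup: recompute, total reported as 9900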

@violet-nspct

Should we add unit tests to cover the following scenarios for the handlePlan method in SparkConnectPlanExecution.scala?

  // Execution Paths
  test("execution paths properly handle shuffle cleanup modes") {
    // Tests different execution paths with cleanup modes
    // Covers: SparkSession.execute scenarios
  }

  // Query Interruption
  test("query interruption handles cleanup modes correctly") {
    // Tests interruption behavior
    // Covers: Query cancellation and cleanup
  }

  // Configuration
  test("handlePlan should use correct shuffle cleanup mode from configuration") {
    // Tests configuration combinations
    // Covers: Configuration handling
  }

  // Consistency
  test("shuffle cleanup mode should be consistent between Spark Connect and regular SQL") {
    // Tests consistency across execution methods
    // Covers: Cross-execution consistency
  }

  // Long-running queries with configuration changes
  test("long running queries handle configuration changes correctly") {
    // Tests long-running queries with dynamic configuration
    // Covers: Both runtime and configuration aspects
  }

  // Edge case handling
  test("handles edge cases with different cleanup modes") {
    // Tests various edge cases from both plans
    // Covers: Comprehensive edge case handling
  }

@karuppayya (Contributor Author)

@violet-nspct the shuffle cleanup modes were added in #45930; this is a change to make them configurable for SQL query execution.

A contributor commented:

We should access the conf from the passed-in sparkSession. One idea:

    ...
    val specifiedShuffleCleanupMode: Option[ShuffleCleanupMode] = None) extends Logging {
  ...
  // Resolve lazily: fall back to the session conf when no mode was specified.
  def shuffleCleanupMode = specifiedShuffleCleanupMode.getOrElse(
    determineShuffleCleanupMode(sparkSession.sessionState.conf))
}

@cloud-fan (Contributor)

The reason why the config applies to Spark Connect only: with classic Spark SQL, users can hold a DataFrame instance forever, and we can never clean up the shuffle because the df instance still needs to read it when executed.
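
To make the hazard concrete, a minimal sketch (illustrative code, not from the PR):

    // Sketch: in classic SQL, a long-lived DataFrame instance re-reads its
    // shuffle output every time it is executed.
    val df = spark.range(1000).repartition(8).selectExpr("sum(id) AS s")
    df.collect() // shuffle files written, then read
    // ... the user holds on to `df`, e.g. across notebook cells ...
    df.collect() // re-reads the same shuffle; if the files were eagerly
                 // cleaned up, the stages have to be retried and recomputed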

@karuppayya (Contributor Author)

Thanks @cloud-fan for the comments. I think it would still be beneficial in normal execution.
For example, in notebooks the reference could remain active forever, delaying cleanup.
In such cases, making this configurable lets users clean up shuffle data and allows for aggressive executor downscaling. (This also prevents org.apache.spark.storage.FallbackStorage from having to copy shuffle blocks to remote storage when not needed.)

Also, this would be opt-in (per SQL execution) with a default of org.apache.spark.sql.execution.DoNotCleanup. Users control when aggressive cleanup is appropriate, maintaining backward compatibility.
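
For illustration, opting in could look like the following (using the conf key that eventually landed in this PR; table names are hypothetical):

    // Sketch: enable eager cleanup only around executions where aggressive
    // executor downscaling matters, then switch it back off.
    spark.conf.set("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")
    spark.sql("INSERT OVERWRITE TABLE sink SELECT key, count(*) FROM src GROUP BY key")
    // shuffle files for the finished execution are removed eagerly
    spark.conf.set("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "false")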

@cloud-fan (Contributor)

Then at least we should have separate configs for classic and Connect, as it's much safer to enable it for Connect.

@karuppayya (Contributor Author)

@cloud-fan it looks like the Connect-specific configs are prefixed with spark.sql.connect.*.
In this case the config is named spark.sql.shuffleDependency.fileCleanup.enabled (without the connect prefix) even though it's specific to Connect. Can you advise on how we should name this new config?

@cloud-fan (Contributor)

How about we add spark.sql.connect.* and spark.sql.classic.* configs to control the Connect and classic behavior separately? We then deprecate spark.sql.shuffleDependency.fileCleanup.enabled and let the spark.sql.connect.* config fall back to it for backward compatibility.
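
A sketch of what that could look like with the config builder's fallback mechanism (assuming buildConf(...).fallbackConf(...) as used elsewhere in SQLConf; the name and version below are illustrative):

    // Sketch: a Connect-scoped config that falls back to the legacy key
    // for backward compatibility.
    val CONNECT_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
      buildConf("spark.sql.connect.shuffleDependency.fileCleanup.enabled")
        .doc("When enabled, shuffle files will be cleaned up at the end of " +
          "Spark Connect SQL executions. Falls back to " +
          "spark.sql.shuffleDependency.fileCleanup.enabled when unset.")
        .version("4.1.0")
        .fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)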

@karuppayya (Contributor Author)

@cloud-fan I have addressed the comments. Can you please take a look?

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
  SQLConf.SHUFFLE_PARTITIONS.key -> "5",
  SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
A contributor commented:

Can we leave a code comment to explain it?

@karuppayya (Contributor Author) commented Aug 20, 2025

I have added a comment; let me know if that sounds OK. I will then resolve this conversation.

@karuppayya (Contributor Author) commented Aug 20, 2025

I reran the workflow that was failing and it passed on the second attempt.
I think it's good to be merged.
Edit: the PR status updated and all checks turned green after the retry passed.

@karuppayya
Copy link
Contributor Author

@cloud-fan Can we have this merged, if you don't have any other comments?

@karuppayya
Copy link
Contributor Author

@cloud-fan Can you please help merge this?

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5337a57 Aug 27, 2025
.createWithDefault(Utils.isTesting)

val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
  buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
A contributor commented:

@karuppayya shall we also add configs for the Thrift server? I think it's another entry point where we can safely enable file cleanup.

@karuppayya (Contributor Author) replied:

+1, thanks for bringing this up.
I will have this handled in a follow-up PR soon.

val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
  buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
    .doc("When enabled, shuffle files will be cleaned up at the end of classic " +
      "SQL executions.")
A contributor commented:

Can we also mention the caveats? Eager shuffle cleanup will trigger stage retries if users repeatedly execute the same DataFrame instance.
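
One possible doc wording capturing that caveat (a sketch; the final text is whatever was committed in the PR):

    .doc("When enabled, shuffle files will be cleaned up at the end of classic " +
      "SQL executions. Note: eager cleanup can trigger stage retries when the " +
      "same DataFrame instance is executed repeatedly, since the removed " +
      "shuffle output has to be recomputed.")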

@karuppayya (Contributor Author) replied:

I will have this added in this PR, if that's OK.

@cloud-fan (Contributor) commented Aug 28, 2025

yes please!

cloud-fan pushed a commit that referenced this pull request Nov 27, 2025
### What changes were proposed in this pull request?
We have the ability to clean up shuffle files via spark.sql.classic.shuffleDependency.fileCleanup.enabled.
This change honors that config in the Thrift server and cleans up shuffle files there as well.
Related PR comment [here](#51458 (comment))

### Why are the changes needed?
This brings the Thrift server behavior on par with the other modes of SQL execution (classic, Connect).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
NA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52213 from karuppayya/SPARK-53469.

Authored-by: Karuppayya Rajendran <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>