[SPARK-52777][SQL] Enable shuffle cleanup mode configuration in Spark SQL #51458
Conversation
@dongjoon-hyun @sunchao Can you help review this change?
dongjoon-hyun
left a comment
Could you make the CI happy please, @karuppayya?
cc @peter-toth
The failure seems unrelated (and the test seems to run fine locally). But let me rebase onto master and retrigger the CI.
cc: @bozhang2820 and @cloud-fan since it was introduced with #45930
The test failure was from my change.
As of today, my change affects only tests that have adaptive execution enabled. The following test failed in CI (specific to adaptive):
SPARK-35695: get observable metrics with adaptive execution by callback
But I have another PR, which does the shuffle cleanup for non-adaptive paths.
So the conf has to be set at the top level rather than specifically for adaptive execution, which is why it was added to beforeAll.
Why did the test fail?
org.apache.spark.sql.util.DataFrameCallbackSuite#validateObservedMetrics calls collect() on the same DataFrame twice.
With shuffle cleanup (enabled by default in tests), this results in recomputation, which double-counts the metrics (gathered via CollectMetricsExec), and the assertions fail.
(This is potentially an issue with org.apache.spark.sql.execution.CollectMetricsExec#collectedMetrics? The accumulator needs to be reset in such cases; otherwise the metrics will be inconsistent depending on the expression used (e.g. sum). cc: @dongjoon-hyun for your thoughts, which I can take in a subsequent PR if needed.)
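To make the failure concrete, here is a minimal sketch of the pattern that breaks (the DataFrame and metric names are illustrative; it assumes eager shuffle cleanup is active, as it is by default in tests with this change):

import org.apache.spark.sql.functions.{col, sum}

// Assumes an active SparkSession named `spark` (e.g. in spark-shell).
val df = spark.range(100)
  .observe("my_metrics", sum(col("id")).as("sum_id")) // gathered via CollectMetricsExec
  .repartition(5)                                     // introduces a shuffle above the metrics node

df.collect() // first execution; shuffle files are removed eagerly afterwards
df.collect() // second execution recomputes the shuffle stage, and the metric accumulates again

// A listener reading the observed metrics now sees sum_id doubled, which is why the
// assertion in DataFrameCallbackSuite#validateObservedMetrics fails.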
Should we add unit tests to cover the following scenarios of the handlePlan method in SparkConnectPlanExecution.scala?

// Execution paths
test("execution paths properly handle shuffle cleanup modes") {
  // Tests different execution paths with cleanup modes
  // Covers: SparkSession.execute scenarios
}
// Query interruption
test("query interruption handles cleanup modes correctly") {
  // Tests interruption behavior
  // Covers: query cancellation and cleanup
}
// Configuration
test("handlePlan should use correct shuffle cleanup mode from configuration") {
  // Tests configuration combinations
  // Covers: configuration handling
}
// Consistency
test("shuffle cleanup mode should be consistent between Spark Connect and regular SQL") {
  // Tests consistency across execution methods
  // Covers: cross-execution consistency
}
// Long-running queries with configuration changes
test("long running queries handle configuration changes correctly") {
  // Tests long-running queries with dynamic configuration
  // Covers: both runtime and configuration aspects
}
// Edge case handling
test("handles edge cases with different cleanup modes") {
  // Tests various edge cases from both plans
  // Covers: comprehensive edge-case handling
}
@violetnspct The shuffle cleanup modes were added in #45930; this is a change to make them configurable for SQL query execution.
We should access the conf from the passed-in sparkSession. One idea is:
...
    val specifiedShuffleCleanupMode: Option[ShuffleCleanupMode] = None) extends Logging {
  ...
  def shuffleCleanupMode = specifiedShuffleCleanupMode.getOrElse(
    determineShuffleCleanupMode(sparkSession.sessionState.conf))
}
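Filled out a bit, the idea could look roughly like this inside QueryExecution (schematic only; determineShuffleCleanupMode and the exact config lookup are illustrative, not necessarily the code that was ultimately merged):

class QueryExecution(
    val sparkSession: SparkSession,
    // ... other constructor parameters elided ...
    specifiedShuffleCleanupMode: Option[ShuffleCleanupMode] = None) extends Logging {

  // Fall back to the session conf only when the caller did not pass an explicit mode.
  def shuffleCleanupMode: ShuffleCleanupMode =
    specifiedShuffleCleanupMode.getOrElse(
      determineShuffleCleanupMode(sparkSession.sessionState.conf))

  private def determineShuffleCleanupMode(conf: SQLConf): ShuffleCleanupMode =
    if (conf.getConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
      RemoveShuffleFiles // eagerly delete shuffle files once the execution finishes
    } else {
      DoNotCleanup       // keep today's default behavior
    }
}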
The reason why the config applies to Spark Connect only: with classic Spark SQL, users can hold a reference to the same DataFrame and execute it again, in which case eagerly removed shuffle files would have to be recomputed.
Thanks @cloud-fan for the comments. I think it would still be beneficial in normal execution. Also, this would be opt-in (per SQL execution), with the default leaving cleanup disabled.
Then at least we should have separate configs for classic and connect, as it's much safer to enable it for connect.
@cloud-fan Looks like the connect-specific configs are grouped under their own prefix.
How about we add a dedicated config for the classic path, e.g. `spark.sql.classic.shuffleDependency.fileCleanup.enabled`?
@cloud-fan I have addressed the comments. Can you please take a look?
  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-   SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+   SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+   SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
can we leave a code comment to explain it?
I have added a comment; let me know if that sounds okay. I will then have this conversation resolved.
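For reference, the explanatory comment could read roughly like this (hypothetical wording; the actual comment added in the PR may differ):

// Disable eager shuffle file cleanup here: this test executes the same plan more than
// once, and removing shuffle files after the first run would force shuffle
// recomputation and change the stages/metrics the assertions expect.
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false"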
I reran the workflow that was failing, and it passed on the second attempt.
@cloud-fan Can we have this merged if you don't have any other comments?
@cloud-fan Can you please help merge this?
Thanks, merging to master!
    .createWithDefault(Utils.isTesting)

  val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
    buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
@karuppayya Shall we also add configs for thriftserver? I think it's another entry point where we can safely enable file cleanup.
+1, thanks for bringing this up. I will have this handled in a follow-up PR soon.
  val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
    buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
      .doc("When enabled, shuffle files will be cleaned up at the end of classic " +
        "SQL executions.")
Can we also mention the caveats? Eager shuffle cleanup will trigger stage retries if users repeatedly execute the same DataFrame instance.
I will have this added in this PR, if that's okay.
yes please!
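With the caveat folded into the doc string, the definition might end up looking roughly like this (the exact wording and the default shown below are assumptions, not the merged text):

val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
  buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
    .doc("When enabled, shuffle files will be cleaned up at the end of classic " +
      "SQL executions. Note that eager cleanup can trigger stage retries " +
      "(shuffle recomputation) if the same DataFrame instance is executed repeatedly.")
    .booleanConf
    .createWithDefault(false) // assumed default; see the discussion above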
### What changes were proposed in this pull request?
We have the ability to clean up shuffle files via `spark.sql.classic.shuffleDependency.fileCleanup.enabled`. This change honors that in the Thrift server and cleans up shuffle files there as well. Related PR comment [here](#51458 (comment))
### Why are the changes needed?
This brings the behavior on par with the other modes of SQL execution (classic, connect).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
NA
### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52213 from karuppayya/SPARK-53469.
Authored-by: Karuppayya Rajendran <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This change enables shuffle cleanup mode configuration in regular Spark SQL execution, as shown in the usage sketch below.
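For illustration, a user could opt in per session with the new config (a minimal usage sketch; the `employees` table is hypothetical):

// Enable eager shuffle file cleanup for classic SQL executions in this session.
spark.conf.set("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")

// Shuffle files produced by this query are removed once the execution finishes.
spark.sql("SELECT dept, count(*) FROM employees GROUP BY dept").collect()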
Why are the changes needed?
Currently, the shuffle cleanup mode configuration only works in Spark Connect and is ignored in regular SQL execution.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests.
Was this patch authored or co-authored using generative AI tooling?
No