[SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server #52213
Conversation
@@ -496,7 +496,8 @@ class SparkSession private(
        parsedPlan
      }
    }
-   Dataset.ofRows(self, plan, tracker)
+   Dataset.ofRows(self, plan, tracker,
SparkExecuteStatementOperation's code eventually leads here.
cc: @cloud-fan

@cloud-fan Can you help review this change

@cloud-fan Can you please help review this change
@@ -205,6 +206,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
    sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
    sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

+   conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
can we add some comments to explain it?
Done
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (resolved review thread)
...hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala (resolved review thread)
2c6849a to 2841ec3
@@ -205,6 +206,9 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
    sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
    sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

+   // Tests depend on intermediate results that would otherwise be cleaned up when
This seems like a red flag to me. Runtime filter is a very powerful optimization and we should make sure the shuffle cleanup won't break it.
Excellent catch! Thanks @cloud-fan
I don't believe the root cause is with shuffle file cleanup itself, but rather with how Adaptive Query Execution handles subquery execution/synchronization.
- During the codegen phase, FilterExec looks for the subquery result (the bloom filter) but doesn't find it (at times), so it skips using the Bloom filter optimization.
The lazy val gets populated based on the subquery execution result, i.e. null if the subquery had not completed, a bloom filter otherwise. This is then used later in codegen:
// The bloom filter created from `bloomFilterExpression`.
@transient private lazy val bloomFilter = {
  val bytes = bloomFilterExpression.eval().asInstanceOf[Array[Byte]]
  if (bytes == null) null else deserialize(bytes)
}
- The main query finishes execution while the subquery is still running in the background (separate execution context).
- As part of query completion, shuffle cleanup removes all shuffle files, including those needed by the still-running subquery (the subquery result is also no longer needed since the main query has completed; the bug is that the main query never used the bloom filter).
- Subquery execution (that had started earlier) fails with FetchFailedException trying to access cleaned-up shuffle data.
This suite verifies only the logical plan for the presence of BloomFilterAggregate and does not verify whether the generated code actually used Bloom-filter-based filtering.
This can be easily reproduced by running this suite. (It's not consistent, and fails depending on when the subquery completes. But I am sure at least one test would fail and cause a ripple, failing subsequent tests since sc gets stopped.)
I added loggers to prove and verify that it's a bug in this commit.
Output:
karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 6
karuppayyar: suite run 1 end
karuppayyar: suite run 2 start
karuppayyar: subquery started 25
karuppayyar: subquery ended 24
karuppayyar: query ended 25
karuppayyar: removing shuffle 8,9
karuppayyar: suite run 2 end
karuppayyar: suite run 3 start
17:32:07.521 ERROR org.apache.spark.storage.ShuffleBlockFetcherIterator: Failed to create input stream from local block
java.io.IOException: Error in reading FileSegmentManagedBuffer[file=/private/var/folders/tn/62m7jt2j2b7116x0q6wtzg0c0000gn/T/blockmgr-72dd6798-f43d-48a7-8d4c-0a9c44ba09a9/35/shuffle_8_38_0.data,offset=0,length=5195]
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:110)
Ideally it should look like this (i.e. with adaptive execution disabled): the main query starts -> subqueries execute and complete -> the main query executes and completes.
karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: subquery ended 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 7,8
karuppayyar: suite run 1 end
Every subquery should end before the main query ends.
You can see that subquery execution does not complete before the main query ends, and hence the subquery result is never used.
The side effect of removing shuffle files is that when the main query completes, it removes the shuffle files of the subquery (which has not completed, and whose result is no longer useful), and the subquery execution fails with a FetchFailure like the one above when it tries to run to completion. This helped surface the issue.
I am not sure if this is the case with all subqueries (it looks like it is); this could result in correctness issues. cc: @dongjoon-hyun too.
@cloud-fan @dongjoon-hyun Do you think it's a bug (in which case I can attempt a fix), or am I missing something here?
If the subquery result is no longer needed, we can swallow any error from it?
I think there is an issue with Inject Runtime Filters and Adaptive Query Execution.
The subquery should populate the bloom filter before the actual query runs.
But when adaptive execution is enabled, the query doesn't wait for the subquery results, which is the actual issue.
(This is not related to this PR itself; it is a completely different issue IMO. But this PR cannot be merged before the subquery issue is fixed.)
+1.
Sorry, I didn't completely follow the conclusion. Spark local mode is not just a testing mode, as users can run Spark locally as a single-node engine to do some real work. Can we fix this issue?
Makes sense. I'll open a fix later.
Opened the fix #52606. PTAL. @cloud-fan @karuppayya
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (resolved review thread)
687e70a to 23176ed
@cloud-fan can you please take a look.

cc: @somani (long time!) since it touches the Runtime filter tests.

just back from the holiday. @karuppayya can you take a look at #52213 (comment)?
…d task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?
This PR proposes to explicitly handle the `SparkException` thrown by the shuffle status operations on the non-existent shuffle ID to avoid crashing the `SparkContext`.

### Why are the changes needed?
When the main query completes, we clean up its shuffle statuses and the data files. If there is a subquery still ongoing before it gets completely cancelled, the subquery can throw `SparkException` from `DAGScheduler` due to the operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID. And this unexpected exception can crash the `SparkContext`. See the detailed discussion at #52213 (comment).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…d task completion triggered by eager shuffle cleanup
(cherry picked from commit 9a37c3d)
Signed-off-by: Wenchen Fan <[email protected]>
fee6ad9 to 5db5af6
@Ngone51 @cloud-fan I rebased this PR with the fix. Please take a look.

@karuppayya can you fix the merge conflicts?
5db5af6 to dbed5ce
@cloud-fan I have rebased my code and fixed the conflicts. Please take a look when you get a chance.
  setupTestData()

  protected override def beforeAll(): Unit = {
nit: we can override sparkConf with super.sparkConf.set(CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false) to have a special config for this test suite
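A minimal sketch of this suggestion, assuming a SharedSparkSession-based suite (the suite name and surrounding layout here are illustrative, not the exact test file):

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class InjectRuntimeFilterCleanupSuite extends QueryTest with SharedSparkSession {
  // Disable eager shuffle cleanup for the whole suite, so the intermediate shuffle
  // files that the runtime-filter assertions rely on are not removed mid-test.
  protected override def sparkConf: SparkConf =
    super.sparkConf
      .set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "false")
}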
BTW, do we have a concrete example of why this test suite can't clean up shuffle files?
This emanates from the following method, org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads:
private def checkNumLocalShuffleReads(
    plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
  val numShuffles = collect(plan) {
    case s: ShuffleQueryStageExec => s
  }.length
  val numLocalReads = collect(plan) {
    case read: AQEShuffleReadExec if read.isLocalRead => read
  }
  numLocalReads.foreach { r =>
    val rdd = r.execute()
    val parts = rdd.partitions
    assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
  }
  assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
}
Specifically, rdd.preferredLocations(_) will be empty after the cleanup (i.e. after collect() executes).
When shuffle cleanup is enabled, this will always be empty.
As for a concrete example, almost all tests in this suite use this method and fail at that assertion.
(It's actually a race between when the shuffle cleanup happens and when this assertion executes.)
scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
ScalaTestFailureLocation: org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite at (AdaptiveQueryExecSuite.scala:221)
org.scalatest.exceptions.TestFailedException: scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$checkNumLocalShuffleReads$1(AdaptiveQueryExecSuite.scala:221)
at scala.collection.immutable.List.foreach(List.scala:323)
at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkNumLocalShuffleReads(AdaptiveQueryExecSuite.scala:218)
we can override sparkConf
I tried setting it on SparkConf, but it doesn't seem to take effect.
I guess SparkConf is read when creating the SparkSession (and SQLConf), and setting it on the Spark conf later is ineffective for SQL execution (since it looks into SQLConf). Let me know if I am missing something.
Thanks for the pointer. Fixed
(I was setting it inside beforeAll earlier)
...hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala (resolved review thread)
@karuppayya On second thought, I think we should add a new config for thriftserver to enable shuffle cleanup. Classic is special in that DataFrame reuse is likely to happen, but thriftserver is like Spark Connect and can safely enable shuffle cleanup.

I will add the new configuration.
…d task completion triggered by eager shuffle cleanup
Given Spark Connect JDBC is being added, I think thriftserver will be deprecated eventually. I'd prefer to add a new config for thriftserver, to keep the Spark Connect config name simpler.
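For illustration, a hypothetical sketch of such a thriftserver-specific flag, following the existing SQLConf.buildConf pattern (the config name, doc text, version, and default below are assumptions, not the merged definition):

// Hypothetical entry inside the SQLConf object; name, doc, version, and default are illustrative.
val THRIFT_SERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
  buildConf("spark.sql.thriftServer.shuffleDependency.fileCleanup.enabled")
    .doc("When true, shuffle dependencies of a query executed through the Thrift server are " +
      "cleaned up eagerly once the query completes, mirroring the cleanup behavior available " +
      "for classic and Spark Connect execution.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(true)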
…d task completion triggered by eager shuffle cleanup
2c2ce10 to 37ff7a1
962eb0f to d92cb57
Remove comment
d92cb57 to 7e7d2e0
@cloud-fan Added a new config for thrift-server. Ready for review.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (resolved review thread)
Thanks @cloud-fan for the review.

thanks, merging to master!
What changes were proposed in this pull request?
We have the ability to clean up shuffle files in classic Spark SQL execution via spark.sql.classic.shuffleDependency.fileCleanup.enabled. This PR honors that in the Thrift server and cleans up shuffle files there as well.
Related PR comment here
Why are the changes needed?
This is to bring the behavior on par with the other modes of SQL execution (classic, connect).
Does this PR introduce any user-facing change?
No
How was this patch tested?
NA
Was this patch authored or co-authored using generative AI tooling?
No