[SPARK-53898][CORE] Fix race conditions between query cancellation and task completion triggered by eager shuffle cleanup #52606
Conversation
cc: @cloud-fan
karuppayya left a comment
Thanks @Ngone51, lgtm
The test failures seem to be related. The tests pass when I revert to the original.
The problem is that, in the case of the mllib test failures for example, it uses `Dataset.rdd` to run Spark jobs. And the execution of `Dataset.rdd` is also protected by
Before this PR:
After this PR,
And this^^^ is the issue in local mode. For non-local mode, it would always call
To fix the issue, I think we should not apply shuffle cleanup when the execution trigger is RDD. WDYT? @cloud-fan @karuppayya
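For concreteness, here is a minimal sketch of the idea proposed above: record how an execution was triggered and skip the eager shuffle cleanup when the trigger is `Dataset.rdd`. Everything in this snippet (`ExecutionTrigger`, `withExecution`, the `cleanupShuffles` hook) is a hypothetical name used for illustration, not code from the actual patch.

```scala
// Hypothetical sketch only; these identifiers do not come from the patch.
object ShuffleCleanupTriggerSketch {
  object ExecutionTrigger extends Enumeration {
    val Sql, Rdd = Value
  }

  def withExecution[T](
      trigger: ExecutionTrigger.Value,
      cleanupShuffles: () => Unit)(body: => T): T = {
    try {
      body
    } finally {
      // Dataset.rdd hands the physical RDD back to the caller, who may run more
      // jobs on it later, so its shuffle files should not be cleaned up eagerly.
      if (trigger != ExecutionTrigger.Rdd) {
        cleanupShuffles()
      }
    }
  }
}
```

The point of the sketch is only that the cleanup decision would be made per execution trigger, not per query.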
Shouldn't both go hand in hand? I.e., when the shuffle is removed, the shuffle status is also cleared. Is there a case where the shuffle status would still be useful after the shuffle data is cleaned up?
@karuppayya They are two different things. Shuffle statuses could refer to shuffle data files on remote executors, but the shuffle data files on the local executor are generated by whichever tasks happened to run on it.
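To make the distinction concrete, here is a rough sketch of the two separate cleanup paths being contrasted. It assumes the `MapOutputTrackerMaster.unregisterShuffle` and `ShuffleManager.unregisterShuffle` entry points from the Spark code base and sits inside the `org.apache.spark` package because those APIs are `private[spark]`; it only illustrates the distinction and is not the change made in this PR.

```scala
package org.apache.spark

// Illustrative only; verify the API names against the target branch.
object ShuffleCleanupPathsSketch {
  def cleanupShuffle(shuffleId: Int): Unit = {
    val env = SparkEnv.get

    // 1) Driver-side metadata: the shuffle *status* tracked by MapOutputTrackerMaster,
    //    which records where each map output lives (often on remote executors).
    env.mapOutputTracker match {
      case master: MapOutputTrackerMaster => master.unregisterShuffle(shuffleId)
      case _ => // executors hold only a worker-side tracker
    }

    // 2) Executor-local data: the shuffle files written by whichever tasks happened
    //    to run on this executor, removed through the ShuffleManager.
    env.shuffleManager.unregisterShuffle(shuffleId)
  }
}
```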
@Ngone51 can you retrigger the CI jobs?
Triggered the jobs. But I'm afraid there are still related failures. Let's see.
The logs look suspicious.
I just realized that we cannot clean only the shuffle files while leaving the shuffle statuses uncleaned. For a case like DataFrame queries, it is very common to reuse a DataFrame across queries. After the DataFrame is executed for the first time, its related shuffle files are all cleaned but the shuffle statuses still exist. So when the DataFrame is reused to run queries, it would mistakenly think the shuffle files are still there, given the existing shuffle statuses, but fail at runtime due to the missing shuffle files. I have pushed a new proposal, which tries to fail the query (usually the subquery) when the shuffle is no longer registered.
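Here is a minimal sketch of that proposal, assuming a dedicated exception is raised whenever an operation touches a shuffle whose status has already been unregistered, so that the scheduler can fail just the affected query instead of crashing. The exception name mirrors the diff excerpt further down; the registry class and its methods are hypothetical.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical registry used only to illustrate the "fail the query" proposal.
class ShuffleStatusNotFoundException(shuffleId: Int)
  extends Exception(s"Shuffle $shuffleId is no longer registered")

class ShuffleStatusRegistry {
  private val statuses = TrieMap.empty[Int, AnyRef]

  def register(shuffleId: Int, status: AnyRef): Unit = statuses.update(shuffleId, status)

  def unregister(shuffleId: Int): Unit = statuses.remove(shuffleId)

  // Lookups on an unregistered shuffle surface a targeted failure that callers can
  // translate into "fail this (sub)query", rather than a generic fatal error.
  def get(shuffleId: Int): AnyRef =
    statuses.getOrElse(shuffleId, throw new ShuffleStatusNotFoundException(shuffleId))
}
```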
The last commit fixes indentation only, so no need to wait for tests. Thanks, merging to master/4.1!
[SPARK-53898][CORE] Fix race conditions between query cancellation and task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?
This PR proposes to explicitly handle the `SparkException` thrown by the shuffle status operations on a non-existent shuffle ID, to avoid crashing the `SparkContext`.

### Why are the changes needed?
When the main query completes, we clean up its shuffle statuses and data files. If a subquery is still ongoing before it gets completely cancelled, that subquery can throw a `SparkException` from the `DAGScheduler` due to operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID, and this unexpected exception can crash the `SparkContext`. See the detailed discussion at #52213 (comment).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 9a37c3d)
Signed-off-by: Wenchen Fan <[email protected]>
Thanks for all the discussion and help!
      doOnReceive(event)
    } catch {
      case ex: ShuffleStatusNotFoundException =>
        dagScheduler.handleShuffleStatusNotFoundException(ex)
We really should not be handling this here, but at the appropriate call sites.
The existing `throw new SparkException` sort of mishandles it anyway, but as a pattern this is not great: we will end up moving a lot of fallback catch clauses here otherwise.
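For contrast, a sketch of the call-site handling suggested here: catch the exception where the map-output update is attempted, rather than in the event loop's fallback. Only `ShuffleStatusNotFoundException` corresponds to the diff above; the method and the callback parameters are hypothetical.

```scala
// Hypothetical call-site handling; only the exception type matches the diff above.
object CallSiteHandlingSketch {
  class ShuffleStatusNotFoundException(shuffleId: Int)
    extends Exception(s"Shuffle $shuffleId is no longer registered")

  def handleTaskCompletion(
      shuffleId: Int,
      registerMapOutput: Int => Unit,
      failStage: (Int, Throwable) => Unit): Unit = {
    try {
      registerMapOutput(shuffleId)
    } catch {
      case ex: ShuffleStatusNotFoundException =>
        // The shuffle was eagerly cleaned up when the main query finished; fail only
        // the stage that still depends on it instead of letting the error escape
        // the scheduler's event loop.
        failStage(shuffleId, ex)
    }
  }
}
```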
What changes were proposed in this pull request?

This PR proposes to explicitly handle the `SparkException` thrown by the shuffle status operations on a non-existent shuffle ID, to avoid crashing the `SparkContext`.

Why are the changes needed?

When the main query completes, we clean up its shuffle statuses and data files. If a subquery is still ongoing before it gets completely cancelled, that subquery can throw a `SparkException` from the `DAGScheduler` due to operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID, and this unexpected exception can crash the `SparkContext`. See the detailed discussion at #52213 (comment).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No.