
Conversation

@Ngone51
Member

@Ngone51 Ngone51 commented Oct 14, 2025

What changes were proposed in this pull request?

This PR proposes to explicitly handle the SparkException thrown by shuffle status operations on a non-existent shuffle ID, to avoid crashing the SparkContext.

Why are the changes needed?

When the main query completes, we clean up its shuffle statuses and data files. If a subquery is still ongoing before it gets completely cancelled, the subquery can throw SparkException from DAGScheduler due to operations (e.g., MapOutputTrackerMaster.registerMapOutput()) on a non-existent shuffle ID, and this unexpected exception can crash the SparkContext. See the detailed discussion at #52213 (comment).
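The failure mode described above can be modeled with a minimal, Spark-free sketch. Class and method names here (TrackerSketch, ShuffleStatusNotFound) loosely mirror Spark's MapOutputTrackerMaster API and are illustrative assumptions, not Spark's actual code:

```scala
import scala.collection.mutable

// Stand-in for the exception raised when a shuffle ID is already gone.
class ShuffleStatusNotFound(id: Int) extends Exception(s"shuffle $id not registered")

// Toy tracker: registerMapOutput throws if cleanup already removed the ID.
class TrackerSketch {
  private val statuses = mutable.Map[Int, mutable.Buffer[String]]()

  def registerShuffle(id: Int): Unit = statuses(id) = mutable.Buffer()

  def registerMapOutput(id: Int, loc: String): Unit =
    statuses.getOrElse(id, throw new ShuffleStatusNotFound(id)) += loc

  def unregisterShuffle(id: Int): Unit = statuses.remove(id)
}

val tracker = new TrackerSketch
tracker.registerShuffle(0)
tracker.unregisterShuffle(0)          // main query completes, cleanup runs
val crashed =                         // a lingering subquery task completes late
  try { tracker.registerMapOutput(0, "exec-1"); false }
  catch { case _: ShuffleStatusNotFound => true }
println(s"crashed=$crashed")
```

Without explicit handling, the analogous exception in the real scheduler propagates out of the event loop and stops the SparkContext; the PR's point is to catch it instead.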

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No.

@karuppayya
Contributor

cc: @cloud-fan

@Ngone51
Member Author

Ngone51 commented Oct 15, 2025

cc @jiangxb1987 @bozhang2820

@Ngone51 Ngone51 force-pushed the fix-local-shuffle-cleanup branch from d8f9e6b to e4762cc Compare November 4, 2025 06:03
Contributor

@karuppayya karuppayya left a comment


Thanks @Ngone51 , lgtm

@Ngone51
Member Author

Ngone51 commented Nov 4, 2025

The test failures seem to be related. The tests pass when I revert to the original mapOutputTracker.unregisterShuffle(shuffleId). I'm looking into how these are correlated.

@Ngone51
Member Author

Ngone51 commented Nov 5, 2025

The problem is that, in the case of the mllib test failures for example, the test uses Dataset.rdd to run Spark jobs. The execution of Dataset.rdd is also protected by SQLExecution.withNewExecutionId, so it also triggers shuffle cleanup after the RDD execution is done. However, the shuffle data is actually still needed by the reduce RDD. So when the reduce RDD starts running, the error occurs, and it behaves differently before and after this PR.

Before this PR: MapOutputTrackerMaster.unregisterShuffle(shuffleId) is called, which clears the single-source-of-truth shuffle statuses. This leads to a rerun of the map RDD due to the missing shuffle statuses. Since the rerun is triggered within the reduce RDD's execution, the map RDD's shuffle statuses won't be cleaned up at that point, so the reduce RDD is able to run successfully. We'd then hit this same loop for each later reduce RDD.

After this PR: MapOutputTrackerMaster.clearShuffleStatusCache(shuffleId) is called instead, and it is a no-op. But the shuffle index/data files are still removed by shuffleManager.unregisterShuffle(shuffleId) (this is also invoked before this PR). Since the shuffle statuses are not removed, the reduce RDD starts to run but fails to find the shuffle index/data files, resulting in shuffle fetch failures.

And this is the issue in local mode.

For non-local mode, it would always call MapOutputTrackerMaster.unregisterShuffle(shuffleId), since the driver always registers a BlockManagerStorageEndpoint on itself as part of BlockManager. So it behaves the same as 'Before this PR'.

To fix the issue, I think we should not apply shuffle cleanup when the execution trigger is an RDD. WDYT? @cloud-fan @karuppayya
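The local-mode divergence described in this comment can be illustrated with a toy model. The two variables (shuffle statuses vs. on-disk shuffle files) are tracked separately; names are illustrative, not Spark's code:

```scala
import scala.collection.mutable

// Two separate pieces of state, as in the comment above.
val shuffleStatuses = mutable.Map(0 -> "map outputs for shuffle 0")
val shuffleFiles    = mutable.Set("shuffle_0_index", "shuffle_0_data")

// Path "before this PR": drop the status; the map stage simply reruns later.
def unregisterShuffle(id: Int): Unit = shuffleStatuses.remove(id)

// Path "after this PR" in local mode: the status cache clear is a no-op,
// but the shuffle manager still deletes the index/data files.
def clearShuffleStatusCacheLocal(id: Int): Unit = ()
def deleteShuffleFiles(id: Int): Unit =
  shuffleFiles.filterInPlace(f => !f.startsWith(s"shuffle_${id}_"))

clearShuffleStatusCacheLocal(0)
deleteShuffleFiles(0)

// The reducer now *believes* the outputs exist (status present) even though
// the files are gone, so the fetch fails at runtime.
val statusPresent = shuffleStatuses.contains(0)
val filesPresent  = shuffleFiles.nonEmpty
println(s"statusPresent=$statusPresent filesPresent=$filesPresent")
```

The inconsistent end state (status present, files absent) is exactly the condition that produces the shuffle fetch failures seen in the tests.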

@Ngone51 Ngone51 force-pushed the fix-local-shuffle-cleanup branch from e4762cc to 3fd9103 Compare November 5, 2025 06:16
@karuppayya
Contributor

Since the shuffle statuses are not removed, the reduce RDD starts to run but fails to find the shuffle index/data files, resulting in shuffle fetch failures.

Shouldn't both go hand in hand? I.e., when the shuffle is removed, the shuffle status is also cleared. Is there a case where the shuffle status is useful after the shuffle data is cleaned up?

@Ngone51
Member Author

Ngone51 commented Nov 6, 2025

Shouldn't both go hand in hand? I.e., when the shuffle is removed, the shuffle status is also cleared. Is there a case where the shuffle status is useful after the shuffle data is cleaned up?

@karuppayya They are two different things. Shuffle statuses can refer to shuffle data files on remote executors, while the shuffle data files on the local executor are generated by whichever tasks have run on it.

@Ngone51 Ngone51 force-pushed the fix-local-shuffle-cleanup branch from 5737ece to 00c4c7f Compare November 6, 2025 06:38
@cloud-fan
Contributor

@Ngone51 can you retrigger the CI jobs?

@Ngone51
Member Author

Ngone51 commented Nov 7, 2025

Triggered the jobs. But I'm afraid there are still related failures. Let's see.

@cloud-fan
Contributor

the logs look suspicious

02:31:08.457 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Exception in cancelling query stage: BroadcastQueryStage 0
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, smallint, true]),false), [plan_id=1067940]
   +- *(1) Project [t1a#349107, t1b#349108]
      +- *(1) LocalTableScan [t1a#349107, t1b#349108, t1c#349109, t1d#349110L, t1e#349111, t1f#349112, t1g#349113, t1h#349114, t1i#349115]

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.sql.SQLQueryTestSuite.beforeAll(SQLQueryTestSuite.scala:620)
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)

@Ngone51 Ngone51 force-pushed the fix-local-shuffle-cleanup branch from df42d98 to 50149cc Compare November 11, 2025 11:49
@github-actions github-actions bot removed the SQL label Nov 11, 2025
@Ngone51
Member Author

Ngone51 commented Nov 11, 2025

I just realized that we cannot clean only the shuffle files while leaving the shuffle statuses uncleaned. For cases like DataFrame queries, it is very common to reuse a DataFrame across queries. After the DataFrame is executed for the first time, its related shuffle files are all cleaned but the shuffle statuses still exist. So when the DataFrame is reused to run queries, it would mistakenly think the shuffle files are still there given the existing shuffle statuses, but fail at runtime due to the missing shuffle files.
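The reuse hazard described here can be sketched as a tiny state machine: the first execution produces both statuses and files, eager cleanup deletes only the files, and the second execution trusts the stale status. All names (Execution, runOnce, cleanupFilesOnly) are hypothetical, for illustration only:

```scala
import scala.collection.mutable

case class Execution(shuffleId: Int)

val statuses = mutable.Set[Int]()   // "shuffle statuses" survive cleanup
val files    = mutable.Set[Int]()   // shuffle index/data files get deleted

def runOnce(e: Execution): String =
  if (statuses(e.shuffleId) && !files(e.shuffleId))
    "FetchFailed: shuffle files missing"   // stale status, no data on disk
  else {
    statuses += e.shuffleId                // first run produces map outputs
    files    += e.shuffleId
    "ok"
  }

// Eager cleanup that removes files but not statuses (the flawed variant).
def cleanupFilesOnly(e: Execution): Unit = files -= e.shuffleId

val df     = Execution(shuffleId = 7)
val first  = runOnce(df)        // succeeds and materializes the shuffle
cleanupFilesOnly(df)            // cleanup after the first query
val second = runOnce(df)        // reuse: status says data exists, files are gone
println(s"first=$first second=$second")
```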

I have pushed a new proposal, which tries to fail the query (usually the subquery) when the shuffle is no longer registered.

@Ngone51 Ngone51 changed the title [SPARK-53898][CORE] Shuffle cleanup should not clean MapOutputTrackerMaster.shuffleStatuses in local cluster [SPARK-53898][CORE] Fix race conditions between query cancellation and task completion triggered by eager shuffle cleanup Nov 11, 2025
@cloud-fan
Contributor

cloud-fan commented Nov 11, 2025

the last commit fixes indentation only, no need to wait for tests. Thanks, merging to master/4.1!

@cloud-fan cloud-fan closed this in 9a37c3d Nov 11, 2025
cloud-fan added a commit that referenced this pull request Nov 11, 2025
…d task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?

This PR proposes to explicitly handle the `SparkException` thrown by shuffle status operations on a non-existent shuffle ID, to avoid crashing the `SparkContext`.

### Why are the changes needed?

When the main query completes, we clean up its shuffle statuses and data files. If a subquery is still ongoing before it gets completely cancelled, the subquery can throw `SparkException` from `DAGScheduler` due to operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on a non-existent shuffle ID, and this unexpected exception can crash the `SparkContext`. See the detailed discussion at #52213 (comment).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 9a37c3d)
Signed-off-by: Wenchen Fan <[email protected]>
@Ngone51
Member Author

Ngone51 commented Nov 12, 2025

Thanks for all the discussion and help!

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
      doOnReceive(event)
    } catch {
      case ex: ShuffleStatusNotFoundException =>
        dagScheduler.handleShuffleStatusNotFoundException(ex)
Contributor

@mridulm mridulm Dec 6, 2025


We really should not be handling this here, but at the appropriate call sites.
The existing `throw new SparkException` sort of mishandles it anyway, but as a pattern this is not great: we will end up moving a lot of fallback catch clauses here otherwise.
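The catch-in-the-event-loop pattern from the hunk under review can be fleshed out as a self-contained sketch. Names loosely mirror the diff (doOnReceive, handleShuffleStatusNotFoundException); the event type and scheduler class are simplified stand-ins, not Spark's actual code:

```scala
class ShuffleStatusNotFoundException(val shuffleId: Int)
  extends Exception(s"shuffle $shuffleId not found")

sealed trait Event
case class MapOutputReady(shuffleId: Int) extends Event

class SchedulerSketch(known: Set[Int]) {
  var failedJobs = List.empty[String]

  // Normal event dispatch; throws when the shuffle ID was already cleaned up.
  private def doOnReceive(e: Event): Unit = e match {
    case MapOutputReady(id) if !known(id) =>
      throw new ShuffleStatusNotFoundException(id)
    case MapOutputReady(_) => () // normal registration path
  }

  // Fail only the affected job instead of letting the error kill the loop.
  private def handleShuffleStatusNotFound(ex: ShuffleStatusNotFoundException): Unit =
    failedJobs ::= s"failed job for shuffle ${ex.shuffleId}"

  def onReceive(e: Event): Unit =
    try doOnReceive(e)
    catch { case ex: ShuffleStatusNotFoundException => handleShuffleStatusNotFound(ex) }
}

val sched = new SchedulerSketch(known = Set(1))
sched.onReceive(MapOutputReady(1))   // known shuffle: handled normally
sched.onReceive(MapOutputReady(99))  // missing shuffle: caught, loop survives
println(sched.failedJobs)
```

The review comment above argues the catch belongs at the individual call sites rather than in this central dispatch point, which this sketch makes easy to see: every new recoverable exception type would otherwise accumulate another clause in the event loop's catch.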
