Skip to content

Conversation

@karuppayya
Copy link
Contributor

@karuppayya karuppayya commented Sep 3, 2025

What changes were proposed in this pull request?

We have the ability top clean up shuffle in spark.sql.classic.shuffleDependency.fileCleanup.enabled.
Honoring this in Thrift server and cleaning up shuffle.
Related PR comment here

Why are the changes needed?

This is to bring the behavior in par with other modes of sql execution(classic, connect)

Does this PR introduce any user-facing change?

No

How was this patch tested?

NA

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Sep 3, 2025
@@ -496,7 +496,8 @@ class SparkSession private(
parsedPlan
}
}
Dataset.ofRows(self, plan, tracker)
Dataset.ofRows(self, plan, tracker,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkExecuteStatementOperation's code eventually leads here.

@karuppayya
Copy link
Contributor Author

cc: @cloud-fan

@HyukjinKwon HyukjinKwon changed the title [SPARK-53469] Ability to cleanup shuffle in Thrift server [SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server Sep 3, 2025
@karuppayya
Copy link
Contributor Author

@cloud-fan Can you help review this change

@karuppayya
Copy link
Contributor Author

@cloud-fan Can you please help review this chnage

@@ -205,6 +206,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some comments to explain it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -205,6 +206,9 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

// Tests depend on intermediate results that would otherwise be cleaned up when
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a red light to me. Runtime filter is a very powerful optimization and we should make sure the shuffle cleanup won't break it.

Copy link
Contributor Author

@karuppayya karuppayya Sep 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent catch! Thanks @cloud-fan

I don't believe the root cause is with shuffle file cleanup itself, but rather with how Adaptive Query Execution handles subquery execution/synchronization.

  1. During codegen phase, FilterExec looks for subquery results(bloom filter) but doesn't find them (at times), so it skips using the Bloom filter optimization.
    The lazy val gets populated based on the subquery execution result ie null if had not complete, a bloom filter otherwise. This is then later used in codegen
  // The bloom filter created from `bloomFilterExpression`.
  @transient private lazy val bloomFilter = {
    val bytes = bloomFilterExpression.eval().asInstanceOf[Array[Byte]]
    if (bytes == null) null else deserialize(bytes)
  }

  1. The main query finishes execution while the subquery is still running in the background (separate execution context).
  2. As part of query completion, shuffle cleanup removes all shuffle files, including those needed by the still-running subquery(while subquery results are also no longer needed as main query has completed, this is a bug in that it doesn't use the bloom filters)
  3. Subquery execution (that had started earlier) fails with FetchFailedException trying to access cleaned-up shuffle data.

This suites verifies only the logical plan for the presence of BloomfilterAggregate and does not the verify if the code indeed used Bllom filter based filtering.

This can be easily reproduced by running this suite. (Its not consistent, and fails based on when the subquery completes. But I am sure atleast one test would fail and cause a ripple and fail subsequent tests since sc gets stopped)

Copy link
Contributor Author

@karuppayya karuppayya Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added loggers to prove and verify that its a bug in this commit

Output

karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 6
karuppayyar: suite run 1 end

karuppayyar: suite run 2 start
karuppayyar: subquery started 25
karuppayyar: subquery ended 24
karuppayyar: query ended 25
karuppayyar: removing shuffle 8,9
karuppayyar: suite run 2 end

karuppayyar: suite run 3 start
17:32:07.521 ERROR org.apache.spark.storage.ShuffleBlockFetcherIterator: Failed to create input stream from local block
java.io.IOException: Error in reading FileSegmentManagedBuffer[file=/private/var/folders/tn/62m7jt2j2b7116x0q6wtzg0c0000gn/T/blockmgr-72dd6798-f43d-48a7-8d4c-0a9c44ba09a9/35/shuffle_8_38_0.data,offset=0,length=5195]
	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:110)

Ideally it should looks like this(ie with adpative disabled) ie main query starts-> subqueries execute and completes-> main query starts executyion and completes

karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: subquery ended 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 7,8
karuppayyar: suite run 1 end

Every subquery should end before query ends.
You can see that subquery execution doesnot complete before the main query ends and therein not using the subquery result.

The side effect of removing shuffle is that when main query completes, it removes the shuffle of subquery(which has not completed and its result is no longer useful) and subquery execution fails with FetchFailure like above when it tries to run to completion. This helped surfacing the issue.

I am not sure if this is the case with all subqueries(looks like that), this could result in correctness issues cc: @dongjoon-hyun too.

@cloud-fan @dongjoon-hyun Do you thinks its a bug(in which case i can attempt a fix) or am i missing something here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the subquery result is no longer needed, we can swallow any error from it?

Copy link
Contributor Author

@karuppayya karuppayya Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is an issue with Inject Runtime Filters and Adaptive.
The subquery should populate the bloom filter before the actual query runs.
But when adpative is enabled, the query doesnt wait for the subquery results which is the actual issue.
(This is not related to this PR itself, instead a completely different issue IMO. But this PR cannot be merged before the subquery issue is fixed )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't completely follow the conclusion. Spark local mode is not a testing mode as users can run Spark locally as a single node engine to do some real work. Can we fix this issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. I'll open a fix later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opend the fix #52606. PTAL. @cloud-fan @karuppayya

Copy link
Contributor Author

@karuppayya karuppayya Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Ngone51. I did a pass of the PR.
I also verified the chnage withe InjectRuntimeFilterSuite and reverted my testdata chnages. (I will retrigger test once the changes are merged)

@karuppayya karuppayya force-pushed the SPARK-53469 branch 5 times, most recently from 687e70a to 23176ed Compare September 30, 2025 04:19
@karuppayya
Copy link
Contributor Author

@cloud-fan can you please take a look.

@karuppayya
Copy link
Contributor Author

cc: @somani (long time!) since it touches the Runtime filter tests.
tl; dr: Fixing the test data to return atlease one row to get all the tests run. Even without it queries run, but subquery runs behind the scenes, even after main query completes.

@cloud-fan
Copy link
Contributor

just back from the holiday. @karuppayya can you take a look at #52213 (comment) ?

cloud-fan added a commit that referenced this pull request Nov 11, 2025
…d task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?

This PR proposes to explicitly handle the `SparkException` thrown by the shuffle statues operations on the non-existent shuffle ID to avoid crashing the `SparkContext`.

### Why are the changes needed?

When the main query completes, we cleanup its shuffle statuses and the data files. If there is subquery ongoing before it gets completely cancelled, the subquery can throw `SparkException` from `DAGScheduler` due to the operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID. And this unexpected exception can crash the `SparkContext`. See the detailed discussion at #52213 (comment).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan added a commit that referenced this pull request Nov 11, 2025
…d task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?

This PR proposes to explicitly handle the `SparkException` thrown by the shuffle statues operations on the non-existent shuffle ID to avoid crashing the `SparkContext`.

### Why are the changes needed?

When the main query completes, we cleanup its shuffle statuses and the data files. If there is subquery ongoing before it gets completely cancelled, the subquery can throw `SparkException` from `DAGScheduler` due to the operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID. And this unexpected exception can crash the `SparkContext`. See the detailed discussion at #52213 (comment).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 9a37c3d)
Signed-off-by: Wenchen Fan <[email protected]>
@karuppayya
Copy link
Contributor Author

@Ngone51 @cloud-fan I rebased this PR with the fix . Please take a look

@cloud-fan
Copy link
Contributor

@karuppayya can you fix the merge conflicts?

@karuppayya
Copy link
Contributor Author

@cloud-fan I have rebased my code and fixed the conflicts. When you get a chance


setupTestData()

protected override def beforeAll(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can override sparkConf with super.sparkConf.set(CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false) to have special config for this test suite

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, do we have a concrete example about why this test suite can't clean up shuffle files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The emanates from the the folwoing method org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads

  private def checkNumLocalShuffleReads(
      plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
    val numShuffles = collect(plan) {
      case s: ShuffleQueryStageExec => s
    }.length

    val numLocalReads = collect(plan) {
      case read: AQEShuffleReadExec if read.isLocalRead => read
    }
    numLocalReads.foreach { r =>
      val rdd = r.execute()
      val parts = rdd.partitions
      assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
    }
    assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
  }

Specifically, rdd.preferredLocations(_).nonEmpty) will be empty after the cleanup (aftercollect() executes).
When shuffle clean up is enabled, this will always be empty.

As for concrete example, almost all test in this suite use this method and fail from that assertion.
( its actually a race between when the shuffle cleanup happens and when this assertion executes)

scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
ScalaTestFailureLocation: org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite at (AdaptiveQueryExecSuite.scala:221)
org.scalatest.exceptions.TestFailedException: scala.Predef.refArrayOps[org.apache.spark.Partition](parts).forall(((x$1: org.apache.spark.Partition) => rdd.preferredLocations(x$1).nonEmpty)) was false
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$checkNumLocalShuffleReads$1(AdaptiveQueryExecSuite.scala:221)
	at scala.collection.immutable.List.foreach(List.scala:323)
	at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.checkNumLocalShuffleReads(AdaptiveQueryExecSuite.scala:218)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can override sparkConf

I tried setting it on SparkConf, but doesn't seem to take effect.

I guess SparkConf is read when creating the SparkSession(and SQLConf), and setting on spark conf later is ineffective for sql execution(since it looks into SQLConf). Let me know if i am missing something

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pointer. Fixed
(I was setting it inside beforeAll earlier)

@karuppayya karuppayya requested a review from cloud-fan November 19, 2025 07:06
@cloud-fan
Copy link
Contributor

@karuppayya After a second thought, I think we should add a new config for thriftserver to enable shuffle cleanup. Classic is special that DF reuse may likely happen, but thriftserver is like Spark Connect and can safely enable shuffle cleanup.

@karuppayya
Copy link
Contributor Author

but thriftserver is like Spark Connect and can safely enable shuffle cleanup

I will add the new configuration.
With this, we'll have three configs covering native, Connect, and Thrift modes. (and another which was fallsback to Connect's)
Since both Spark Connect and Thriftserver share the same rationale for shuffle cleanup, and because an application cannot utilize both modes simultaneously, it seems logical to consolidate them into a single configuration?

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
…d task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?

This PR proposes to explicitly handle the `SparkException` thrown by the shuffle statues operations on the non-existent shuffle ID to avoid crashing the `SparkContext`.

### Why are the changes needed?

When the main query completes, we cleanup its shuffle statuses and the data files. If there is subquery ongoing before it gets completely cancelled, the subquery can throw `SparkException` from `DAGScheduler` due to the operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID. And this unexpected exception can crash the `SparkContext`. See the detailed discussion at apache#52213 (comment).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan
Copy link
Contributor

Given Spark Connect JDBC is being added, I think thriftserver will be deprecated eventually. I'd prefer to add a new config for thriftsever, to keep the spark connect config name simpler.

huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
…d task completion triggered by eager shuffle cleanup

### What changes were proposed in this pull request?

This PR proposes to explicitly handle the `SparkException` thrown by the shuffle statues operations on the non-existent shuffle ID to avoid crashing the `SparkContext`.

### Why are the changes needed?

When the main query completes, we cleanup its shuffle statuses and the data files. If there is subquery ongoing before it gets completely cancelled, the subquery can throw `SparkException` from `DAGScheduler` due to the operations (e.g., `MapOutputTrackerMaster.registerMapOutput()`) on the non-existent shuffle ID. And this unexpected exception can crash the `SparkContext`. See the detailed discussion at apache#52213 (comment).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52606 from Ngone51/fix-local-shuffle-cleanup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@karuppayya karuppayya force-pushed the SPARK-53469 branch 3 times, most recently from 962eb0f to d92cb57 Compare November 26, 2025 01:42
Remove comment
@karuppayya
Copy link
Contributor Author

@cloud-fan Added a new config for thrift-server. Ready for review.

@karuppayya
Copy link
Contributor Author

Thanks @cloud-fan for the review.
Thanks @Ngone51 for the fixing the issues.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 8940dad Nov 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants