Conversation

@ulysses-you
Contributor

@ulysses-you ulysses-you commented Jan 17, 2023

What changes were proposed in this pull request?

This PR adds a new abstract class `ReusableQueryStageExec`, which is the parent of `ShuffleQueryStageExec` and `BroadcastQueryStageExec`.

It also adds a new query stage, `TableCacheQueryStageExec`, which wraps `InMemoryTableScanExec`.

`InMemoryTableScanExec` has several issues in AQE:

  1. The first access to the cached plan is tricky. Currently, we cannot preserve its output partitioning and ordering, because we plan the query with an un-materialized cached plan.
  2. Plan info is not updated in AQE when the final stage includes `InMemoryTableScan`, which breaks the UI.
  3. Metrics are not propagated in AQE, since the `InMemoryTableScanExec` execution id does not map to the current query execution.
  4. The cached plan misses many optimizations in the AQE framework (`AQEOptimizer`), even though we ideally know its accurate statistics.
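For illustration, the wrapping idea can be sketched as below. These are reduced stand-ins, not the actual Spark classes; `wrapTableCache` and the simplified `materialize`/`doMaterialize` methods are hypothetical names used only to show the materialize-through-a-stage flow.

```scala
// Reduced stand-ins for the real Spark classes, for illustration only.
abstract class SparkPlan

class InMemoryTableScanExec extends SparkPlan {
  var materialized = false
  // In real Spark this triggers the job that populates the cache.
  def materialize(): Unit = { materialized = true }
}

// The proposed stage wrapper: AQE drives materialization through the stage,
// so it can afterwards read accurate runtime statistics from the cache.
class TableCacheQueryStageExec(val scan: InMemoryTableScanExec) extends SparkPlan {
  def isMaterialized: Boolean = scan.materialized
  def doMaterialize(): Unit = scan.materialize()
}

// AQE plan preparation would wrap every in-memory scan it encounters.
def wrapTableCache(plan: SparkPlan): SparkPlan = plan match {
  case i: InMemoryTableScanExec => new TableCacheQueryStageExec(i)
  case other                    => other
}
```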

Why are the changes needed?

Fix the issues above.

Does this PR introduce any user-facing change?

Yes, it fixes the issues above.

How was this patch tested?

Added tests and tested manually:

```
val df = spark.sql("select * from t1 join t2 on t1.c1 = t2.c1").cache
df.groupBy($"t1.c1").agg(max($"t2.c2")).collect
```

(screenshot omitted)

@maryannxue
Contributor

maryannxue commented Jan 18, 2023

Can you list all the issues we currently have for InMemoryTableScanExec in AQE? We need to justify the need to expand the semantics of QueryStage here. And also what assumptions we might be breaking if we did so.

@maryannxue
Contributor

maryannxue commented Jan 18, 2023

Another way to achieve this is instead of wrapping, we can have a common interface (e.g., Materializable, or BlockingDependency) for QueryStageExec and InMemoryTableScanExec so that other call sites that match QueryStageExec will not be affected. Also QueryStageExec currently strictly maps to a Spark Job/Stage in AQE, so we need to be careful here.
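The suggested common interface could look roughly like this. The sketch below is illustrative only: `Materializable`, `StubQueryStage`, and `allReady` are hypothetical names, not the final Spark API.

```scala
import scala.concurrent.Future

// Hypothetical common interface for nodes that AQE must materialize before
// planning can continue (query stages, in-memory table scans, ...).
trait Materializable {
  def materialize(): Future[Any]
  def isMaterialized: Boolean
}

// A query-stage-like implementer; real stages submit shuffle/broadcast jobs.
class StubQueryStage extends Materializable {
  private var done = false
  def materialize(): Future[Any] = {
    done = true
    Future.successful(())
  }
  def isMaterialized: Boolean = done
}

// Call sites that only need "blocking dependency" semantics can then match
// on Materializable instead of QueryStageExec.
def allReady(deps: Seq[Materializable]): Boolean = deps.forall(_.isMaterialized)
```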

Contributor

We do not need to and cannot cancel(). Similarly, "reuse" and "getRuntimeStats" are meaningless for SQL cache.

Contributor Author

  • cancel: shall we keep it the same as other query stages, i.e. do nothing if it's materialized, otherwise cancel it?
  • reuse: yeah, it's meaningless
  • getRuntimeStats: if it's materialized, then its statistics are accurate, so we can mark it as isRuntime, right?

Member

@dongjoon-hyun dongjoon-hyun left a comment

Could you reply to @maryannxue 's comment, @ulysses-you ?

@ulysses-you ulysses-you changed the title [SPARK-42101][SQL] Wrap InMemoryTableScanExec with QueryStage [SPARK-42101][SQL] Introduce Materializable and MaterializableQueryStage for AQE framework Jan 30, 2023
@ulysses-you
Contributor Author

@maryannxue @dongjoon-hyun sorry for the late response. I'm working on this PR now!

@ulysses-you
Contributor Author

ulysses-you commented Jan 30, 2023

@maryannxue `Materializable` sounds good to me; I updated this PR with it. Do you have time to take another look? Thank you. Also cc @cloud-fan @viirya @dongjoon-hyun

@ulysses-you ulysses-you force-pushed the cache-query-stage branch 2 times, most recently from 42e36c9 to b9bc6e7 Compare February 28, 2023 12:14
Contributor

why do we consider relation.isMaterialized?

Contributor Author

If the `InMemoryTableScanExec` is materialized, we should not run a new job again. Adding the check avoids the AQE framework calling `doMaterialize`.
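The check described above can be sketched as follows (simplified stand-ins; `CachedRelation` and `materializeIfNeeded` are illustrative names, not the real Spark API):

```scala
// Simplified sketch of the guard: if the cached relation was already
// materialized by an earlier query, AQE must not submit a new job for it.
class CachedRelation {
  var isMaterialized = false
  var jobsSubmitted = 0
}

def materializeIfNeeded(relation: CachedRelation): Unit = {
  if (!relation.isMaterialized) {
    relation.jobsSubmitted += 1 // in Spark this would submit the caching job
    relation.isMaterialized = true
  }
}
```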

Contributor

according to the comment, !isSubquery is sufficient?

Contributor Author

A cached plan inside an `AdaptiveSparkPlan` is not a subquery, but we still cannot update its plan, since that would overwrite the whole query plan.

E.g., we cannot update the plan for query execution 0. Instead, we should update its metrics.

```
          ...
           |
  AdaptiveSparkPlanExec (query execution 0, no execution id)
           |
  InMemoryTableScanExec
           |
          ...
           |
  AdaptiveSparkPlanExec (query execution 1, execution id 0)
```

@ulysses-you
Contributor Author

@cloud-fan it seems the main concern is: shall we make `Materializable` a first-class citizen in the AQE framework? If so, then every place in the AQE framework should use `Materializable` instead of query stage.

In fact, I'm not sure that is fine. A lot of names in the code depend on "query stage", e.g., `queryStagePreparationRules` -> `materializablePreparationRules`, and these are actually developer APIs.

@ulysses-you ulysses-you force-pushed the cache-query-stage branch 2 times, most recently from 8fe51c0 to 1aa33b7 Compare March 2, 2023 03:50
}
}

case i: InMemoryTableScanExec =>
Contributor

question: if the table cache is already materialized (second access of the cache), do we still need to wrap it with TableCacheQueryStage?

Contributor Author

@ulysses-you ulysses-you Mar 10, 2023

`TableCacheQueryStage` provides a base framework for runtime statistics, so I think wrapping it is more suitable for the AQE framework, e.g., marking `isRuntime = true` in `Statistics`.
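The runtime-statistics point can be illustrated like this. This is a simplified `Statistics` and a hypothetical `cacheStats` helper, not Spark's actual classes:

```scala
// Simplified stats holder; Spark's Statistics carries more fields.
case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt], isRuntime: Boolean)

// Once the cache is materialized, the numbers come from actual execution,
// so the stage can flag them as runtime-accurate for AQE optimizer rules.
def cacheStats(materialized: Boolean, size: BigInt, rows: BigInt): Statistics =
  Statistics(size, if (materialized) Some(rows) else None, isRuntime = materialized)
```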


private lazy val shouldUpdatePlan: Boolean = {
// Only the root `AdaptiveSparkPlanExec` of the main query that triggers this query execution
// should update UI.
Contributor

@cloud-fan cloud-fan Mar 10, 2023

Suggested change
// should update UI.
// need to do a final plan update for the UI.

batch
val cached = cb.mapPartitionsInternal { it =>
new Iterator[CachedBatch] {
TaskContext.get().addTaskCompletionListener[Unit](_ => {
Contributor

we can register this listener before returning the wrapping iterator.

Contributor Author

oh, somehow I put the code here...

* This method is only used by AQE, which executes the actual cached RDD without filtering and
* without row/columnar serialization.
*/
def executeCache(): RDD[CachedBatch] = {
Contributor

baseCacheRDD?

Contributor

it doesn't execute anything and the current name is confusing.

Contributor

@cloud-fan cloud-fan left a comment

looks pretty good, only some nit comments.

*/
private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = {
if (isSubquery) {
if (!needFinalPlanUpdate) {
Contributor

@cloud-fan cloud-fan Mar 10, 2023

sorry I was wrong, the previous name is better. But we should update the comment here. It's not only for subquery.

Contributor Author

Combined the comments into the definition of `shouldUpdatePlan`.

// last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure
// the newly generated nodes of those subqueries are updated.
if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
Contributor Author

`shouldUpdatePlan` is not strictly required here, since we already check it inside `onUpdatePlan`. The reason to leave it is to fast-skip `currentPhysicalPlan.exists(_.subqueries.nonEmpty)`.

// of the new plan nodes, so that it can track the valid accumulator updates later
// and display SQL metrics correctly.
// 2. If the `QueryExecution` does not match the current execution ID, it means the execution
// ID belongs to another (parent) query, and we should not call update UI in this query.
Contributor

shall we mention that this can happen with table cache?

Contributor Author

sure, added

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

cc @viirya , too

…e/AdaptiveSparkPlanExec.scala

Co-authored-by: Wenchen Fan <[email protected]>
@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in b397832 Mar 13, 2023
@ulysses-you ulysses-you deleted the cache-query-stage branch March 13, 2023 04:13
cloud-fan added a commit that referenced this pull request Mar 13, 2023
### What changes were proposed in this pull request?

This is a followup of #39624 .

`TableCacheQueryStageExec.cancel` is a no-op, so we can move `def cancel` out of `QueryStageExec`. Due to this movement, I renamed `ReusableQueryStageExec` to `ExchangeQueryStageExec`.

### Why are the changes needed?

Type safety.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #40399 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan added a commit that referenced this pull request Mar 24, 2023
…eterialized consistent

### What changes were proposed in this pull request?

This is a followup of #39624 . `QueryStageExec.isMaterialized` should only return true if `resultOption` is assigned. Having this inconsistency could be a potential bug.

### Why are the changes needed?

fix potential bug

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #40522 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Oct 5, 2023
…ageLevel.NONE on Dataset

### What changes were proposed in this pull request?
Support for `InMemoryTableScanExec` in AQE was added in #39624, but that patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch, a query like:
```
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```
would correctly return 2. But after that patch it incorrectly returns 0. This is because AQE incorrectly concludes, based on the runtime statistics collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
that the input is empty. The problem is that the action that should make sure the statistics are collected here
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
never uses the iterator; and with `StorageLevel.NONE`, persisting will also not use the iterator, so we never gather the correct statistics.

The fix proposed in the patch just makes calling `persist` with `StorageLevel.NONE` a no-op. Changing the action so that it always drains the iterator would also work, but that seems like unnecessary work in most normal circumstances.
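The no-op behavior can be sketched as follows. `MiniDataset` and the cut-down `StorageLevel` are reduced stand-ins for the real Spark API, used only to show the shape of the fix:

```scala
// Minimal stand-ins; in Spark, persist(StorageLevel.NONE) used to register
// a cache entry whose build never consumed the input, yielding empty stats.
sealed trait StorageLevel
object StorageLevel {
  case object NONE extends StorageLevel
  case object MEMORY_AND_DISK extends StorageLevel
}

class MiniDataset(data: Seq[Int]) {
  var cachedLevel: Option[StorageLevel] = None
  def persist(level: StorageLevel): MiniDataset = {
    // The fix: treat StorageLevel.NONE as a no-op instead of caching.
    if (level != StorageLevel.NONE) cachedLevel = Some(level)
    this
  }
  def count(): Long = data.size.toLong
}
```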

### Why are the changes needed?
The current code has a correctness issue.

### Does this PR introduce _any_ user-facing change?
Yes, fixes the correctness issue.

### How was this patch tested?
New and existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025