[SPARK-42101][SQL] Make AQE support InMemoryTableScanExec #39624
Conversation
Force-pushed from e2356d4 to 721e3e7.
Can you list all the issues we currently have for …?

Another way to achieve this is instead of wrapping, we can have a common interface (e.g., …)
We do not need to and cannot cancel(). Similarly, "reuse" and "getRuntimeStats" are meaningless for SQL cache.
- cancel: shall we keep it the same as other query stages, i.e. if it's materialized then do nothing, otherwise cancel it?
- reuse: yeah, it's meaningless
- getRuntimeStats: if it's materialized then its statistics are accurate, so we can mark it as `isRuntime`, right?
dongjoon-hyun left a comment
Could you reply to @maryannxue 's comment, @ulysses-you ?
Force-pushed from 721e3e7 to a64dac8.
@maryannxue @dongjoon-hyun sorry for the late response. I'm working on this PR now!

@maryannxue …
42e36c9 to
b9bc6e7
Compare
Resolved (outdated) review threads on:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/Materializable.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
why do we consider `relation.isMaterialized`?
If the `InMemoryTableScanExec` is materialized, we should not run a new job again. Adding this check avoids the AQE framework calling `doMaterialize`.
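To illustrate the point above, here is a minimal sketch of how a stage's materialization could guard on the cache state. The method and field names (`doMaterialize`, `relation.isMaterialized`, the job-submission call) are assumptions for illustration, not the PR's exact code:

```scala
// Hypothetical sketch: skip job submission when the cache is already
// materialized, so AQE never re-runs the caching job.
override def doMaterialize(): Future[Any] = {
  if (relation.isMaterialized) {
    Future.successful(null) // already in memory, nothing to do
  } else {
    // submit a Spark job that pulls the cached RDD through an action,
    // forcing the cache (and its runtime statistics) to be built
    sparkContext.submitJob(...)
  }
}
```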
according to the comment, `!isSubquery` is sufficient?
A cached plan inside an `AdaptiveSparkPlanExec` is not a subquery, but we still cannot update its plan, because that would overwrite the whole query plan. E.g., we cannot update the plan for query execution 0 below; instead, we should only update its metrics.

```
...
|
AdaptiveSparkPlanExec (query execution 0, no execution id)
|
InMemoryTableScanExec
|
...
|
AdaptiveSparkPlanExec (query execution 1, execution id 0)
```
Resolved (outdated) review thread on:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@cloud-fan it seems the main concern is whether we shall make … In fact, I'm not sure it is fine. There is a lot of code that depends on query stages, e.g., …
Force-pushed from 8fe51c0 to 1aa33b7.
```scala
case i: InMemoryTableScanExec =>
```
question: if the table cache is already materialized (second access of the cache), do we still need to wrap it with `TableCacheQueryStage`?
`TableCacheQueryStage` provides a base framework for runtime statistics, so I think wrapping it is more suitable for the AQE framework, e.g., marking `isRuntime = true` in `Statistics`.
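As an illustration of the reply above, a stage wrapper could flag its statistics as runtime-accurate once the cache is materialized. This is a hedged sketch; the method name `getRuntimeStatistics` and the fields used are assumptions for illustration:

```scala
// Hypothetical sketch: once the stage has materialized the cache, its row
// count is exact, so the statistics can be flagged as runtime statistics.
override def getRuntimeStatistics: Statistics = {
  val stats = inMemoryTableScan.relation.computeStats()
  stats.copy(isRuntime = true) // tell AQE these numbers are exact, not estimates
}
```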
```scala
private lazy val shouldUpdatePlan: Boolean = {
  // Only the root `AdaptiveSparkPlanExec` of the main query that triggers this query execution
  // should update UI.
```
Suggested change:
```diff
-  // should update UI.
+  // need to do a final plan update for the UI.
```
```scala
  batch
val cached = cb.mapPartitionsInternal { it =>
  new Iterator[CachedBatch] {
    TaskContext.get().addTaskCompletionListener[Unit](_ => {
```
we can register this listener before returning the wrapping iterator.
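The suggestion above could look roughly like the following. This is only a sketch: the accumulator `rowCountAcc` and the counter `rowsReadInThisTask` are assumed names, not the PR's actual variables:

```scala
// Hypothetical sketch: register the completion listener once, before
// constructing the wrapping iterator, instead of inside the iterator body.
val cached = cb.mapPartitionsInternal { it =>
  TaskContext.get().addTaskCompletionListener[Unit] { _ =>
    rowCountAcc.add(rowsReadInThisTask) // flush per-task stats on completion
  }
  new Iterator[CachedBatch] {
    override def hasNext: Boolean = it.hasNext
    override def next(): CachedBatch = it.next()
  }
}
```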
oh, somehow I put the code here...
```scala
 * This method is only used by AQE which executes the actually cached RDD that without filter and
 * serialization of row/columnar.
 */
def executeCache(): RDD[CachedBatch] = {
```
`baseCacheRDD`?
it doesn't execute anything and the current name is confusing.
cloud-fan left a comment
looks pretty good, only some nit comments.
```diff
   */
  private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = {
-    if (isSubquery) {
+    if (!needFinalPlanUpdate) {
```
sorry, I was wrong, the previous name is better. But we should update the comment here: it's not only for subqueries.
combined the comments into the definition of `shouldUpdatePlan`
```diff
     // last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure
     // the newly generated nodes of those subqueries are updated.
-    if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
+    if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
```
`shouldUpdatePlan` is not strictly required here since we already check it inside `onUpdatePlan`. The reason to leave it is to fast-skip the `currentPhysicalPlan.exists(_.subqueries.nonEmpty)` traversal.
```scala
    // of the new plan nodes, so that it can track the valid accumulator updates later
    // and display SQL metrics correctly.
    // 2. If the `QueryExecution` does not match the current execution ID, it means the execution
    // ID belongs to another (parent) query, and we should not call update UI in this query.
```
shall we mention that this can happen with table cache?
sure, added
Resolved (outdated) review thread on:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
dongjoon-hyun left a comment
+1, LGTM.
cc @viirya , too
…e/AdaptiveSparkPlanExec.scala Co-authored-by: Wenchen Fan <[email protected]>
thanks, merging to master!
### What changes were proposed in this pull request?
This is a followup of #39624. `TableCacheQueryStageExec.cancel` is a noop and we can move `def cancel` out from `QueryStageExec`. Due to this movement, I renamed `ReusableQueryStageExec` to `ExchangeQueryStageExec`.
### Why are the changes needed?
type safe
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests

Closes #40399 from cloud-fan/follow.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…eterialized consistent
### What changes were proposed in this pull request?
This is a followup of #39624. `QueryStageExec.isMaterialized` should only return true if `resultOption` is assigned. It can be a potential bug to have this inconsistency.
### Why are the changes needed?
fix potential bug
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests

Closes #40522 from cloud-fan/follow.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ageLevel.NONE on Dataset
### What changes were proposed in this pull request?
Support for `InMemoryTableScanExec` in AQE was added in #39624, but this patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch a query like:
```
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```
would correctly return 2. But after that patch it incorrectly returns 0. This is because AQE incorrectly determines, based on the runtime statistics that are collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
that the input is empty. The problem is that the action that should make sure the statistics are collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
never uses the iterator, and with `StorageLevel.NONE` the persisting will also not use the iterator, so we do not gather the correct statistics. The proposed fix in this patch just makes calling persist with `StorageLevel.NONE` a no-op. Changing the action so that it always "empties" the iterator would also work, but that would be unnecessary work in a lot of normal circumstances.
### Why are the changes needed?
The current code has a correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes, fixes the correctness issue.
### How was this patch tested?
New and existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.
Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
This PR adds a new abstract `ReusableQueryStageExec`, which is the parent of `ShuffleQueryStageExec` and `BroadcastQueryStageExec`. It also adds a new query stage, `TableCacheQueryStageExec`, which is used to wrap `InMemoryTableScanExec`.
`InMemoryTableScanExec` has some issues in AQE:
### Why are the changes needed?
fix the issues above
### Does this PR introduce _any_ user-facing change?
yes, fix the issues above
### How was this patch tested?
add test and test manually
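The hierarchy described above can be sketched as follows. Signatures are simplified and bodies omitted; the constructor parameters and the `doMaterialize` signature are assumptions for illustration, not the PR's exact code:

```scala
// Hypothetical sketch of the query-stage hierarchy after this PR
abstract class QueryStageExec extends LeafExecNode {
  def doMaterialize(): Future[Any]
}

// stages whose result (an exchange) can be reused elsewhere in the plan
abstract class ReusableQueryStageExec extends QueryStageExec

case class ShuffleQueryStageExec(/* ... */) extends ReusableQueryStageExec
case class BroadcastQueryStageExec(/* ... */) extends ReusableQueryStageExec

// new in this PR: wraps InMemoryTableScanExec so that AQE can materialize
// the table cache and collect runtime statistics from it
case class TableCacheQueryStageExec(
    id: Int,
    inMemoryTableScan: InMemoryTableScanExec) extends QueryStageExec
```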