[SPARK-45386][SQL][3.5] Fix correctness issue with persist using StorageLevel.NONE on Dataset #43213
Closed
…evel.NONE on Dataset (apache#43188)
* SPARK-45386: Fix correctness issue with StorageLevel.NONE
* Move to CacheManager
* Add comment
WeichenXu123
approved these changes
Oct 4, 2023
Contributor
WeichenXu123
left a comment
LGTM
dongjoon-hyun
approved these changes
Oct 4, 2023
Member
dongjoon-hyun
left a comment
+1, LGTM (Pending CIs). Thank you, @eejbyfeldt .
HyukjinKwon
approved these changes
Oct 5, 2023
Member
Merged to branch-3.5.
HyukjinKwon
pushed a commit
that referenced
this pull request
Oct 5, 2023
…ageLevel.NONE on Dataset

### What changes were proposed in this pull request?

Support for InMemoryTableScanExec in AQE was added in #39624, but that patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch a query like:

```
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```

would correctly return 2. After that patch it incorrectly returns 0. This is because AQE incorrectly determines that the input is empty, based on the runtime statistics that are collected here: https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294

The problem is that the action that should make sure the statistics are collected, here: https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291, never uses the iterator. With `StorageLevel.NONE` the persisting does not use the iterator either, so the correct statistics are never gathered.

The proposed fix in the patch just makes calling persist with `StorageLevel.NONE` a no-op. Changing the action so that it always "emptied" the iterator would also work, but that seems like unnecessary work in a lot of normal circumstances.

### Why are the changes needed?

The current code has a correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes, fixes the correctness issue.

### How was this patch tested?

New and existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
turboFei
pushed a commit
to turboFei/spark
that referenced
this pull request
Nov 6, 2025
…ageLevel.NONE on Dataset
What changes were proposed in this pull request?
Support for InMemoryTableScanExec in AQE was added in #39624, but that patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch a query like `spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()` would correctly return 2. After that patch it incorrectly returns 0. This is because AQE incorrectly determines that the input is empty, based on the runtime statistics that are collected here:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
Line 294 in eac5a8c
The problem is that the action that should make sure the statistics are collected, here:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Lines 285 to 291 in eac5a8c
never uses the iterator. With `StorageLevel.NONE` the persisting does not use the iterator either, so the correct statistics are never gathered.
The proposed fix in the patch just makes calling persist with `StorageLevel.NONE` a no-op. Changing the action so that it always "emptied" the iterator would also work, but that seems like unnecessary work in a lot of normal circumstances.
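The failure mode described above can be illustrated without Spark. The following is a minimal sketch in plain Scala, not the actual Spark code: `StatsSideEffectSketch`, `RowCounter`, `counting`, and `observedRows` are hypothetical names made up for this illustration. It assumes only what the description states, namely that the row statistics are updated as a side effect of consuming an iterator, so a code path that never pulls from the iterator leaves the recorded row count at 0.

```scala
// Hypothetical, simplified model of the bug; none of these names are Spark classes.
object StatsSideEffectSketch {

  // Stands in for the runtime statistics that the optimizer later reads.
  final class RowCounter { var rows: Long = 0L }

  // Wraps an iterator so that every consumed row bumps the counter.
  // Iterator.map is lazy, so nothing is counted until someone pulls rows.
  def counting[A](underlying: Iterator[A], stats: RowCounter): Iterator[A] =
    underlying.map { row => stats.rows += 1; row }

  // Returns the row count recorded after the "materialization" step.
  // consume = true models a storage level that actually reads the rows;
  // consume = false models a step that never touches the iterator.
  def observedRows(data: Seq[Int], consume: Boolean): Long = {
    val stats = new RowCounter
    val rows = counting(data.iterator, stats)
    if (consume) rows.foreach(_ => ())
    stats.rows
  }

  def main(args: Array[String]): Unit = {
    println(observedRows(Seq(1, 2), consume = true))  // rows were read, so they were counted
    println(observedRows(Seq(1, 2), consume = false)) // nothing was read, so the count stays 0
  }
}
```

In this model, a planner that trusts the recorded count in the second case would treat a two-row input as empty, which mirrors the incorrect `count()` result reported above. Skipping the caching step entirely when nothing would be stored, as the patch does, avoids relying on statistics that were never populated.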
Why are the changes needed?
The current code has a correctness issue.
Does this PR introduce any user-facing change?
Yes, fixes the correctness issue.
How was this patch tested?
New and existing unit tests.
Was this patch authored or co-authored using generative AI tooling?
No