-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-43021][SQL] CoalesceBucketsInJoin not work when using AQE
#40688
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
CoalesceBucketsInJoin not work when using AQECoalesceBucketsInJoin not work when using AQE
|
cc @imback82, @cloud-fan , @viirya , @sunchao |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
Outdated
Show resolved
Hide resolved
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need adding test case.
yeah , i will add UT later |
|
One more question , it time to make the default value of |
|
Maybe, no? If this is not working properly before, we cannot enable this configuration at Apache Spark 3.5.0. Since we need to wait for one release cycle, we may be able to do that at Apache Spark 3.6.0 if we want.
|
Yes, this is the more logical way. |
|
The CI build failure doesn't seem to be caused by this patch, can you take a look? |
|
Please rebase to the
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we put it in queryStageOptimizerRules?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rules in queryStageOptimizerRules are invoked less often which is more efficient. The rule CoalesceBucketsInJoin does not change plan partitioning and seems can be put in queryStageOptimizerRules
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my test , the UT run failed if CoalesceBucketsInJoin add in queryStageOptimizerRules .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we spend a bit of time understanding why? Then we can write a code comment to explain it and future developers won't try to move this rule to queryStageOptimizerRules ever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah , I will provide detailed information and supplement it .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because queryStageOptimizerRules is not applied at the beginning of the init plan. Instead, they are applied in the createQueryStages() method. And createQueryStages() is bottom-up, which causes the exchange to be eliminated to be wrapped in a layer of ShuffleQueryStage first, making CoalesceBucketsInJoin unrecognizable. And I have added these to the notes at the top. thanks @cloud-fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CoalesceBucketsInJoin should before EnsureRequirements.
|
@cloud-fan @dongjoon-hyun @viirya Please merge to master . Thanks ~ |
|
To be clear, this PR didn't get any approval yet, @zzzzming95 .
|
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Show resolved
Hide resolved
| assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) | ||
| } else { | ||
| assert(scans.isEmpty) | ||
| query: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the indentation is wrong now, can we restore to 4 spaces as before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original DisableAdaptiveExecution logic of this UT is removed here. The current implementation does both.
| query: String, | ||
| expectedNumShuffles: Int, | ||
| expectedCoalescedNumBuckets: Option[Int]): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| query: String, | |
| expectedNumShuffles: Int, | |
| expectedCoalescedNumBuckets: Option[Int]): Unit = { | |
| query: String, | |
| expectedNumShuffles: Int, | |
| expectedCoalescedNumBuckets: Option[Int]): Unit = { |
|
thanks, merging to master! |
|
I was the OP of the issue in the jira. Thank you for the fix, but I discovered a weird behavior when hints applied and I don't know how to interpret it. Please check SPARK-43326 I filled |
Okay, I will follow up on this issue |
What changes were proposed in this pull request?
Add
CoalesceBucketsInJointo AQEpreprocessingRules.Why are the changes needed?
Previously optimized bucket join: 'CoalesceBucketsInJoin'` : #28123
But when using AQE ,
CoalesceBucketsInJoincan not match beacuse the top of the spark plan isAdaptiveSparkPlan.The code :
Before the PR
After the PR output:
Additional Notes:
We don't add CoalesceBucketsInJoin to
AdaptiveSparkPlanExec#queryStageOptimizerRulesbecause queryStageOptimizerRules is not applied at the beginning of the init plan. Instead, they are applied in the createQueryStages() method. And createQueryStages() is bottom-up, which causes the exchange to be eliminated to be wrapped in a layer of ShuffleQueryStage first, making CoalesceBucketsInJoin unrecognizable.Does this PR introduce any user-facing change?
No
How was this patch tested?
add UT