fix: Make AQE capable of converting Comet shuffled joins to Comet broadcast hash joins#1605
Merged
andygrove merged 6 commits intoapache:mainfrom Apr 4, 2025
Merged
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1605 +/- ##
============================================
+ Coverage 56.12% 58.53% +2.40%
- Complexity 976 1051 +75
============================================
Files 119 124 +5
Lines 11743 12559 +816
Branches 2251 2360 +109
============================================
+ Hits 6591 7351 +760
- Misses 4012 4024 +12
- Partials 1140 1184 +44 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
… capable of converting comet shuffled joins to comet broadcast hash joins
a75959e to
f73c750
Compare
andygrove
approved these changes
Apr 3, 2025
Member
andygrove
left a comment
There was a problem hiding this comment.
Changes LGTM. I tested with TPC-H and did not notice any significant performance difference. Thanks @Kontinuation.
coderfender
pushed a commit
to coderfender/datafusion-comet
that referenced
this pull request
Dec 13, 2025
…adcast hash joins (apache#1605) * Override outputPartitioning in CometBroadcastExchangeExec to make AQE capable of converting comet shuffled joins to comet broadcast hash joins * Support executeBroadcast for CometColumnarToRow * Add tests, revert my changes to how CometBroadcastExchange is displayed * Making newly added test fail before applying this fix * Remove unused imports * Fix test failure caused by spark conf pollution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Notice
This PR should be merged after #1606
Which issue does this PR close?
Closes #1589.
Rationale for this change
CometBroadcastExchangeExecdidn't implementoutputPartitioningmethod, this prevents CometBroadcastExchangeExec from being correctly generated in AQE optimization. This patch fixes this problem to make shuffled equi-joins being able to be optimized to CometBroadcastHashJoin by AQE.What changes are included in this PR?
This PR contains 2 fixes to make AQE broadcast join optimization work correctly for Comet:
outputPartitioningmethod ofCometBroadcastExchangeExec, also fixes other places that prevents the AQE optimization from happening.doExecuteBroadcastmethod ofCometColumnarToRowExec. The parent ofCometBroadcastExchangeExecmay change to SparkBroadcastHashJoinExecduring AQE optimization, this requires inserting aCometColumnarToRowExecaboveCometBroadcastExchangeExecto broadcast the data as rows instead of column batches.Example AQE plans:
How are these changes tested?