fix: Make AQE capable of converting Comet shuffled joins to Comet broadcast hash joins#1605

Merged
andygrove merged 6 commits into apache:main from Kontinuation:fix-aqe-bhj
Apr 4, 2025

@Kontinuation Kontinuation commented Apr 3, 2025

Notice

This PR should be merged after #1606

Which issue does this PR close?

Closes #1589.

Rationale for this change

CometBroadcastExchangeExec did not implement the outputPartitioning method, which prevented CometBroadcastExchangeExec from being generated correctly during AQE optimization. This patch fixes the problem so that AQE can optimize shuffled equi-joins into CometBroadcastHashJoin.

What changes are included in this PR?

This PR contains 2 fixes to make AQE broadcast join optimization work correctly for Comet:

  1. Implement the outputPartitioning method of CometBroadcastExchangeExec, and fix other places that prevented the AQE optimization from happening.
  2. Implement the doExecuteBroadcast method of CometColumnarToRowExec. The parent of CometBroadcastExchangeExec may change to Spark's BroadcastHashJoinExec during AQE optimization, which requires inserting a CometColumnarToRowExec above CometBroadcastExchangeExec so that the data is broadcast as rows instead of column batches.
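
To illustrate why the first fix matters, here is a minimal, self-contained sketch of a partitioning check of the kind Spark performs when deciding whether a child already satisfies a broadcast requirement. It does not use Spark or Comet: every type in it (`Partitioning`, `BroadcastDistribution`, `PlanNode`, and the two exchange classes) is a simplified, hypothetical stand-in, and the `String` mode replaces Spark's real broadcast mode type. The assumption is that a plan node which does not override `outputPartitioning` reports an unknown partitioning, which cannot satisfy a broadcast distribution.

```scala
// Toy model only: these are NOT the real Spark or Comet classes.
object AqeBroadcastSketch {
  // Simplified stand-ins for Spark's partitioning and distribution types.
  sealed trait Partitioning
  case object UnknownPartitioning extends Partitioning
  final case class BroadcastPartitioning(mode: String) extends Partitioning

  // What a broadcast hash join would require from its build side.
  final case class BroadcastDistribution(mode: String)

  // A partitioning satisfies a broadcast requirement only when it is
  // itself a broadcast partitioning with the same mode.
  def satisfies(p: Partitioning, required: BroadcastDistribution): Boolean =
    p match {
      case BroadcastPartitioning(mode) => mode == required.mode
      case UnknownPartitioning         => false
    }

  // Hypothetical plan node: the default partitioning is "unknown".
  trait PlanNode {
    def outputPartitioning: Partitioning = UnknownPartitioning
  }

  // An exchange that inherits the default (the situation before the fix).
  final class ExchangeWithoutOverride(val mode: String) extends PlanNode

  // An exchange that reports its broadcast partitioning (the idea of the fix).
  final class ExchangeWithOverride(val mode: String) extends PlanNode {
    override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
  }

  def main(args: Array[String]): Unit = {
    val required = BroadcastDistribution("hashed")
    // prints "false": the requirement is not recognized as satisfied
    println(satisfies(new ExchangeWithoutOverride("hashed").outputPartitioning, required))
    // prints "true": the exchange is recognized as a valid broadcast
    println(satisfies(new ExchangeWithOverride("hashed").outputPartitioning, required))
  }
}
```

In this model the planner has no way to tell that the first exchange already broadcasts its output, which is one plausible reading of why the missing override blocked the AQE rewrite; the actual change in CometBroadcastExchangeExec should be checked against the PR diff.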

Example AQE plans:

Query | main | PR
Q7 | [image: q7_aqe_main] | [image: q7_aqe_pr]

How are these changes tested?

  1. Added unit tests
  2. Tested using TPC-H SF=100 locally

codecov-commenter commented Apr 3, 2025

Codecov Report

Attention: Patch coverage is 52.38095% with 40 lines in your changes missing coverage. Please review.

Project coverage is 58.53%. Comparing base (f09f8af) to head (f73c750).
Report is 121 commits behind head on main.

Files with missing lines | Patch % | Lines
...pache/spark/sql/comet/CometColumnarToRowExec.scala | 61.40% | 18 Missing and 4 partials ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala | 28.57% | 7 Missing and 3 partials ⚠️
.../scala/org/apache/spark/sql/comet/util/Utils.scala | 0.00% | 6 Missing ⚠️
.../scala/org/apache/comet/serde/QueryPlanSerde.scala | 0.00% | 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1605      +/-   ##
============================================
+ Coverage     56.12%   58.53%   +2.40%     
- Complexity      976     1051      +75     
============================================
  Files           119      124       +5     
  Lines         11743    12559     +816     
  Branches       2251     2360     +109     
============================================
+ Hits           6591     7351     +760     
- Misses         4012     4024      +12     
- Partials       1140     1184      +44     

@Kontinuation Kontinuation marked this pull request as ready for review April 3, 2025 15:27

@andygrove andygrove left a comment

Changes LGTM. I tested with TPC-H and did not notice any significant performance difference. Thanks @Kontinuation.

@andygrove andygrove merged commit 4639a24 into apache:main Apr 4, 2025
78 checks passed
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
…adcast hash joins (apache#1605)

* Override outputPartitioning in CometBroadcastExchangeExec to make AQE capable of converting comet shuffled joins to comet broadcast hash joins

* Support executeBroadcast for CometColumnarToRow

* Add tests, revert my changes to how CometBroadcastExchange is displayed

* Making newly added test fail before applying this fix

* Remove unused imports

* Fix test failure caused by spark conf pollution

Development

Successfully merging this pull request may close these issues.

AQE Unable to Rewrite Joins as Broadcast Hash Joins Due to Existing CometBroadcastHashJoin Operator

3 participants