AQE Unable to Rewrite Joins as Broadcast Hash Joins Due to Existing CometBroadcastHashJoin Operator #1589

@Kontinuation

Describe the bug

AQE can dynamically transform a SortMergeJoin or ShuffledHashJoin into a BroadcastHashJoin after discovering that one of the Exchange operators wrote only a small amount of shuffle data. However, this optimization does not always happen when using Comet.

TPC-H Q7 has an equi-join between supplier and lineitem. After running the Exchange operator, Spark discovers that supplier is small enough to be broadcast and dynamically changes the sort-merge join to a broadcast hash join (see BroadcastHashJoin Inner BuildLeft (15)):

== Physical Plan ==
AdaptiveSparkPlan (99)
+- == Final Plan ==
   * Sort (62)
   +- AQEShuffleRead (61)
      +- ShuffleQueryStage (60), Statistics(sizeInBytes=288.0 B, rowCount=4)
         +- Exchange (59)
            +- * HashAggregate (58)
               +- AQEShuffleRead (57)
                  +- ShuffleQueryStage (56), Statistics(sizeInBytes=2.8 KiB, rowCount=36)
                     +- Exchange (55)
                        +- * HashAggregate (54)
                           +- * Project (53)
                              +- * BroadcastHashJoin Inner BuildRight (52)
                                 :- * Project (49)
                                 :  +- * BroadcastHashJoin Inner BuildRight (48)
                                 :     :- * Project (42)
                                 :     :  +- * SortMergeJoin Inner (41)
                                 :     :     :- * Sort (33)
                                 :     :     :  +- AQEShuffleRead (32)
                                 :     :     :     +- ShuffleQueryStage (31), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                 :     :     :        +- Exchange (30)
                                 :     :     :           +- * Project (29)
                                 :     :     :              +- * SortMergeJoin Inner (28)
                                 :     :     :                 :- * Sort (20)
                                 :     :     :                 :  +- AQEShuffleRead (19)
                                 :     :     :                 :     +- ShuffleQueryStage (18), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                 :     :     :                 :        +- Exchange (17)
                                 :     :     :                 :           +- * Project (16)
                                 :     :     :                 :              +- * BroadcastHashJoin Inner BuildLeft (15)  <-- Transformed from SortMergeJoin by AQE
                                 :     :     :                 :                 :- BroadcastQueryStage (8), Statistics(sizeInBytes=8.0 MiB, rowCount=8.00E+4)
                                 :     :     :                 :                 :  +- BroadcastExchange (7)
                                 :     :     :                 :                 :     +- AQEShuffleRead (6)
                                 :     :     :                 :                 :        +- ShuffleQueryStage (5), Statistics(sizeInBytes=1874.1 KiB, rowCount=8.00E+4)
                                 :     :     :                 :                 :           +- Exchange (4)
                                 :     :     :                 :                 :              +- * Filter (3)
                                 :     :     :                 :                 :                 +- * ColumnarToRow (2)
                                 :     :     :                 :                 :                    +- Scan parquet  (1)
                                 :     :     :                 :                 +- AQEShuffleRead (14)
                                 :     :     :                 :                    +- ShuffleQueryStage (13), Statistics(sizeInBytes=8.1 GiB, rowCount=1.82E+8)
                                 :     :     :                 :                       +- Exchange (12)
                                 :     :     :                 :                          +- * Filter (11)
                                 :     :     :                 :                             +- * ColumnarToRow (10)
                                 :     :     :                 :                                +- Scan parquet  (9)
                                 :     :     :                 +- * Sort (27)
                                 :     :     :                    +- AQEShuffleRead (26)
                                 :     :     :                       +- ShuffleQueryStage (25), Statistics(sizeInBytes=3.4 GiB, rowCount=1.50E+8)
                                 :     :     :                          +- Exchange (24)
                                 :     :     :                             +- * Filter (23)
                                 :     :     :                                +- * ColumnarToRow (22)
                                 :     :     :                                   +- Scan parquet  (21)
                                 :     :     +- * Sort (40)
                                 :     :        +- AQEShuffleRead (39)
                                 :     :           +- ShuffleQueryStage (38), Statistics(sizeInBytes=27.5 MiB, rowCount=1.20E+6)
                                 :     :              +- Exchange (37)
                                 :     :                 +- * Filter (36)
                                 :     :                    +- * ColumnarToRow (35)
                                 :     :                       +- Scan parquet  (34)
                                 :     +- BroadcastQueryStage (47), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                 :        +- BroadcastExchange (46)
                                 :           +- * Filter (45)
                                 :              +- * ColumnarToRow (44)
                                 :                 +- Scan parquet  (43)
                                 +- BroadcastQueryStage (51), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                    +- ReusedExchange (50)

The following query plan is generated when running TPC-H Q7 with Comet enabled. The CometSortMergeJoin was not transformed to CometBroadcastHashJoin:

== Physical Plan ==
AdaptiveSparkPlan (95)
+- == Final Plan ==
   * CometColumnarToRow (58)
   +- CometSort (57)
      +- AQEShuffleRead (56)
         +- ShuffleQueryStage (55), Statistics(sizeInBytes=288.0 B, rowCount=4)
            +- CometColumnarExchange (54)
               +- CometHashAggregate (53)
                  +- AQEShuffleRead (52)
                     +- ShuffleQueryStage (51), Statistics(sizeInBytes=8.5 KiB, rowCount=72)
                        +- CometExchange (50)
                           +- CometHashAggregate (49)
                              +- CometProject (48)
                                 +- CometBroadcastHashJoin (47)
                                    :- CometProject (44)
                                    :  +- CometBroadcastHashJoin (43)
                                    :     :- CometProject (38)
                                    :     :  +- CometSortMergeJoin (37)
                                    :     :     :- CometSort (30)
                                    :     :     :  +- AQEShuffleRead (29)
                                    :     :     :     +- ShuffleQueryStage (28), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                    :     :     :        +- CometExchange (27)
                                    :     :     :           +- CometProject (26)
                                    :     :     :              +- CometSortMergeJoin (25)
                                    :     :     :                 :- CometSort (18)
                                    :     :     :                 :  +- AQEShuffleRead (17)
                                    :     :     :                 :     +- ShuffleQueryStage (16), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                    :     :     :                 :        +- CometExchange (15)
                                    :     :     :                 :           +- CometProject (14)
                                    :     :     :                 :              +- CometSortMergeJoin (13)  <-- Not transformed to CometBroadcastHashJoin
                                    :     :     :                 :                 :- CometSort (6)
                                    :     :     :                 :                 :  +- AQEShuffleRead (5)
                                    :     :     :                 :                 :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1281.0 KiB, rowCount=1.60E+5)
                                    :     :     :                 :                 :        +- CometExchange (3)
                                    :     :     :                 :                 :           +- CometFilter (2)
                                    :     :     :                 :                 :              +- CometScan parquet  (1)
                                    :     :     :                 :                 +- CometSort (12)
                                    :     :     :                 :                    +- AQEShuffleRead (11)
                                    :     :     :                 :                       +- ShuffleQueryStage (10), Statistics(sizeInBytes=8.9 GiB, rowCount=3.65E+8)
                                    :     :     :                 :                          +- CometExchange (9)
                                    :     :     :                 :                             +- CometFilter (8)
                                    :     :     :                 :                                +- CometScan parquet  (7)
                                    :     :     :                 +- CometSort (24)
                                    :     :     :                    +- AQEShuffleRead (23)
                                    :     :     :                       +- ShuffleQueryStage (22), Statistics(sizeInBytes=2.2 GiB, rowCount=3.00E+8)
                                    :     :     :                          +- CometExchange (21)
                                    :     :     :                             +- CometFilter (20)
                                    :     :     :                                +- CometScan parquet  (19)
                                    :     :     +- CometSort (36)
                                    :     :        +- AQEShuffleRead (35)
                                    :     :           +- ShuffleQueryStage (34), Statistics(sizeInBytes=18.7 MiB, rowCount=2.40E+6)
                                    :     :              +- CometExchange (33)
                                    :     :                 +- CometFilter (32)
                                    :     :                    +- CometScan parquet  (31)
                                    :     +- BroadcastQueryStage (42), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                    :        +- CometBroadcastExchange (41)
                                    :           +- CometFilter (40)
                                    :              +- CometScan parquet  (39)
                                    +- BroadcastQueryStage (46), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                       +- ReusedExchange (45)
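
For reference, the supplier-side shuffle stage is small in both plans: 1874.1 KiB in the Spark plan and 1281.0 KiB in the Comet plan. The sketch below models only the size comparison against the broadcast threshold. It is not AQE's actual rule, which re-optimizes the plan using runtime statistics and applies further checks. It assumes Spark's default `spark.sql.autoBroadcastJoinThreshold` of 10 MiB is in effect, and the helper names `parse_size` and `fits_broadcast` are hypothetical.

```python
import re

# Assumption: the default spark.sql.autoBroadcastJoinThreshold (10 MiB) applies.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024

_UNITS = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}


def parse_size(text):
    """Convert a size as printed in plan statistics (e.g. '1874.1 KiB') to bytes."""
    match = re.fullmatch(r"\s*([0-9.]+)\s*(B|KiB|MiB|GiB|TiB)\s*", text)
    if match is None:
        raise ValueError(f"unrecognized size: {text!r}")
    return float(match.group(1)) * _UNITS[match.group(2)]


def fits_broadcast(size_text, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Simplified check: is the reported stage size within the threshold?"""
    return parse_size(size_text) <= threshold_bytes


if __name__ == "__main__":
    # Sizes copied from the two plans above.
    stages = [
        ("Spark ShuffleQueryStage (5)", "1874.1 KiB"),
        ("Comet ShuffleQueryStage (4)", "1281.0 KiB"),
        ("Comet ShuffleQueryStage (10)", "8.9 GiB"),
    ]
    for label, size in stages:
        print(label, size, fits_broadcast(size))
```

Under that assumption, the supplier stage passes the size check in both plans, so size alone does not explain why the Comet join stayed a sort-merge join.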

Steps to reproduce

Run TPC-H Q7 using the TPC-H SF=100 dataset. The benchmarking code is in https://github.com/apache/datafusion-benchmarks/tree/main/tpch.

spark-submit \
  --master local[8] \
  --conf spark.driver.memory=3g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=16g \
  --conf spark.jars=$COMET_JAR \
  --conf spark.driver.extraClassPath=$COMET_JAR \
  --conf spark.executor.extraClassPath=$COMET_JAR \
  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.comet.exec.shuffle.mode=auto \
  --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
  --conf spark.comet.exec.shuffle.compression.codec=lz4 \
  --conf spark.comet.exec.replaceSortMergeJoin=false \
  tpcbench.py \
  --benchmark tpch \
  --data /path/to/tpch/sf100_parquet \
  --queries ../../tpch/queries \
  --output tpc-results \
  --iterations 3

Expected behavior

The innermost CometSortMergeJoin is transformed to CometBroadcastHashJoin.
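
One way to check for this after a run is to scan the formatted plan text for join operators and their ids. This is a minimal sketch that assumes the final plan has already been captured as a string, for example copied from the Spark UI. The helper name `list_joins` is hypothetical and not part of Spark or Comet.

```python
import re

# Matches Spark and Comet join operator names followed by their operator id,
# e.g. "BroadcastHashJoin Inner BuildLeft (15)" or "CometSortMergeJoin (13)".
JOIN_PATTERN = re.compile(
    r"((?:Comet)?(?:BroadcastHashJoin|SortMergeJoin|ShuffledHashJoin))\b[^()]*\((\d+)\)"
)


def list_joins(plan_text):
    """Return (operator_id, operator_name) pairs for join nodes, sorted by id."""
    joins = []
    for line in plan_text.splitlines():
        match = JOIN_PATTERN.search(line)
        if match:
            joins.append((int(match.group(2)), match.group(1)))
    return sorted(joins)


if __name__ == "__main__":
    # Excerpt of the Comet plan from this report.
    plan = """\
+- CometBroadcastHashJoin (47)
   :- CometProject (44)
   :  +- CometBroadcastHashJoin (43)
   :     :  +- CometSortMergeJoin (37)
   :     :     :              +- CometSortMergeJoin (25)
   :     :     :                 :              +- CometSortMergeJoin (13)  <-- Not transformed to CometBroadcastHashJoin
"""
    for op_id, name in list_joins(plan):
        print(op_id, name)
```

In the Comet plan above, the join with the lowest id, (13), is the one expected to appear as CometBroadcastHashJoin once the issue is fixed.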

Additional context

No response
