Description
Describe the bug
AQE can dynamically transform a SortMergeJoin or ShuffledHashJoin into a BroadcastHashJoin after discovering that one of the Exchange operators wrote only a small amount of shuffle data. However, this optimization does not always happen when using Comet.
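The size check behind this conversion can be sketched roughly as follows. This is a simplified illustration, not Spark's or Comet's actual code: it assumes the decision reduces to comparing the shuffle stage's reported `sizeInBytes` against the broadcast threshold (`spark.sql.adaptive.autoBroadcastJoinThreshold`, which falls back to `spark.sql.autoBroadcastJoinThreshold`, 10 MiB by default), and it ignores other conditions such as join type and hints. The helper names `parse_size` and `fits_broadcast` are hypothetical. The sizes are copied from the two plans in this report.

```python
import re

# Assumption: Spark's documented default broadcast threshold of 10 MiB.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024

# Binary units as printed in the Statistics(...) part of a query stage.
_UNITS = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}


def parse_size(text):
    """Convert a plan statistic such as '1874.1 KiB' to a number of bytes."""
    match = re.fullmatch(r"\s*([\d.]+)\s*(B|KiB|MiB|GiB|TiB)\s*", text)
    if match is None:
        raise ValueError(f"unrecognized size: {text!r}")
    value, unit = match.groups()
    return float(value) * _UNITS[unit]


def fits_broadcast(size_text, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Return True if a stage of this size is at or under the threshold."""
    return parse_size(size_text) <= threshold_bytes


# Shuffle stage sizes taken from the plans in this issue.
for label, size in [
    ("Spark ShuffleQueryStage (5), supplier side", "1874.1 KiB"),
    ("Comet ShuffleQueryStage (4), supplier side", "1281.0 KiB"),
    ("Comet ShuffleQueryStage (10), lineitem side", "8.9 GiB"),
]:
    print(label, "-> fits broadcast threshold:", fits_broadcast(size))
```

Under these assumptions, the supplier-side shuffle stage is well below the default threshold in both the Spark and the Comet run, so size alone does not explain why the Comet join was left as a sort-merge join.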
TPC-H Q7 has an equi-join between supplier and lineitem. After running the Exchange operator, Spark discovers that supplier is small enough to be broadcast and dynamically changes the sort-merge join to a broadcast hash join (see BroadcastHashJoin Inner BuildLeft (15)):
== Physical Plan ==
AdaptiveSparkPlan (99)
+- == Final Plan ==
* Sort (62)
+- AQEShuffleRead (61)
+- ShuffleQueryStage (60), Statistics(sizeInBytes=288.0 B, rowCount=4)
+- Exchange (59)
+- * HashAggregate (58)
+- AQEShuffleRead (57)
+- ShuffleQueryStage (56), Statistics(sizeInBytes=2.8 KiB, rowCount=36)
+- Exchange (55)
+- * HashAggregate (54)
+- * Project (53)
+- * BroadcastHashJoin Inner BuildRight (52)
:- * Project (49)
: +- * BroadcastHashJoin Inner BuildRight (48)
: :- * Project (42)
: : +- * SortMergeJoin Inner (41)
: : :- * Sort (33)
: : : +- AQEShuffleRead (32)
: : : +- ShuffleQueryStage (31), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
: : : +- Exchange (30)
: : : +- * Project (29)
: : : +- * SortMergeJoin Inner (28)
: : : :- * Sort (20)
: : : : +- AQEShuffleRead (19)
: : : : +- ShuffleQueryStage (18), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
: : : : +- Exchange (17)
: : : : +- * Project (16)
: : : : +- * BroadcastHashJoin Inner BuildLeft (15) <-- Transformed from SortMergeJoin by AQE
: : : : :- BroadcastQueryStage (8), Statistics(sizeInBytes=8.0 MiB, rowCount=8.00E+4)
: : : : : +- BroadcastExchange (7)
: : : : : +- AQEShuffleRead (6)
: : : : : +- ShuffleQueryStage (5), Statistics(sizeInBytes=1874.1 KiB, rowCount=8.00E+4)
: : : : : +- Exchange (4)
: : : : : +- * Filter (3)
: : : : : +- * ColumnarToRow (2)
: : : : : +- Scan parquet (1)
: : : : +- AQEShuffleRead (14)
: : : : +- ShuffleQueryStage (13), Statistics(sizeInBytes=8.1 GiB, rowCount=1.82E+8)
: : : : +- Exchange (12)
: : : : +- * Filter (11)
: : : : +- * ColumnarToRow (10)
: : : : +- Scan parquet (9)
: : : +- * Sort (27)
: : : +- AQEShuffleRead (26)
: : : +- ShuffleQueryStage (25), Statistics(sizeInBytes=3.4 GiB, rowCount=1.50E+8)
: : : +- Exchange (24)
: : : +- * Filter (23)
: : : +- * ColumnarToRow (22)
: : : +- Scan parquet (21)
: : +- * Sort (40)
: : +- AQEShuffleRead (39)
: : +- ShuffleQueryStage (38), Statistics(sizeInBytes=27.5 MiB, rowCount=1.20E+6)
: : +- Exchange (37)
: : +- * Filter (36)
: : +- * ColumnarToRow (35)
: : +- Scan parquet (34)
: +- BroadcastQueryStage (47), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
: +- BroadcastExchange (46)
: +- * Filter (45)
: +- * ColumnarToRow (44)
: +- Scan parquet (43)
+- BroadcastQueryStage (51), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
+- ReusedExchange (50)
The following query plan is generated when running TPC-H Q7 with Comet enabled. The innermost CometSortMergeJoin (13) was not transformed into a CometBroadcastHashJoin:
== Physical Plan ==
AdaptiveSparkPlan (95)
+- == Final Plan ==
* CometColumnarToRow (58)
+- CometSort (57)
+- AQEShuffleRead (56)
+- ShuffleQueryStage (55), Statistics(sizeInBytes=288.0 B, rowCount=4)
+- CometColumnarExchange (54)
+- CometHashAggregate (53)
+- AQEShuffleRead (52)
+- ShuffleQueryStage (51), Statistics(sizeInBytes=8.5 KiB, rowCount=72)
+- CometExchange (50)
+- CometHashAggregate (49)
+- CometProject (48)
+- CometBroadcastHashJoin (47)
:- CometProject (44)
: +- CometBroadcastHashJoin (43)
: :- CometProject (38)
: : +- CometSortMergeJoin (37)
: : :- CometSort (30)
: : : +- AQEShuffleRead (29)
: : : +- ShuffleQueryStage (28), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
: : : +- CometExchange (27)
: : : +- CometProject (26)
: : : +- CometSortMergeJoin (25)
: : : :- CometSort (18)
: : : : +- AQEShuffleRead (17)
: : : : +- ShuffleQueryStage (16), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
: : : : +- CometExchange (15)
: : : : +- CometProject (14)
: : : : +- CometSortMergeJoin (13) <-- Not transformed to CometBroadcastHashJoin
: : : : :- CometSort (6)
: : : : : +- AQEShuffleRead (5)
: : : : : +- ShuffleQueryStage (4), Statistics(sizeInBytes=1281.0 KiB, rowCount=1.60E+5)
: : : : : +- CometExchange (3)
: : : : : +- CometFilter (2)
: : : : : +- CometScan parquet (1)
: : : : +- CometSort (12)
: : : : +- AQEShuffleRead (11)
: : : : +- ShuffleQueryStage (10), Statistics(sizeInBytes=8.9 GiB, rowCount=3.65E+8)
: : : : +- CometExchange (9)
: : : : +- CometFilter (8)
: : : : +- CometScan parquet (7)
: : : +- CometSort (24)
: : : +- AQEShuffleRead (23)
: : : +- ShuffleQueryStage (22), Statistics(sizeInBytes=2.2 GiB, rowCount=3.00E+8)
: : : +- CometExchange (21)
: : : +- CometFilter (20)
: : : +- CometScan parquet (19)
: : +- CometSort (36)
: : +- AQEShuffleRead (35)
: : +- ShuffleQueryStage (34), Statistics(sizeInBytes=18.7 MiB, rowCount=2.40E+6)
: : +- CometExchange (33)
: : +- CometFilter (32)
: : +- CometScan parquet (31)
: +- BroadcastQueryStage (42), Statistics(sizeInBytes=337.0 B, rowCount=2)
: +- CometBroadcastExchange (41)
: +- CometFilter (40)
: +- CometScan parquet (39)
+- BroadcastQueryStage (46), Statistics(sizeInBytes=337.0 B, rowCount=2)
+- ReusedExchange (45)
Steps to reproduce
Run TPC-H Q7 using the TPC-H SF=100 dataset. The benchmarking code is in https://github.com/apache/datafusion-benchmarks/tree/main/tpch.
spark-submit \
--master local[8] \
--conf spark.driver.memory=3g \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=16g \
--conf spark.jars=$COMET_JAR \
--conf spark.driver.extraClassPath=$COMET_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.comet.exec.shuffle.fallbackToColumnar=true \
--conf spark.comet.exec.shuffle.compression.codec=lz4 \
--conf spark.comet.exec.replaceSortMergeJoin=false \
tpcbench.py \
--benchmark tpch \
--data /path/to/tpch/sf100_parquet \
--queries ../../tpch/queries \
--output tpc-results \
--iterations 3
Expected behavior
The innermost CometSortMergeJoin gets transformed to CometBroadcastHashJoin.
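One way to check for this when comparing runs is to scan the final plan text for join operators and list any that are still sort-merge joins. The following is a minimal sketch, assuming the plan is available as the formatted text shown above; the function names are hypothetical and the excerpt is copied from the Comet plan in this report.

```python
import re

# Matches Spark and Comet join operator names followed by their operator id,
# e.g. "CometSortMergeJoin (13)" or "BroadcastHashJoin Inner BuildLeft (15)".
JOIN_PATTERN = re.compile(
    r"\b((?:Comet)?(?:SortMergeJoin|BroadcastHashJoin|ShuffledHashJoin))\b"
    r"[^()]*\((\d+)\)"
)


def join_operators(plan_text):
    """Return (operator name, operator id) for each join line in the plan."""
    joins = []
    for line in plan_text.splitlines():
        match = JOIN_PATTERN.search(line)
        if match:
            joins.append((match.group(1), int(match.group(2))))
    return joins


def remaining_sort_merge_joins(plan_text):
    """Return the joins that were not converted to a broadcast hash join."""
    return [j for j in join_operators(plan_text) if j[0].endswith("SortMergeJoin")]


# Join lines excerpted from the Comet plan in this issue.
COMET_PLAN_EXCERPT = """\
+- CometBroadcastHashJoin (47)
: +- CometBroadcastHashJoin (43)
: : +- CometSortMergeJoin (37)
: : : +- CometSortMergeJoin (25)
: : : : +- CometSortMergeJoin (13) <-- Not transformed to CometBroadcastHashJoin
"""

print(remaining_sort_merge_joins(COMET_PLAN_EXCERPT))
```

With the expected behavior, operator (13) would no longer appear in the list returned by `remaining_sort_merge_joins`.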