
[CH] Support WindowGroupLimit for row_number, rank and dense_rank #7087

@lgbo-ustc

Description


Part of #6067.

For TPC-DS q44, the physical plan is:

CHNativeColumnarToRow
+- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[rnk#74 ASC NULLS FIRST], output=[rnk#74,best_performing#80,worst_performing#81])
   +- ^(16) ProjectExecTransformer [rnk#74, i_product_name#135 AS best_performing#80, i_product_name#180 AS worst_performing#81]
      +- ^(16) CHSortMergeJoinExecTransformer [item_sk#75L], [i_item_sk#159L], Inner, false
         :- ^(16) SortExecTransformer [item_sk#75L ASC NULLS FIRST], false, 0
         :  +- ^(16) InputIteratorTransformer[rnk#74, item_sk#75L, i_product_name#135]
         :     +- ColumnarExchange hashpartitioning(item_sk#75L, 5), ENSURE_REQUIREMENTS, [plan_id=1465], [shuffle_writer_type=hash], [OUTPUT] List(rnk:IntegerType, item_sk:LongType, i_product_name:StringType)
         :        +- ^(14) ProjectExecTransformer [rnk#74, item_sk#75L, i_product_name#135]
         :           +- ^(14) CHSortMergeJoinExecTransformer [item_sk#70L], [i_item_sk#114L], Inner, false
         :              :- ^(14) SortExecTransformer [item_sk#70L ASC NULLS FIRST], false, 0
         :              :  +- ^(14) InputIteratorTransformer[item_sk#70L, rnk#74, item_sk#75L]
         :              :     +- ColumnarExchange hashpartitioning(item_sk#70L, 5), ENSURE_REQUIREMENTS, [plan_id=1407], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rnk:IntegerType, item_sk:LongType)
         :              :        +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74, item_sk#75L]
         :              :           +- ^(12) CHSortMergeJoinExecTransformer [rnk#74], [rnk#79], Inner, false
         :              :              :- ^(12) SortExecTransformer [rnk#74 ASC NULLS FIRST], false, 0
         :              :              :  +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74]
         :              :              :     +- ^(12) FilterExecTransformer ((rnk#74 < 11) AND isnotnull(item_sk#70L))
         :              :              :        +- ^(12) WindowExecTransformer [rank(rank_col#71) windowspecdefinition(rank_col#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#74], [rank_col#71 ASC NULLS FIRST]
         :              :              :           +- ^(12) InputIteratorTransformer[item_sk#70L, rank_col#71]
         :              :              :              +- RowToCHNativeColumnar
         :              :              :                 +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Final
         :              :              :                    +- CHNativeColumnarToRow
         :              :              :                       +- ^(8) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
         :              :              :                          +- ^(8) InputIteratorTransformer[item_sk#70L, rank_col#71]
         :              :              :                             +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1210], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :              :              :                                +- RowToCHNativeColumnar
         :              :              :                                   +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Partial
         :              :              :                                      +- CHNativeColumnarToRow
         :              :              :                                         +- ^(7) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
         :              :              :                                            +- ^(7) FilterExecTransformer (isnotnull(rank_col#71) AND (cast(rank_col#71 as decimal(13,7)) > (0.9 * Subquery scalar-subquery#73, [id=#294])))
         :              :              :                                               :  +- Subquery scalar-subquery#73, [id=#294]
         :              :              :                                               :     +- CHNativeColumnarToRow
         :              :              :                                               :        +- ^(3) ProjectExecTransformer [cast((avg(UnscaledValue(ss_net_profit#209))#185 / 100.0) as decimal(11,6)) AS rank_col#72]
         :              :              :                                               :           +- ^(3) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[avg(UnscaledValue(ss_net_profit#209))], isStreamingAgg=false)
         :              :              :                                               :              +- ^(3) InputIteratorTransformer[ss_store_sk#195L, sum#265, count#266L]
         :              :              :                                               :                 +- ColumnarExchange hashpartitioning(ss_store_sk#195L, 5), ENSURE_REQUIREMENTS, [plan_id=287], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_store_sk:LongType, sum:DoubleType, count:LongType)
         :              :              :                                               :                    +- ^(2) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[partial_avg(_pre_0#267L)], isStreamingAgg=false)
         :              :              :                                               :                       +- ^(2) ProjectExecTransformer [ss_store_sk#195L, ss_net_profit#209, UnscaledValue(ss_net_profit#209) AS _pre_0#267L]
         :              :              :                                               :                          +- ^(2) FilterExecTransformer ((isnotnull(ss_store_sk#195L) AND (ss_store_sk#195L = cast(2 as bigint))) AND isnull(ss_hdemo_sk#193L))
         :              :              :                                               :                             +- ^(2) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_hdemo_sk#193L,ss_store_sk#195L,ss_net_profit#209] Batched: true, DataFilters: [isnotnull(ss_store_sk#195L), isnull(ss_hdemo_sk#193L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), IsNull(ss_hdemo_sk)], ReadSchema: struct<ss_hdemo_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :              :              :                                               +- ^(7) ProjectExecTransformer [ss_item_sk#91L AS item_sk#70L, cast((avg(UnscaledValue(ss_net_profit#113))#183 / 100.0) as decimal(11,6)) AS rank_col#71]
         :              :              :                                                  +- ^(7) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[avg(UnscaledValue(ss_net_profit#113))], isStreamingAgg=false)
         :              :              :                                                     +- ^(7) InputIteratorTransformer[ss_item_sk#91L, sum#257, count#258L]
         :              :              :                                                        +- ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :              :              :                                                           +- ^(6) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[partial_avg(_pre_2#277L)], isStreamingAgg=false)
         :              :              :                                                              +- ^(6) ProjectExecTransformer [ss_item_sk#91L, ss_net_profit#113, UnscaledValue(ss_net_profit#113) AS _pre_2#277L]
         :              :              :                                                                 +- ^(6) FilterExecTransformer (isnotnull(ss_store_sk#99L) AND (ss_store_sk#99L = cast(2 as bigint)))
         :              :              :                                                                    +- ^(6) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_item_sk#91L,ss_store_sk#99L,ss_net_profit#113] Batched: true, DataFilters: [isnotnull(ss_store_sk#99L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk)], ReadSchema: struct<ss_item_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :              :              +- ^(12) SortExecTransformer [rnk#79 ASC NULLS FIRST], false, 0
         :              :                 +- ^(12) ProjectExecTransformer [item_sk#75L, rnk#79]
         :              :                    +- ^(12) FilterExecTransformer ((rnk#79 < 11) AND isnotnull(item_sk#75L))
         :              :                       +- ^(12) WindowExecTransformer [rank(rank_col#76) windowspecdefinition(rank_col#76 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#79], [rank_col#76 DESC NULLS LAST]
         :              :                          +- ^(12) InputIteratorTransformer[item_sk#75L, rank_col#76]
         :              :                             +- RowToCHNativeColumnar
         :              :                                +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Final
         :              :                                   +- CHNativeColumnarToRow
         :              :                                      +- ^(11) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
         :              :                                         +- ^(11) InputIteratorTransformer[item_sk#75L, rank_col#76]
         :              :                                            +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1374], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :              :                                               +- RowToCHNativeColumnar
         :              :                                                  +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Partial
         :              :                                                     +- CHNativeColumnarToRow
         :              :                                                        +- ^(10) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
         :              :                                                           +- ^(10) FilterExecTransformer (isnotnull(rank_col#76) AND (cast(rank_col#76 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery scalar-subquery#73, [id=#294])))
         :              :                                                              :  +- ReusedSubquery Subquery scalar-subquery#73, [id=#294]
         :              :                                                              +- ^(10) ProjectExecTransformer [ss_item_sk#136L AS item_sk#75L, cast((avg(UnscaledValue(ss_net_profit#158))#184 / 100.0) as decimal(11,6)) AS rank_col#76]
         :              :                                                                 +- ^(10) HashAggregateTransformer(keys=[ss_item_sk#136L], functions=[avg(UnscaledValue(ss_net_profit#158))], isStreamingAgg=false)
         :              :                                                                    +- ^(10) InputIteratorTransformer[ss_item_sk#136L, sum#261, count#262L]
         :              :                                                                       +- ReusedExchange [ss_item_sk#136L, sum#261, count#262L], ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :              +- ^(14) SortExecTransformer [i_item_sk#114L ASC NULLS FIRST], false, 0
         :                 +- ^(14) InputIteratorTransformer[i_item_sk#114L, i_product_name#135]
         :                    +- ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)
         :                       +- ^(13) FilterExecTransformer isnotnull(i_item_sk#114L)
         :                          +- ^(13) NativeFileScan parquet spark_catalog.tpcds_pq100.item[i_item_sk#114L,i_product_name#135] Batched: true, DataFilters: [isnotnull(i_item_sk#114L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_product_name:string>
         +- ^(16) SortExecTransformer [i_item_sk#159L ASC NULLS FIRST], false, 0
            +- ^(16) InputIteratorTransformer[i_item_sk#159L, i_product_name#180]
               +- ReusedExchange [i_item_sk#159L, i_product_name#180], ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)
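In the plan above, each WindowGroupLimit pair (Partial before the exchange, Final after it) runs row-based, bracketed by CHNativeColumnarToRow / RowToCHNativeColumnar transitions — that is the fallback this issue targets. Spark 3.5's WindowGroupLimit prunes rows that cannot satisfy a rank-based filter (here `rnk < 11` over `rank(rank_col)`, hence the limit of 10) before the full Window computation. Its contract can be sketched in plain Python; this is a minimal illustration of the semantics with made-up names, not the ClickHouse implementation:

```python
def window_group_limit(rows, key, limit, func="rank"):
    """Keep only rows that can satisfy `<rank-function>() <= limit`.

    `rows` must already be sorted by `key`, as guaranteed by the
    SortExecTransformer directly below each WindowGroupLimit node.
    The same filter runs twice: per task before the shuffle (Partial)
    and once more after it (Final).
    """
    out = []
    rank = dense = 0
    prev = object()          # sentinel distinct from every real key
    for i, row in enumerate(rows):
        k = key(row)
        if k != prev:        # key value changed
            rank = i + 1     # rank(): 1 + rows strictly before this key
            dense += 1       # dense_rank(): 1 + distinct keys before
            prev = k
        current = {"row_number": i + 1, "rank": rank, "dense_rank": dense}[func]
        if current > limit:  # input is sorted, so no later row can pass
            break
        out.append(row)
    return out
```

Note that with `rank()` ties share a rank, so more than `limit` rows can survive (e.g. limit 1 over keys `[5, 5, 5, 7]` keeps all three 5s). Since q44's window spec has no PARTITION BY, the Final stage here runs after a SinglePartition exchange; with partition keys the same filter would apply within each partition.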

q67 and q70 also fall back on WindowGroupLimit.

Labels

enhancement (New feature or request)
