Skip to content

Commit 99f8489

Browse files
yaooqinncloud-fan
authored andcommitted
[SPARK-34003][SQL][FOLLOWUP] Avoid pushing modified Char/Varchar sort attributes into aggregate for existing ones
### What changes were proposed in this pull request? In 0f8e5dd, we partially fix the rule conflicts between `PaddingAndLengthCheckForCharVarchar` and `ResolveAggregateFunctions`, as error still exists in sql like ```SELECT substr(v, 1, 2), sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)``` ```sql [info] Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 'spark_catalog.default.t.`v`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.; [info] Project [substr(v, 1, 2)#100, sum(i)#101L] [info] +- Sort [aggOrder#102 ASC NULLS FIRST], true [info] +- !Aggregate [v#106], [substr(v#106, 1, 2) AS substr(v, 1, 2)#100, sum(cast(i#98 as bigint)) AS sum(i)#101L, substr(v#103, 1, 2) AS aggOrder#102 [info] +- SubqueryAlias spark_catalog.default.t [info] +- Project [if ((length(v#97) <= 3)) v#97 else if ((length(rtrim(v#97, None)) > 3)) cast(raise_error(concat(input string of length , cast(length(v#97) as string), exceeds varchar type length limitation: 3)) as string) else rpad(rtrim(v#97, None), 3, ) AS v#106, i#98] [info] +- Relation[v#97,i#98] parquet [info] [info] Project [substr(v, 1, 2)#100, sum(i)#101L] [info] +- Sort [aggOrder#102 ASC NULLS FIRST], true [info] +- !Aggregate [v#106], [substr(v#106, 1, 2) AS substr(v, 1, 2)#100, sum(cast(i#98 as bigint)) AS sum(i)#101L, substr(v#103, 1, 2) AS aggOrder#102 [info] +- SubqueryAlias spark_catalog.default.t [info] +- Project [if ((length(v#97) <= 3)) v#97 else if ((length(rtrim(v#97, None)) > 3)) cast(raise_error(concat(input string of length , cast(length(v#97) as string), exceeds varchar type length limitation: 3)) as string) else rpad(rtrim(v#97, None), 3, ) AS v#106, i#98] [info] +- Relation[v#97,i#98] parquet ``` We need to look recursively into children to find char/varchars. In this PR, we try to resolve the full attributes including the original `Aggregate` expressions and the candidates in `SortOrder` together, then use the new re-resolved `Aggregate` expressions to determine which candidate in the `SortOrder` shall be pushed. This can avoid mismatch for the same attributes w/o this change, as the expressions returned by `executeSameContext` will change when `PaddingAndLengthCheckForCharVarchar` takes effects. W/ this change, the expressions can be matched correctly. For those unmatched, w need to look recursively into children to find char/varchars instead of the expression itself only. ### Why are the changes needed? bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add new tests Closes #31129 from yaooqinn/SPARK-34003-F. Authored-by: Kent Yao <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 02a17e9 commit 99f8489

File tree

6 files changed

+59
-37
lines changed

6 files changed

+59
-37
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2381,13 +2381,16 @@ class Analyzer(override val catalogManager: CatalogManager)
23812381
val unresolvedSortOrders = sortOrder.filter { s =>
23822382
!s.resolved || !s.references.subsetOf(aggregate.outputSet) || containsAggregate(s)
23832383
}
2384-
val aliasedOrdering =
2385-
unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
2386-
val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
2384+
val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
2385+
2386+
val aggregateWithExtraOrdering = aggregate.copy(
2387+
aggregateExpressions = aggregate.aggregateExpressions ++ aliasedOrdering)
2388+
23872389
val resolvedAggregate: Aggregate =
2388-
executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
2389-
val resolvedAliasedOrdering: Seq[Alias] =
2390-
resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
2390+
executeSameContext(aggregateWithExtraOrdering).asInstanceOf[Aggregate]
2391+
2392+
val (reResolvedAggExprs, resolvedAliasedOrdering) =
2393+
resolvedAggregate.aggregateExpressions.splitAt(aggregate.aggregateExpressions.length)
23912394

23922395
// If we pass the analysis check, then the ordering expressions should only reference to
23932396
// aggregate expressions or grouping expressions, and it's safe to push them down to
@@ -2401,24 +2404,25 @@ class Analyzer(override val catalogManager: CatalogManager)
24012404
// expression instead.
24022405
val needsPushDown = ArrayBuffer.empty[NamedExpression]
24032406
val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
2404-
val evaluatedOrderings = resolvedAliasedOrdering.zip(orderToAlias).map {
2405-
case (evaluated, (order, aliasOrder)) =>
2406-
val index = originalAggExprs.indexWhere {
2407-
case Alias(child, _) => child semanticEquals evaluated.child
2408-
case other => other semanticEquals evaluated.child
2409-
}
2407+
val evaluatedOrderings =
2408+
resolvedAliasedOrdering.asInstanceOf[Seq[Alias]].zip(orderToAlias).map {
2409+
case (evaluated, (order, aliasOrder)) =>
2410+
val index = reResolvedAggExprs.indexWhere {
2411+
case Alias(child, _) => child semanticEquals evaluated.child
2412+
case other => other semanticEquals evaluated.child
2413+
}
24102414

2411-
if (index == -1) {
2412-
if (CharVarcharUtils.getRawType(evaluated.metadata).nonEmpty) {
2413-
needsPushDown += aliasOrder
2414-
order.copy(child = aliasOrder)
2415+
if (index == -1) {
2416+
if (hasCharVarchar(evaluated)) {
2417+
needsPushDown += aliasOrder
2418+
order.copy(child = aliasOrder)
2419+
} else {
2420+
needsPushDown += evaluated
2421+
order.copy(child = evaluated.toAttribute)
2422+
}
24152423
} else {
2416-
needsPushDown += evaluated
2417-
order.copy(child = evaluated.toAttribute)
2424+
order.copy(child = originalAggExprs(index).toAttribute)
24182425
}
2419-
} else {
2420-
order.copy(child = originalAggExprs(index).toAttribute)
2421-
}
24222426
}
24232427

24242428
val sortOrdersMap = unresolvedSortOrders
@@ -2443,6 +2447,13 @@ class Analyzer(override val catalogManager: CatalogManager)
24432447
}
24442448
}
24452449

2450+
def hasCharVarchar(expr: Alias): Boolean = {
2451+
expr.find {
2452+
case ne: NamedExpression => CharVarcharUtils.getRawType(ne.metadata).nonEmpty
2453+
case _ => false
2454+
}.nonEmpty
2455+
}
2456+
24462457
def containsAggregate(condition: Expression): Boolean = {
24472458
condition.find(_.isInstanceOf[AggregateExpression]).isDefined
24482459
}

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ Input [8]: [ws_sold_date_sk#1, ws_item_sk#2, ws_web_page_sk#3, ws_order_number#4
101101

102102
(10) Exchange
103103
Input [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7]
104-
Arguments: hashpartitioning(cast(ws_item_sk#2 as bigint), cast(ws_order_number#4 as bigint), 5), true, [id=#10]
104+
Arguments: hashpartitioning(cast(ws_item_sk#2 as bigint), cast(ws_order_number#4 as bigint), 5), ENSURE_REQUIREMENTS, [id=#10]
105105

106106
(11) Sort [codegen id : 3]
107107
Input [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7]
@@ -123,7 +123,7 @@ Condition : (((((isnotnull(wr_item_sk#11) AND isnotnull(wr_order_number#16)) AND
123123

124124
(15) Exchange
125125
Input [8]: [wr_item_sk#11, wr_refunded_cdemo_sk#12, wr_refunded_addr_sk#13, wr_returning_cdemo_sk#14, wr_reason_sk#15, wr_order_number#16, wr_fee#17, wr_refunded_cash#18]
126-
Arguments: hashpartitioning(wr_item_sk#11, wr_order_number#16, 5), true, [id=#19]
126+
Arguments: hashpartitioning(wr_item_sk#11, wr_order_number#16, 5), ENSURE_REQUIREMENTS, [id=#19]
127127

128128
(16) Sort [codegen id : 5]
129129
Input [8]: [wr_item_sk#11, wr_refunded_cdemo_sk#12, wr_refunded_addr_sk#13, wr_returning_cdemo_sk#14, wr_reason_sk#15, wr_order_number#16, wr_fee#17, wr_refunded_cash#18]
@@ -229,7 +229,7 @@ Input [11]: [ws_quantity#5, ws_sales_price#6, ws_net_profit#7, wr_refunded_cdemo
229229

230230
(39) Exchange
231231
Input [7]: [ws_quantity#5, ws_sales_price#6, wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, wr_fee#17, wr_refunded_cash#18, r_reason_desc#24]
232-
Arguments: hashpartitioning(wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, 5), true, [id=#30]
232+
Arguments: hashpartitioning(wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, 5), ENSURE_REQUIREMENTS, [id=#30]
233233

234234
(40) Sort [codegen id : 10]
235235
Input [7]: [ws_quantity#5, ws_sales_price#6, wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, wr_fee#17, wr_refunded_cash#18, r_reason_desc#24]
@@ -278,7 +278,7 @@ Input [6]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo
278278

279279
(50) Exchange
280280
Input [4]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo_sk#35]
281-
Arguments: hashpartitioning(cast(cd_demo_sk#31 as bigint), cast(cd_demo_sk#35 as bigint), 5), true, [id=#38]
281+
Arguments: hashpartitioning(cast(cd_demo_sk#31 as bigint), cast(cd_demo_sk#35 as bigint), 5), ENSURE_REQUIREMENTS, [id=#38]
282282

283283
(51) Sort [codegen id : 13]
284284
Input [4]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo_sk#35]
@@ -302,16 +302,16 @@ Results [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, coun
302302

303303
(55) Exchange
304304
Input [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, count#50]
305-
Arguments: hashpartitioning(r_reason_desc#24, 5), true, [id=#51]
305+
Arguments: hashpartitioning(r_reason_desc#24, 5), ENSURE_REQUIREMENTS, [id=#51]
306306

307307
(56) HashAggregate [codegen id : 15]
308308
Input [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, count#50]
309309
Keys [1]: [r_reason_desc#24]
310310
Functions [3]: [avg(cast(ws_quantity#5 as bigint)), avg(UnscaledValue(wr_refunded_cash#18)), avg(UnscaledValue(wr_fee#17))]
311311
Aggregate Attributes [3]: [avg(cast(ws_quantity#5 as bigint))#52, avg(UnscaledValue(wr_refunded_cash#18))#53, avg(UnscaledValue(wr_fee#17))#54]
312-
Results [5]: [substr(r_reason_desc#24, 1, 20) AS substr(r_reason_desc, 1, 20)#55, avg(cast(ws_quantity#5 as bigint))#52 AS avg(ws_quantity)#56, cast((avg(UnscaledValue(wr_refunded_cash#18))#53 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#57, cast((avg(UnscaledValue(wr_fee#17))#54 / 100.0) as decimal(11,6)) AS avg(wr_fee)#58, avg(cast(ws_quantity#5 as bigint))#52 AS aggOrder#59]
312+
Results [4]: [substr(r_reason_desc#24, 1, 20) AS substr(r_reason_desc, 1, 20)#55, avg(cast(ws_quantity#5 as bigint))#52 AS avg(ws_quantity)#56, cast((avg(UnscaledValue(wr_refunded_cash#18))#53 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#57, cast((avg(UnscaledValue(wr_fee#17))#54 / 100.0) as decimal(11,6)) AS avg(wr_fee)#58]
313313

314314
(57) TakeOrderedAndProject
315-
Input [5]: [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58, aggOrder#59]
316-
Arguments: 100, [substr(r_reason_desc, 1, 20)#55 ASC NULLS FIRST, aggOrder#59 ASC NULLS FIRST, avg(wr_refunded_cash)#57 ASC NULLS FIRST, avg(wr_fee)#58 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58]
315+
Input [4]: [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58]
316+
Arguments: 100, [substr(r_reason_desc, 1, 20)#55 ASC NULLS FIRST, avg(ws_quantity)#56 ASC NULLS FIRST, avg(wr_refunded_cash)#57 ASC NULLS FIRST, avg(wr_fee)#58 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58]
317317

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),aggOrder,avg(wr_refunded_cash),avg(wr_fee),avg(ws_quantity)]
1+
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee)]
22
WholeStageCodegen (15)
3-
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),aggOrder,sum,count,sum,count,sum,count]
3+
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),sum,count,sum,count,sum,count]
44
InputAdapter
55
Exchange [r_reason_desc] #1
66
WholeStageCodegen (14)

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/explain.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,16 +272,16 @@ Results [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, coun
272272

273273
(49) Exchange
274274
Input [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, count#48]
275-
Arguments: hashpartitioning(r_reason_desc#35, 5), true, [id=#49]
275+
Arguments: hashpartitioning(r_reason_desc#35, 5), ENSURE_REQUIREMENTS, [id=#49]
276276

277277
(50) HashAggregate [codegen id : 9]
278278
Input [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, count#48]
279279
Keys [1]: [r_reason_desc#35]
280280
Functions [3]: [avg(cast(ws_quantity#5 as bigint)), avg(UnscaledValue(wr_refunded_cash#15)), avg(UnscaledValue(wr_fee#14))]
281281
Aggregate Attributes [3]: [avg(cast(ws_quantity#5 as bigint))#50, avg(UnscaledValue(wr_refunded_cash#15))#51, avg(UnscaledValue(wr_fee#14))#52]
282-
Results [5]: [substr(r_reason_desc#35, 1, 20) AS substr(r_reason_desc, 1, 20)#53, avg(cast(ws_quantity#5 as bigint))#50 AS avg(ws_quantity)#54, cast((avg(UnscaledValue(wr_refunded_cash#15))#51 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#55, cast((avg(UnscaledValue(wr_fee#14))#52 / 100.0) as decimal(11,6)) AS avg(wr_fee)#56, avg(cast(ws_quantity#5 as bigint))#50 AS aggOrder#57]
282+
Results [4]: [substr(r_reason_desc#35, 1, 20) AS substr(r_reason_desc, 1, 20)#53, avg(cast(ws_quantity#5 as bigint))#50 AS avg(ws_quantity)#54, cast((avg(UnscaledValue(wr_refunded_cash#15))#51 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#55, cast((avg(UnscaledValue(wr_fee#14))#52 / 100.0) as decimal(11,6)) AS avg(wr_fee)#56]
283283

284284
(51) TakeOrderedAndProject
285-
Input [5]: [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56, aggOrder#57]
286-
Arguments: 100, [substr(r_reason_desc, 1, 20)#53 ASC NULLS FIRST, aggOrder#57 ASC NULLS FIRST, avg(wr_refunded_cash)#55 ASC NULLS FIRST, avg(wr_fee)#56 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56]
285+
Input [4]: [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56]
286+
Arguments: 100, [substr(r_reason_desc, 1, 20)#53 ASC NULLS FIRST, avg(ws_quantity)#54 ASC NULLS FIRST, avg(wr_refunded_cash)#55 ASC NULLS FIRST, avg(wr_fee)#56 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56]
287287

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85/simplified.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),aggOrder,avg(wr_refunded_cash),avg(wr_fee),avg(ws_quantity)]
1+
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee)]
22
WholeStageCodegen (9)
3-
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),aggOrder,sum,count,sum,count,sum,count]
3+
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),sum,count,sum,count,sum,count]
44
InputAdapter
55
Exchange [r_reason_desc] #1
66
WholeStageCodegen (8)

sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,17 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
474474
checkAnswer(sql("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v"), Row("c", 1))
475475
}
476476
}
477+
478+
test("SPARK-34003: fix char/varchar fails w/ order by functions") {
479+
withTable("t") {
480+
sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
481+
sql("INSERT INTO t VALUES ('c', 1)")
482+
checkAnswer(sql("SELECT substr(v, 1, 2), sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"),
483+
Row("c", 1))
484+
checkAnswer(sql("SELECT sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"),
485+
Row(1))
486+
}
487+
}
477488
}
478489

479490
// Some basic char/varchar tests which doesn't rely on table implementation.

0 commit comments

Comments
 (0)