Commit d5a0fbe

add fix for sorting also: normalize sort order expressions through the alias map as well, using the shared normalizeExpression helper

1 parent 9c3f15f

2 files changed: 32 additions & 11 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala

Lines changed: 7 additions & 11 deletions
@@ -31,8 +31,10 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
 
   protected def hasAlias: Boolean = aliasMap.nonEmpty
 
-  protected def replaceAlias(attr: AttributeReference): Option[Attribute] = {
-    aliasMap.get(attr)
+  protected def normalizeExpression(exp: Expression): Expression = {
+    exp.transform {
+      case attr: AttributeReference => aliasMap.get(attr).getOrElse(attr)
+    }
   }
 }
 
@@ -45,10 +47,7 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
     if (hasAlias) {
       child.outputPartitioning match {
         case e: Expression =>
-          val normalizedExpr = e.transform {
-            case attr: AttributeReference => replaceAlias(attr).getOrElse(attr)
-          }
-          normalizedExpr.asInstanceOf[Partitioning]
+          normalizeExpression(e).asInstanceOf[Partitioning]
         case other => other
       }
     } else {
@@ -66,11 +65,8 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
 
   final override def outputOrdering: Seq[SortOrder] = {
     if (hasAlias) {
-      orderingExpressions.map { s =>
-        s.child match {
-          case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a))
-          case _ => s
-        }
+      orderingExpressions.map { sortOrder =>
+        normalizeExpression(sortOrder).asInstanceOf[SortOrder]
       }
     } else {
       orderingExpressions
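
Note: the change replaces the attribute-only replaceAlias helper with a general normalizeExpression that rewrites aliased attributes anywhere inside an expression tree, so output partitioning and sort order can share one code path. Below is a minimal, self-contained sketch of that pattern; Expr, Attr, Lit, Mod and the transform method are simplified stand-ins for Catalyst's classes, not Spark's actual API.

// Minimal sketch only: simplified stand-ins for Catalyst's expression tree.
sealed trait Expr {
  // Bottom-up rewrite, in the spirit of Catalyst's Expression.transform.
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val rewrittenChildren = this match {
      case Mod(l, r) => Mod(l.transform(rule), r.transform(rule))
      case leaf      => leaf
    }
    rule.applyOrElse(rewrittenChildren, (e: Expr) => e)
  }
}
case class Attr(name: String) extends Expr
case class Lit(value: Long) extends Expr
case class Mod(left: Expr, right: Expr) extends Expr

object NormalizeDemo {
  // Plays the role of aliasMap: child attribute -> aliased output attribute.
  val aliasMap: Map[Attr, Attr] = Map(Attr("id") -> Attr("t1id"))

  // Old shape of the helper: only a bare attribute reference could be replaced.
  def replaceAlias(attr: Attr): Option[Attr] = aliasMap.get(attr)

  // New shape: aliased attributes are replaced wherever they occur in the tree,
  // so expressions such as (id % 10) used in partitioning/ordering are covered too.
  def normalizeExpression(exp: Expr): Expr = exp.transform {
    case attr: Attr => aliasMap.getOrElse(attr, attr)
  }

  def main(args: Array[String]): Unit = {
    println(replaceAlias(Attr("id")))                       // Some(Attr(t1id))
    println(normalizeExpression(Mod(Attr("id"), Lit(10))))  // Mod(Attr(t1id),Lit(10))
  }
}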

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -953,6 +953,31 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("SPARK-33399: aliased should be handled properly " +
+    "for partitioning and sortorder involving complex expressions") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("t1", "t2", "t3") {
+        spark.range(10).createTempView("t1")
+        spark.range(20).createTempView("t2")
+        spark.range(30).createTempView("t3")
+        val planned = sql(
+          """
+            |SELECT t3.id as t3id
+            |FROM (
+            |  SELECT t1.id as t1id
+            |  FROM t1, t2
+            |  WHERE t1.id % 10 = t2.id % 10
+            |) t12, t3
+            |WHERE t1id % 10 = t3.id % 10
+          """.stripMargin).queryExecution.executedPlan
+        val sortNodes = planned.collect { case s: SortExec => s }
+        assert(sortNodes.size == 3)
+        val exchangeNodes = planned.collect { case e: ShuffleExchangeExec => e }
+        assert(exchangeNodes.size == 3)
+      }
+    }
+  }
+
   test("SPARK-33399: alias handling should happen properly for SinglePartition") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df = spark.range(1, 100, 1, 1)
