Skip to content

Commit 1f1f093

Browse files
committed
fix strip usage, fix complex ordering projection
1 parent 733ecb5 commit 1f1f093

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
4444
val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
4545
outputExpressions.reverse.foreach {
4646
case a @ Alias(child, _) =>
47-
val buffer = aliases.getOrElseUpdate(strip(child.canonicalized), mutable.ArrayBuffer.empty)
47+
val buffer = aliases.getOrElseUpdate(strip(child).canonicalized, mutable.ArrayBuffer.empty)
4848
if (buffer.size < aliasCandidateLimit) {
4949
buffer += a.toAttribute
5050
}
@@ -96,7 +96,11 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
9696

9797
override final def outputOrdering: Seq[SortOrder] = {
9898
if (hasAlias) {
99-
orderingExpressions.flatMap { sortOrder =>
99+
// Take the first `SortOrder`s only until they can be projected.
100+
// E.g. we have child ordering `Seq(SortOrder(a), SortOrder(b))` then
101+
// if only `a AS x` can be projected then we can return Seq(SortOrder(x))`
102+
// but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
103+
orderingExpressions.iterator.map { sortOrder =>
100104
val orderingSet = mutable.Set.empty[Expression]
101105
val sameOrderings = sortOrder.children.toStream
102106
.flatMap(projectExpression)
@@ -108,7 +112,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
108112
} else {
109113
None
110114
}
111-
}
115+
}.takeWhile(_.isDefined).flatten.toSeq
112116
} else {
113117
orderingExpressions
114118
}

sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,4 +159,22 @@ class ProjectedOrderingAndPartitioningSuite
159159
assert(outputOrdering.head.sameOrderExpressions.map(_.sql) ==
160160
Seq("aa", "(b + b)", "(a + b)", "(a + a)"))
161161
}
162+
163+
test("SPARK-42049: Improve AliasAwareOutputExpression - ordering partly projected") {
164+
val df = spark.range(2).orderBy($"id" + 1, $"id" + 2)
165+
166+
val df1 = df.selectExpr("id + 1 AS a", "id + 2 AS b")
167+
val outputOrdering1 = df1.queryExecution.optimizedPlan.outputOrdering
168+
assert(outputOrdering1.size == 2)
169+
assert(outputOrdering1.map(_.sql) == Seq("a ASC NULLS FIRST", "b ASC NULLS FIRST"))
170+
171+
val df2 = df.selectExpr("id + 1 AS a")
172+
val outputOrdering2 = df2.queryExecution.optimizedPlan.outputOrdering
173+
assert(outputOrdering2.size == 1)
174+
assert(outputOrdering2.head.sql == "a ASC NULLS FIRST")
175+
176+
val df3 = df.selectExpr("id + 2 AS b")
177+
val outputOrdering3 = df3.queryExecution.optimizedPlan.outputOrdering
178+
assert(outputOrdering3.size == 0)
179+
}
162180
}

0 commit comments

Comments
 (0)