Skip to content

Commit 6d08848

Browse files
address comments
1 parent bf6e45a commit 6d08848

File tree

2 files changed

+47
-27
lines changed

2 files changed

+47
-27
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,9 +1041,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
10411041
* Note that changes in the final output ordering may affect the file size (SPARK-32318).
10421042
* This rule handles the following cases:
10431043
* 1) if the sort order is empty or every sort expression is foldable (has no reference)
1044-
* 2) if the Sort operator is a local sort and the child is already sorted, or
1045-
* the Sort operator is a global sort with the child being another global Sort operator or
1046-
* a Range operator that satisfies the parent sort orders.
1044+
* 2) if the Sort operator is a local sort and the child's output ordering already satisfies it
10471045
* 3) if there is another Sort operator separated by 0...n Project, Filter, Repartition or
10481046
* RepartitionByExpression (with deterministic expressions) operators
10491047
* 4) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or
@@ -1054,18 +1052,12 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
10541052
* function is order irrelevant
10551053
*/
10561054
object EliminateSorts extends Rule[LogicalPlan] {
1057-
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1055+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
10581056
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
10591057
val newOrders = orders.filterNot(_.child.foldable)
10601058
if (newOrders.isEmpty) child else s.copy(order = newOrders)
1061-
case s @ Sort(orders, global, child)
1062-
if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
1063-
(global, child) match {
1064-
case (false, _) => child
1065-
case (true, r: Range) => r
1066-
case (true, s @ Sort(_, true, _)) => s
1067-
case (true, _) => s.copy(child = recursiveRemoveSort(child))
1068-
}
1059+
case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
1060+
child
10691061
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
10701062
case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
10711063
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -97,20 +97,27 @@ class EliminateSortsSuite extends PlanTest {
9797
comparePlans(optimized, correctAnswer)
9898
}
9999

100-
test("remove redundant local sort") {
100+
test("SPARK-33183: remove redundant sortBy") {
101101
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
102102
val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst)
103103
val optimized = Optimize.execute(unnecessaryReordered.analyze)
104104
val correctAnswer = orderedPlan.limit(2).select('a).analyze
105-
comparePlans(Optimize.execute(optimized), correctAnswer)
105+
comparePlans(optimized, correctAnswer)
106+
}
107+
108+
test("SPARK-33183: remove all redundant local sorts") {
109+
val orderedPlan = testRelation.sortBy('a.asc).orderBy('a.asc).sortBy('a.asc)
110+
val optimized = Optimize.execute(orderedPlan.analyze)
111+
val correctAnswer = testRelation.orderBy('a.asc).analyze
112+
comparePlans(optimized, correctAnswer)
106113
}
107114

108-
test("should not remove global sort") {
115+
test("SPARK-33183: should not remove global sort") {
109116
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
110117
val reordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst)
111118
val optimized = Optimize.execute(reordered.analyze)
112119
val correctAnswer = reordered.analyze
113-
comparePlans(Optimize.execute(optimized), correctAnswer)
120+
comparePlans(optimized, correctAnswer)
114121
}
115122

116123
test("do not remove sort if the order is different") {
@@ -121,15 +128,15 @@ class EliminateSortsSuite extends PlanTest {
121128
comparePlans(optimized, correctAnswer)
122129
}
123130

124-
test("filters don't affect order for local sort") {
131+
test("SPARK-33183: filters should not affect order for local sort") {
125132
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
126133
val filteredAndReordered = orderedPlan.where('a > Literal(10)).sortBy('a.asc, 'b.desc)
127134
val optimized = Optimize.execute(filteredAndReordered.analyze)
128135
val correctAnswer = orderedPlan.where('a > Literal(10)).analyze
129136
comparePlans(optimized, correctAnswer)
130137
}
131138

132-
test("should keep global sort when child is a filter operator with the same ordering") {
139+
test("SPARK-33183: should not remove global sort with filter operators") {
133140
val projectPlan = testRelation.select('a, 'b)
134141
val orderedPlan = projectPlan.orderBy('a.asc, 'b.desc)
135142
val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc)
@@ -138,15 +145,15 @@ class EliminateSortsSuite extends PlanTest {
138145
comparePlans(optimized, correctAnswer)
139146
}
140147

141-
test("limits don't affect order for local sort") {
148+
test("SPARK-33183: limits should not affect order for local sort") {
142149
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
143150
val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc)
144151
val optimized = Optimize.execute(filteredAndReordered.analyze)
145152
val correctAnswer = orderedPlan.limit(Literal(10)).analyze
146153
comparePlans(optimized, correctAnswer)
147154
}
148155

149-
test("should keep global sort when child is a limit operator with the same ordering") {
156+
test("SPARK-33183: should not remove global sort with limit operators") {
150157
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
151158
val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc)
152159
val optimized = Optimize.execute(filteredAndReordered.analyze)
@@ -162,11 +169,11 @@ class EliminateSortsSuite extends PlanTest {
162169
comparePlans(optimized, correctAnswer)
163170
}
164171

165-
test("range is already sorted") {
172+
test("SPARK-33183: should not remove global sort with range operator") {
166173
val inputPlan = Range(1L, 1000L, 1, 10)
167174
val orderedPlan = inputPlan.orderBy('id.asc)
168175
val optimized = Optimize.execute(orderedPlan.analyze)
169-
val correctAnswer = inputPlan.analyze
176+
val correctAnswer = orderedPlan.analyze
170177
comparePlans(optimized, correctAnswer)
171178

172179
val reversedPlan = inputPlan.orderBy('id.desc)
@@ -177,10 +184,18 @@ class EliminateSortsSuite extends PlanTest {
177184
val negativeStepInputPlan = Range(10L, 1L, -1, 10)
178185
val negativeStepOrderedPlan = negativeStepInputPlan.orderBy('id.desc)
179186
val negativeStepOptimized = Optimize.execute(negativeStepOrderedPlan.analyze)
180-
val negativeStepCorrectAnswer = negativeStepInputPlan.analyze
187+
val negativeStepCorrectAnswer = negativeStepOrderedPlan.analyze
181188
comparePlans(negativeStepOptimized, negativeStepCorrectAnswer)
182189
}
183190

191+
test("SPARK-33183: remove local sort with range operator") {
192+
val inputPlan = Range(1L, 1000L, 1, 10)
193+
val orderedPlan = inputPlan.sortBy('id.asc)
194+
val optimized = Optimize.execute(orderedPlan.analyze)
195+
val correctAnswer = inputPlan.analyze
196+
comparePlans(optimized, correctAnswer)
197+
}
198+
184199
test("sort should not be removed when there is a node which doesn't guarantee any order") {
185200
val orderedPlan = testRelation.select('a, 'b)
186201
val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
@@ -357,18 +372,31 @@ class EliminateSortsSuite extends PlanTest {
357372
comparePlans(optimized, correctAnswer)
358373
}
359374

360-
test("remove two consecutive global sorts with same ordering") {
375+
test("SPARK-33183: remove consecutive global sorts with the same ordering") {
361376
Seq(
362377
(testRelation.orderBy('a.asc).orderBy('a.asc), testRelation.orderBy('a.asc)),
363-
(testRelation.orderBy('a.asc, 'b.desc).orderBy('a.asc),
364-
testRelation.orderBy('a.asc, 'b.desc))
378+
(testRelation.orderBy('a.asc, 'b.desc).orderBy('a.asc), testRelation.orderBy('a.asc))
365379
).foreach { case (ordered, answer) =>
366380
val optimized = Optimize.execute(ordered.analyze)
367381
comparePlans(optimized, answer.analyze)
368382
}
369383
}
370384

371-
test("should keep global sort when child is a local sort with the same ordering") {
385+
test("SPARK-33183: remove consecutive local sorts with the same ordering") {
386+
val orderedPlan = testRelation.sortBy('a.asc).sortBy('a.asc).sortBy('a.asc)
387+
val optimized = Optimize.execute(orderedPlan.analyze)
388+
val correctAnswer = testRelation.sortBy('a.asc).analyze
389+
comparePlans(optimized, correctAnswer)
390+
}
391+
392+
test("SPARK-33183: remove consecutive local sorts with different ordering") {
393+
val orderedPlan = testRelation.sortBy('b.asc).sortBy('a.desc).sortBy('a.asc)
394+
val optimized = Optimize.execute(orderedPlan.analyze)
395+
val correctAnswer = testRelation.sortBy('a.asc).analyze
396+
comparePlans(optimized, correctAnswer)
397+
}
398+
399+
test("SPARK-33183: should keep global sort when child is a local sort with the same ordering") {
372400
val correctAnswer = testRelation.orderBy('a.asc).analyze
373401
Seq(
374402
testRelation.sortBy('a.asc).orderBy('a.asc),

0 commit comments

Comments
 (0)