Skip to content

Commit 550ff99

Browse files
committed
[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
1 parent 4b4ee26 commit 550ff99

File tree

4 files changed

+40
-6
lines changed

4 files changed

+40
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,9 @@ object EliminateSorts extends Rule[LogicalPlan] {
730730
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
731731
val newOrders = orders.filterNot(_.child.foldable)
732732
if (newOrders.isEmpty) child else s.copy(order = newOrders)
733+
case Sort(orders, true, child) if child.isSorted && child.sortedOrder.get.zip(orders).forall {
734+
case (s1, s2) => s1.satisfies(s2) } =>
735+
child
733736
}
734737
}
735738

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ abstract class LogicalPlan
219219
* Refreshes (or invalidates) any metadata/data cached in the plan recursively.
220220
*/
221221
def refresh(): Unit = children.foreach(_.refresh())
222+
223+
/**
224+
* If the current plan contains sorted data, it contains the sorted order.
225+
*/
226+
def sortedOrder: Option[Seq[SortOrder]] = None
227+
228+
final def isSorted: Boolean = sortedOrder.isDefined
222229
}
223230

224231
/**
@@ -274,3 +281,7 @@ abstract class BinaryNode extends LogicalPlan {
274281

275282
override final def children: Seq[LogicalPlan] = Seq(left, right)
276283
}
284+
285+
abstract class KeepOrderUnaryNode extends UnaryNode {
286+
override final def sortedOrder: Option[Seq[SortOrder]] = child.sortedOrder
287+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
4343
* This node is inserted at the top of a subquery when it is optimized. This makes sure we can
4444
* recognize a subquery as such, and it allows us to write subquery aware transformations.
4545
*/
46-
case class Subquery(child: LogicalPlan) extends UnaryNode {
46+
case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
4747
override def output: Seq[Attribute] = child.output
4848
}
4949

50-
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
50+
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
51+
extends KeepOrderUnaryNode {
5152
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
5253
override def maxRows: Option[Long] = child.maxRows
5354

@@ -125,7 +126,7 @@ case class Generate(
125126
}
126127

127128
case class Filter(condition: Expression, child: LogicalPlan)
128-
extends UnaryNode with PredicateHelper {
129+
extends KeepOrderUnaryNode with PredicateHelper {
129130
override def output: Seq[Attribute] = child.output
130131

131132
override def maxRows: Option[Long] = child.maxRows
@@ -469,6 +470,7 @@ case class Sort(
469470
child: LogicalPlan) extends UnaryNode {
470471
override def output: Seq[Attribute] = child.output
471472
override def maxRows: Option[Long] = child.maxRows
473+
override def sortedOrder: Option[Seq[SortOrder]] = Some(order)
472474
}
473475

474476
/** Factory for constructing new `Range` nodes. */
@@ -728,7 +730,7 @@ object Limit {
728730
*
729731
* See [[Limit]] for more information.
730732
*/
731-
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
733+
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
732734
override def output: Seq[Attribute] = child.output
733735
override def maxRows: Option[Long] = {
734736
limitExpr match {
@@ -764,7 +766,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
764766
case class SubqueryAlias(
765767
alias: String,
766768
child: LogicalPlan)
767-
extends UnaryNode {
769+
extends KeepOrderUnaryNode {
768770

769771
override def doCanonicalize(): LogicalPlan = child.canonicalized
770772

@@ -867,6 +869,11 @@ case class RepartitionByExpression(
867869

868870
override def maxRows: Option[Long] = child.maxRows
869871
override def shuffle: Boolean = true
872+
873+
override def sortedOrder: Option[Seq[SortOrder]] = partitioning match {
874+
case RangePartitioning(sortedOrder, _) => Some(sortedOrder)
875+
case _ => None
876+
}
870877
}
871878

872879
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class EliminateSortsSuite extends PlanTest {
3737
val batches =
3838
Batch("Eliminate Sorts", FixedPoint(10),
3939
FoldablePropagation,
40-
EliminateSorts) :: Nil
40+
EliminateSorts,
41+
CollapseProject) :: Nil
4142
}
4243

4344
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -83,4 +84,16 @@ class EliminateSortsSuite extends PlanTest {
8384

8485
comparePlans(optimized, correctAnswer)
8586
}
87+
88+
test("remove redundant order by") {
89+
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
90+
val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
91+
val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
92+
val correctAnswer = analyzer.execute(orderedPlan.select('a))
93+
comparePlans(Optimize.execute(optimized), correctAnswer)
94+
val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
95+
val nonOptimized = Optimize.execute(analyzer.execute(reorderedDifferently))
96+
val correctAnswerNonOptimized = analyzer.execute(reorderedDifferently)
97+
comparePlans(nonOptimized, correctAnswerNonOptimized)
98+
}
8699
}

0 commit comments

Comments
 (0)