@@ -63,29 +63,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
6363 */
6464 object SpecialLimits extends Strategy {
6565 override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
66- case logical.ReturnAnswer (rootPlan) => rootPlan match {
67- case logical.Limit (IntegerLiteral (limit), logical.Sort (order, true , child)) =>
68- execution.TakeOrderedAndProjectExec (limit, order, child.output, planLater(child)) :: Nil
69- case logical.Limit (
70- IntegerLiteral (limit),
71- logical.Project (projectList, logical.Sort (order, true , child))) =>
72- execution.TakeOrderedAndProjectExec (
73- limit, order, projectList, planLater(child)) :: Nil
74- case logical.Limit (IntegerLiteral (limit), child) =>
75- // Normally wrapping child with `LocalLimitExec` here is a no-op, because
76- // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
77- // calls `child.executeTake`. If child supports whole stage codegen, adding this
78- // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
79- // resource releasing work, after we consume `limit` rows.
80- execution.CollectLimitExec (limit, LocalLimitExec (limit, planLater(child))) :: Nil
66+ case ReturnAnswer (rootPlan) => rootPlan match {
67+ case Limit (IntegerLiteral (limit), Sort (order, true , child)) =>
68+ TakeOrderedAndProjectExec (limit, order, child.output, planLater(child)) :: Nil
69+ case Limit (IntegerLiteral (limit), Project (projectList, Sort (order, true , child))) =>
70+ TakeOrderedAndProjectExec (limit, order, projectList, planLater(child)) :: Nil
71+ case Limit (IntegerLiteral (limit), child) =>
72+ // With whole stage codegen, Spark releases resources only when all the output data of the
73+ // query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
74+ // data from child plan and finishes the query without releasing resources. Here we wrap
75+ // the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
76+ // trigger the resource releasing work, after we consume `limit` rows.
77+ CollectLimitExec (limit, LocalLimitExec (limit, planLater(child))) :: Nil
8178 case other => planLater(other) :: Nil
8279 }
83- case logical.Limit (IntegerLiteral (limit), logical.Sort (order, true , child)) =>
84- execution.TakeOrderedAndProjectExec (limit, order, child.output, planLater(child)) :: Nil
85- case logical.Limit (
86- IntegerLiteral (limit), logical.Project (projectList, logical.Sort (order, true , child))) =>
87- execution.TakeOrderedAndProjectExec (
88- limit, order, projectList, planLater(child)) :: Nil
80+ case Limit (IntegerLiteral (limit), Sort (order, true , child)) =>
81+ TakeOrderedAndProjectExec (limit, order, child.output, planLater(child)) :: Nil
82+ case Limit (IntegerLiteral (limit), Project (projectList, Sort (order, true , child))) =>
83+ TakeOrderedAndProjectExec (limit, order, projectList, planLater(child)) :: Nil
8984 case _ => Nil
9085 }
9186 }
0 commit comments