Skip to content

Commit 1694c9b

Browse files
committed
Fix the cases where operators set up its produce framework.
1 parent 4bef567 commit 1694c9b

File tree

4 files changed

+8
-2
lines changed

4 files changed

+8
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ case class SortExec(
177177
""".stripMargin.trim
178178
}
179179

180-
override protected def effectiveContinueStatement: String = "continue;"
180+
override protected def isConsumeInSeparateFunc: Boolean = false
181181

182182
protected override val shouldStopRequired = false
183183

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ trait CodegenSupport extends SparkPlan {
199199
* ... // logic of Op2Exec to consume rows.
200200
* }
201201
* For now, `isConsumeInSeparateFunc` of Op2Exec will be `true`.
202+
*
203+
* Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions
204+
* but begins with its produce framework. We should override `isConsumeInSeparateFunc` to return
205+
* `false`.
202206
*/
203207
protected def isConsumeInSeparateFunc: Boolean = {
204208
val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ case class HashAggregateExec(
163163
// The variables used as aggregation buffer. Only used for aggregation without keys.
164164
private var bufVars: Seq[ExprCode] = _
165165

166-
override protected def effectiveContinueStatement: String = "continue;"
166+
override protected def isConsumeInSeparateFunc: Boolean = false
167167

168168
private def doProduceWithoutKeys(ctx: CodegenContext): String = {
169169
val initAgg = ctx.freshName("initAgg")

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,8 @@ case class SortMergeJoinExec(
547547
}
548548
}
549549

550+
override protected def isConsumeInSeparateFunc: Boolean = false
551+
550552
override def doProduce(ctx: CodegenContext): String = {
551553
ctx.copyResult = true
552554
val leftInput = ctx.freshName("leftInput")

0 commit comments

Comments
 (0)