Skip to content

Commit 381e672

Browse files
committed
Fix
1 parent 4664a08 commit 381e672

File tree

1 file changed

+49
-53
lines changed

1 file changed

+49
-53
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 49 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -234,30 +234,30 @@ case class HashAggregateExec(
234234
val doAgg = ctx.freshName("doAggregateWithoutKey")
235235
val doAggFuncName = ctx.addNewFunction(doAgg,
236236
s"""
237-
| private void $doAgg() throws java.io.IOException {
238-
| // initialize aggregation buffer
239-
| $initBufVar
237+
|private void $doAgg() throws java.io.IOException {
238+
| // initialize aggregation buffer
239+
| $initBufVar
240240
|
241-
| ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
242-
| }
241+
| ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
242+
|}
243243
""".stripMargin)
244244

245245
val numOutput = metricTerm(ctx, "numOutputRows")
246246
val aggTime = metricTerm(ctx, "aggTime")
247247
val beforeAgg = ctx.freshName("beforeAgg")
248248
s"""
249-
| while (!$initAgg) {
250-
| $initAgg = true;
251-
| long $beforeAgg = System.nanoTime();
252-
| $doAggFuncName();
253-
| $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
249+
|while (!$initAgg) {
250+
| $initAgg = true;
251+
| long $beforeAgg = System.nanoTime();
252+
| $doAggFuncName();
253+
| $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
254254
|
255-
| // output the result
256-
| ${genResult.trim}
255+
| // output the result
256+
| ${genResult.trim}
257257
|
258-
| $numOutput.add(1);
259-
| ${consume(ctx, resultVars).trim}
260-
| }
258+
| $numOutput.add(1);
259+
| ${consume(ctx, resultVars).trim}
260+
|}
261261
""".stripMargin
262262
}
263263

@@ -581,12 +581,12 @@ case class HashAggregateExec(
581581
val evaluateNondeterministicResults =
582582
evaluateNondeterministicVariables(output, resultVars, resultExpressions)
583583
s"""
584-
$evaluateKeyVars
585-
$evaluateBufferVars
586-
$evaluateAggResults
587-
$evaluateNondeterministicResults
588-
${consume(ctx, resultVars)}
589-
"""
584+
|$evaluateKeyVars
585+
|$evaluateBufferVars
586+
|$evaluateAggResults
587+
|$evaluateNondeterministicResults
588+
|${consume(ctx, resultVars)}
589+
""".stripMargin
590590
} else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
591591
// resultExpressions are Attributes of groupingExpressions and aggregateBufferAttributes.
592592
assert(resultExpressions.forall(_.isInstanceOf[Attribute]))
@@ -613,10 +613,10 @@ case class HashAggregateExec(
613613
resultExpressions,
614614
inputAttrs).map(_.genCode(ctx))
615615
s"""
616-
$evaluateKeyVars
617-
$evaluateResultBufferVars
618-
${consume(ctx, resultVars)}
619-
"""
616+
|$evaluateKeyVars
617+
|$evaluateResultBufferVars
618+
|${consume(ctx, resultVars)}
619+
""".stripMargin
620620
} else {
621621
// generate result based on grouping key
622622
ctx.INPUT_ROW = keyTerm
@@ -627,18 +627,18 @@ case class HashAggregateExec(
627627
val evaluateNondeterministicResults =
628628
evaluateNondeterministicVariables(output, resultVars, resultExpressions)
629629
s"""
630-
$evaluateNondeterministicResults
631-
${consume(ctx, resultVars)}
632-
"""
630+
|$evaluateNondeterministicResults
631+
|${consume(ctx, resultVars)}
632+
""".stripMargin
633633
}
634634
ctx.addNewFunction(funcName,
635635
s"""
636-
private void $funcName(UnsafeRow $keyTerm, UnsafeRow $bufferTerm)
637-
throws java.io.IOException {
638-
$numOutput.add(1);
639-
$body
640-
}
641-
""")
636+
|private void $funcName(UnsafeRow $keyTerm, UnsafeRow $bufferTerm)
637+
| throws java.io.IOException {
638+
| $numOutput.add(1);
639+
| $body
640+
|}
641+
""".stripMargin)
642642
}
643643

644644
/**
@@ -829,17 +829,16 @@ case class HashAggregateExec(
829829
val aggTime = metricTerm(ctx, "aggTime")
830830
val beforeAgg = ctx.freshName("beforeAgg")
831831
s"""
832-
if (!$initAgg) {
833-
$initAgg = true;
834-
long $beforeAgg = System.nanoTime();
835-
$doAggFuncName();
836-
$aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
837-
}
838-
839-
// output the result
840-
$outputFromFastHashMap
841-
$outputFromRegularHashMap
842-
"""
832+
|if (!$initAgg) {
833+
| $initAgg = true;
834+
| long $beforeAgg = System.nanoTime();
835+
| $doAggFuncName();
836+
| $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
837+
|}
838+
|// output the result
839+
|$outputFromFastHashMap
840+
|$outputFromRegularHashMap
841+
""".stripMargin
843842
}
844843

845844
private def doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String = {
@@ -1098,14 +1097,11 @@ case class HashAggregateExec(
10981097
// continue to do in-memory aggregation and spilling until all the rows had been processed.
10991098
// Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
11001099
s"""
1101-
$declareRowBuffer
1102-
1103-
$findOrInsertHashMap
1104-
1105-
$incCounter
1106-
1107-
$updateRowInHashMap
1108-
"""
1100+
|$declareRowBuffer
1101+
|$findOrInsertHashMap
1102+
|$incCounter
1103+
|$updateRowInHashMap
1104+
""".stripMargin
11091105
}
11101106

11111107
override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields)

0 commit comments

Comments
 (0)