Skip to content

Commit 23acc3f

Browse files
committed
review comments #2
1 parent e714a08 commit 23acc3f

File tree

2 files changed

+19
-14
lines changed

2 files changed

+19
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,18 +718,21 @@ object SQLConf {
718718

719719
val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
720720
buildConf("spark.sql.windowExec.buffer.spill.threshold")
721+
.internal()
721722
.doc("Threshold for number of rows buffered in window operator")
722723
.intConf
723724
.createWithDefault(4096)
724725

725726
val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
726727
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
728+
.internal()
727729
.doc("Threshold for number of rows buffered in sort merge join operator")
728730
.intConf
729731
.createWithDefault(Int.MaxValue)
730732

731733
val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
732734
buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
735+
.internal()
733736
.doc("Threshold for number of rows buffered in cartesian product operator")
734737
.intConf
735738
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ private[window] abstract class WindowFunctionFrame {
4444
def write(index: Int, current: InternalRow): Unit
4545
}
4646

47+
object WindowFunctionFrame {
48+
def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
49+
if (iterator.hasNext) iterator.next() else null
50+
}
51+
}
52+
4753
/**
4854
* The offset window frame calculates frames containing LEAD/LAG statements.
4955
*
@@ -123,7 +129,7 @@ private[window] final class OffsetWindowFunctionFrame(
123129

124130
override def write(index: Int, current: InternalRow): Unit = {
125131
if (inputIndex >= 0 && inputIndex < input.length) {
126-
val r = if (inputIterator.hasNext) inputIterator.next() else null
132+
val r = WindowFunctionFrame.getNextOrNull(inputIterator)
127133
projection(r)
128134
} else {
129135
// Use default values since the offset row does not exist.
@@ -179,9 +185,7 @@ private[window] final class SlidingWindowFunctionFrame(
179185
override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
180186
input = rows
181187
inputIterator = input.generateIterator()
182-
if (inputIterator.hasNext) {
183-
nextRow = inputIterator.next()
184-
}
188+
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
185189
inputHighIndex = 0
186190
inputLowIndex = 0
187191
buffer.clear()
@@ -195,7 +199,7 @@ private[window] final class SlidingWindowFunctionFrame(
195199
// the output row upper bound.
196200
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
197201
buffer.add(nextRow.copy())
198-
nextRow = if (inputIterator.hasNext) inputIterator.next() else null
202+
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
199203
inputHighIndex += 1
200204
bufferUpdated = true
201205
}
@@ -311,7 +315,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
311315
// the output row upper bound.
312316
while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) {
313317
processor.update(nextRow)
314-
nextRow = if (inputIterator.hasNext) inputIterator.next() else null
318+
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
315319
inputIndex += 1
316320
bufferUpdated = true
317321
}
@@ -368,23 +372,21 @@ private[window] final class UnboundedFollowingWindowFunctionFrame(
368372
// the output row lower bound.
369373
val iterator = input.generateIterator(startIndex = inputIndex)
370374

371-
def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
372-
if (iterator.hasNext) iterator.next() else null
373-
}
374-
375-
var nextRow = getNextOrNull(iterator)
375+
var nextRow = WindowFunctionFrame.getNextOrNull(iterator)
376376
while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) {
377377
inputIndex += 1
378378
bufferUpdated = true
379-
nextRow = getNextOrNull(iterator)
379+
nextRow = WindowFunctionFrame.getNextOrNull(iterator)
380380
}
381381

382382
// Only recalculate and update when the buffer changes.
383383
if (bufferUpdated) {
384384
processor.initialize(input.length)
385-
while (nextRow != null) {
385+
if (nextRow != null) {
386386
processor.update(nextRow)
387-
nextRow = getNextOrNull(iterator)
387+
}
388+
while (iterator.hasNext) {
389+
processor.update(iterator.next())
388390
}
389391
processor.evaluate(target)
390392
}

0 commit comments

Comments
 (0)