@@ -44,6 +44,12 @@ private[window] abstract class WindowFunctionFrame {
4444 def write (index : Int , current : InternalRow ): Unit
4545}
4646
47+ object WindowFunctionFrame {
48+ def getNextOrNull (iterator : Iterator [UnsafeRow ]): UnsafeRow = {
49+ if (iterator.hasNext) iterator.next() else null
50+ }
51+ }
52+
4753/**
4854 * The offset window frame calculates frames containing LEAD/LAG statements.
4955 *
@@ -123,7 +129,7 @@ private[window] final class OffsetWindowFunctionFrame(
123129
124130 override def write (index : Int , current : InternalRow ): Unit = {
125131 if (inputIndex >= 0 && inputIndex < input.length) {
126- val r = if (inputIterator.hasNext) inputIterator.next() else null
132+ val r = WindowFunctionFrame .getNextOrNull (inputIterator)
127133 projection(r)
128134 } else {
129135 // Use default values since the offset row does not exist.
@@ -179,9 +185,7 @@ private[window] final class SlidingWindowFunctionFrame(
179185 override def prepare (rows : ExternalAppendOnlyUnsafeRowArray ): Unit = {
180186 input = rows
181187 inputIterator = input.generateIterator()
182- if (inputIterator.hasNext) {
183- nextRow = inputIterator.next()
184- }
188+ nextRow = WindowFunctionFrame .getNextOrNull(inputIterator)
185189 inputHighIndex = 0
186190 inputLowIndex = 0
187191 buffer.clear()
@@ -195,7 +199,7 @@ private[window] final class SlidingWindowFunctionFrame(
195199 // the output row upper bound.
196200 while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0 ) {
197201 buffer.add(nextRow.copy())
198- nextRow = if (inputIterator.hasNext) inputIterator.next() else null
202+ nextRow = WindowFunctionFrame .getNextOrNull (inputIterator)
199203 inputHighIndex += 1
200204 bufferUpdated = true
201205 }
@@ -311,7 +315,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
311315 // the output row upper bound.
312316 while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0 ) {
313317 processor.update(nextRow)
314- nextRow = if (inputIterator.hasNext) inputIterator.next() else null
318+ nextRow = WindowFunctionFrame .getNextOrNull (inputIterator)
315319 inputIndex += 1
316320 bufferUpdated = true
317321 }
@@ -368,23 +372,21 @@ private[window] final class UnboundedFollowingWindowFunctionFrame(
368372 // the output row lower bound.
369373 val iterator = input.generateIterator(startIndex = inputIndex)
370374
371- def getNextOrNull (iterator : Iterator [UnsafeRow ]): UnsafeRow = {
372- if (iterator.hasNext) iterator.next() else null
373- }
374-
375- var nextRow = getNextOrNull(iterator)
375+ var nextRow = WindowFunctionFrame .getNextOrNull(iterator)
376376 while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0 ) {
377377 inputIndex += 1
378378 bufferUpdated = true
379- nextRow = getNextOrNull(iterator)
379+ nextRow = WindowFunctionFrame . getNextOrNull(iterator)
380380 }
381381
382382 // Only recalculate and update when the buffer changes.
383383 if (bufferUpdated) {
384384 processor.initialize(input.length)
385- while (nextRow != null ) {
385+ if (nextRow != null ) {
386386 processor.update(nextRow)
387- nextRow = getNextOrNull(iterator)
387+ }
388+ while (iterator.hasNext) {
389+ processor.update(iterator.next())
388390 }
389391 processor.evaluate(target)
390392 }
0 commit comments