Skip to content

Commit 3640ae2

Browse files
authored
Merge pull request #13523: Simplify LateDataDropping runner.
2 parents 9aa9c86 + 4dd7789 commit 3640ae2

1 file changed

Lines changed: 29 additions & 45 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 29 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
*/
1818
package org.apache.beam.runners.core;
1919

20-
import java.util.stream.Collectors;
21-
import java.util.stream.StreamSupport;
20+
import java.util.ArrayList;
21+
import java.util.List;
2222
import org.apache.beam.sdk.metrics.Counter;
2323
import org.apache.beam.sdk.metrics.Metrics;
2424
import org.apache.beam.sdk.state.TimeDomain;
@@ -29,7 +29,6 @@
2929
import org.apache.beam.sdk.values.KV;
3030
import org.apache.beam.sdk.values.WindowingStrategy;
3131
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
32-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
3332
import org.joda.time.Instant;
3433

3534
/**
@@ -124,52 +123,37 @@ public LateDataFilter(
124123
*/
125124
public <K, InputT> Iterable<WindowedValue<InputT>> filter(
126125
final K key, Iterable<WindowedValue<InputT>> elements) {
127-
Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements =
128-
StreamSupport.stream(elements.spliterator(), false)
129-
.map(
130-
input ->
131-
input.getWindows().stream()
132-
.map(
133-
window ->
134-
WindowedValue.of(
135-
input.getValue(),
136-
input.getTimestamp(),
137-
window,
138-
input.getPane()))
139-
.collect(Collectors.toList()))
140-
.collect(Collectors.toList());
141-
Iterable<WindowedValue<InputT>> concatElements = Iterables.concat(windowsExpandedElements);
142-
143-
// Bump the counter separately since we don't want multiple iterations to
144-
// increase it multiple times.
145-
for (WindowedValue<InputT> input : concatElements) {
146-
BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
147-
if (canDropDueToExpiredWindow(window)) {
148-
// The element is too late for this window.
149-
droppedDueToLateness.inc();
150-
WindowTracing.debug(
151-
"{}: Dropping element at {} for key:{}; window:{} "
152-
+ "since too far behind inputWatermark:{}; outputWatermark:{}",
153-
LateDataFilter.class.getSimpleName(),
154-
input.getTimestamp(),
155-
key,
156-
window,
157-
timerInternals.currentInputWatermarkTime(),
158-
timerInternals.currentOutputWatermarkTime());
126+
final List<WindowedValue<InputT>> nonLateElements = new ArrayList<>();
127+
for (WindowedValue<InputT> element : elements) {
128+
for (BoundedWindow window : element.getWindows()) {
129+
if (canDropDueToExpiredWindow(window)) {
130+
// The element is too late for this window.
131+
droppedDueToLateness.inc();
132+
WindowTracing.debug(
133+
"{}: Dropping element at {} for key:{}; window:{} "
134+
+ "since too far behind inputWatermark:{}; outputWatermark:{}",
135+
LateDataFilter.class.getSimpleName(),
136+
element.getTimestamp(),
137+
key,
138+
window,
139+
timerInternals.currentInputWatermarkTime(),
140+
timerInternals.currentOutputWatermarkTime());
141+
} else {
142+
nonLateElements.add(
143+
WindowedValue.of(
144+
element.getValue(), element.getTimestamp(), window, element.getPane()));
145+
}
159146
}
160147
}
161-
162-
// return nonLateElements
163-
return StreamSupport.stream(concatElements.spliterator(), false)
164-
.filter(
165-
input -> {
166-
BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
167-
return !canDropDueToExpiredWindow(window);
168-
})
169-
.collect(Collectors.toList());
148+
return nonLateElements;
170149
}
171150

172-
/** Is {@code window} expired w.r.t. the garbage collection watermark? */
151+
/**
152+
* Is {@code window} expired w.r.t. the garbage collection watermark?
153+
*
154+
* @param window Window to check for expiration.
155+
* @return True if element can be dropped.
156+
*/
173157
private boolean canDropDueToExpiredWindow(BoundedWindow window) {
174158
Instant inputWM = timerInternals.currentInputWatermarkTime();
175159
return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWM);

0 commit comments

Comments
 (0)