|
17 | 17 | */ |
18 | 18 | package org.apache.beam.runners.core; |
19 | 19 |
|
20 | | -import java.util.stream.Collectors; |
21 | | -import java.util.stream.StreamSupport; |
| 20 | +import java.util.ArrayList; |
| 21 | +import java.util.List; |
22 | 22 | import org.apache.beam.sdk.metrics.Counter; |
23 | 23 | import org.apache.beam.sdk.metrics.Metrics; |
24 | 24 | import org.apache.beam.sdk.state.TimeDomain; |
|
29 | 29 | import org.apache.beam.sdk.values.KV; |
30 | 30 | import org.apache.beam.sdk.values.WindowingStrategy; |
31 | 31 | import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
32 | | -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
33 | 32 | import org.joda.time.Instant; |
34 | 33 |
|
35 | 34 | /** |
@@ -124,52 +123,37 @@ public LateDataFilter( |
124 | 123 | */ |
125 | 124 | public <K, InputT> Iterable<WindowedValue<InputT>> filter( |
126 | 125 | final K key, Iterable<WindowedValue<InputT>> elements) { |
127 | | - Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = |
128 | | - StreamSupport.stream(elements.spliterator(), false) |
129 | | - .map( |
130 | | - input -> |
131 | | - input.getWindows().stream() |
132 | | - .map( |
133 | | - window -> |
134 | | - WindowedValue.of( |
135 | | - input.getValue(), |
136 | | - input.getTimestamp(), |
137 | | - window, |
138 | | - input.getPane())) |
139 | | - .collect(Collectors.toList())) |
140 | | - .collect(Collectors.toList()); |
141 | | - Iterable<WindowedValue<InputT>> concatElements = Iterables.concat(windowsExpandedElements); |
142 | | - |
143 | | - // Bump the counter separately since we don't want multiple iterations to |
144 | | - // increase it multiple times. |
145 | | - for (WindowedValue<InputT> input : concatElements) { |
146 | | - BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); |
147 | | - if (canDropDueToExpiredWindow(window)) { |
148 | | - // The element is too late for this window. |
149 | | - droppedDueToLateness.inc(); |
150 | | - WindowTracing.debug( |
151 | | - "{}: Dropping element at {} for key:{}; window:{} " |
152 | | - + "since too far behind inputWatermark:{}; outputWatermark:{}", |
153 | | - LateDataFilter.class.getSimpleName(), |
154 | | - input.getTimestamp(), |
155 | | - key, |
156 | | - window, |
157 | | - timerInternals.currentInputWatermarkTime(), |
158 | | - timerInternals.currentOutputWatermarkTime()); |
| 126 | + final List<WindowedValue<InputT>> nonLateElements = new ArrayList<>(); |
| 127 | + for (WindowedValue<InputT> element : elements) { |
| 128 | + for (BoundedWindow window : element.getWindows()) { |
| 129 | + if (canDropDueToExpiredWindow(window)) { |
| 130 | + // The element is too late for this window. |
| 131 | + droppedDueToLateness.inc(); |
| 132 | + WindowTracing.debug( |
| 133 | + "{}: Dropping element at {} for key:{}; window:{} " |
| 134 | + + "since too far behind inputWatermark:{}; outputWatermark:{}", |
| 135 | + LateDataFilter.class.getSimpleName(), |
| 136 | + element.getTimestamp(), |
| 137 | + key, |
| 138 | + window, |
| 139 | + timerInternals.currentInputWatermarkTime(), |
| 140 | + timerInternals.currentOutputWatermarkTime()); |
| 141 | + } else { |
| 142 | + nonLateElements.add( |
| 143 | + WindowedValue.of( |
| 144 | + element.getValue(), element.getTimestamp(), window, element.getPane())); |
| 145 | + } |
159 | 146 | } |
160 | 147 | } |
161 | | - |
162 | | - // return nonLateElements |
163 | | - return StreamSupport.stream(concatElements.spliterator(), false) |
164 | | - .filter( |
165 | | - input -> { |
166 | | - BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); |
167 | | - return !canDropDueToExpiredWindow(window); |
168 | | - }) |
169 | | - .collect(Collectors.toList()); |
| 148 | + return nonLateElements; |
170 | 149 | } |
171 | 150 |
|
172 | | - /** Is {@code window} expired w.r.t. the garbage collection watermark? */ |
| 151 | + /** |
| 152 | + * Is {@code window} expired w.r.t. the garbage collection watermark? |
| 153 | + * |
| 154 | + * @param window Window to check for expiration. |
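| 155 | + * @return {@code true} if the window's garbage collection time is before the current input watermark, so elements assigned to it can be dropped. |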
| 156 | + */ |
173 | 157 | private boolean canDropDueToExpiredWindow(BoundedWindow window) { |
174 | 158 | Instant inputWM = timerInternals.currentInputWatermarkTime(); |
175 | 159 | return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWM); |
|
0 commit comments