Skip to content

Commit 6812836

Browse files
authored
[Spark runner] Support for ProcessingTime Timer for Spark Runner (#35316)
* refactor : extract `AbstractInOutIterator` base class for Spark input processing * feat : implement processing time timers in Spark runner * test : add test for processing time timers * chore : touch trigger files * chore : edit CHANGES.md * chore : replace ByteUtils * refactor: extract AbstractInOutIterator to separate file * test: add unit tests for AbstractInOutIterator timer functionality * feat: add timer expiration handling and tests for TimerUtils * refactor: improve timer handling in Spark streaming runner * refactor: remove redundant @NotNull annotation in TimerUtils
1 parent db0c17f commit 6812836

File tree

13 files changed

+991
-124
lines changed

13 files changed

+991
-124
lines changed

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@
1111
"https://github.com/apache/beam/pull/34080": "noting that PR #34080 should run this test",
1212
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
1313
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test",
14-
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
14+
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
15+
"https://github.com/apache/beam/pull/35316": "noting that PR #35316 should run this test"
1516
}

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark_Java11.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@
77
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test",
88
"https://github.com/apache/beam/pull/34080": "noting that PR #34080 should run this test",
99
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
10-
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test"
10+
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test",
11+
"https://github.com/apache/beam/pull/35316": "noting that PR #35316 should run this test"
1112
}

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474

7575
## New Features / Improvements
7676

77+
* Added support for Processing time Timer in the Spark Classic runner ([#33633](https://github.com/apache/beam/issues/33633)).
7778
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7879
* Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/35397)).
7980
* [IcebergIO] Create tables with a specified table properties ([#35496](https://github.com/apache/beam/pull/35496))

runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.Collection;
2323
import java.util.Collections;
24+
import java.util.Comparator;
2425
import java.util.Iterator;
2526
import java.util.List;
2627
import java.util.Map;
@@ -43,7 +44,7 @@
4344
public class SparkTimerInternals implements TimerInternals {
4445
private final Instant highWatermark;
4546
private final Instant synchronizedProcessingTime;
46-
private final Set<TimerData> timers = Sets.newHashSet();
47+
private final Set<TimerData> timers = Sets.newConcurrentHashSet();
4748

4849
private Instant inputWatermark;
4950

@@ -122,7 +123,14 @@ public void setTimer(TimerData timer) {
122123
@Override
123124
public void deleteTimer(
124125
StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
125-
throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
126+
this.timers.stream()
127+
.filter(
128+
timer ->
129+
namespace.equals(timer.getNamespace())
130+
&& timerId.equals(timer.getTimerId())
131+
&& timerFamilyId.equals(timer.getTimerFamilyId())
132+
&& timeDomain.equals(timer.getDomain()))
133+
.forEach(this::deleteTimer);
126134
}
127135

128136
@Override
@@ -182,6 +190,43 @@ public static Iterator<TimerData> deserializeTimers(
182190
return CoderHelpers.fromByteArrays(serTimers, timerDataCoder).iterator();
183191
}
184192

193+
/**
194+
* Checks if there are any expired timers in the {@link TimeDomain#PROCESSING_TIME} domain.
195+
*
196+
* <p>A timer is considered expired when its timestamp is less than the current processing time.
197+
*
198+
* @return {@code true} if at least one expired processing timer exists, {@code false} otherwise.
199+
*/
200+
public boolean hasNextProcessingTimer() {
201+
final Instant currentProcessingTime = this.currentProcessingTime();
202+
return this.timers.stream()
203+
.anyMatch(
204+
(TimerData timerData) ->
205+
timerData.getDomain().equals(TimeDomain.PROCESSING_TIME)
206+
&& currentProcessingTime.isAfter(timerData.getTimestamp()));
207+
}
208+
209+
/**
210+
* Finds the latest timer in {@link TimeDomain#PROCESSING_TIME} domain that has expired based on
211+
* the current processing time.
212+
*
213+
* <p>A timer is considered expired when its timestamp is less than the current processing time.
214+
* If multiple expired timers exist, the one with the latest timestamp will be returned.
215+
*
216+
* @return The expired processing timer with the latest timestamp if one exists, or {@code null}
217+
* if no processing timers are ready to fire.
218+
*/
219+
public @Nullable TimerData getNextProcessingTimer() {
220+
final Instant currentProcessingTime = this.currentProcessingTime();
221+
return this.timers.stream()
222+
.filter(
223+
(TimerData timerData) ->
224+
timerData.getDomain().equals(TimeDomain.PROCESSING_TIME)
225+
&& currentProcessingTime.isAfter(timerData.getTimestamp()))
226+
.max(Comparator.comparing(TimerData::getTimestamp))
227+
.orElse(null);
228+
}
229+
185230
@Override
186231
public String toString() {
187232
return "SparkTimerInternals{"
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.spark.translation;
19+
20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
22+
import org.apache.beam.runners.core.StateNamespace;
23+
import org.apache.beam.runners.core.StateNamespaces;
24+
import org.apache.beam.runners.core.TimerInternals;
25+
import org.apache.beam.runners.spark.translation.streaming.ParDoStateUpdateFn;
26+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
27+
import org.apache.beam.sdk.values.TupleTag;
28+
import org.apache.beam.sdk.values.WindowedValue;
29+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
30+
import scala.Tuple2;
31+
32+
/**
33+
* Abstract base class for iterators that process Spark input data and produce corresponding output
34+
* values. This class serves as a common base for both bounded and unbounded processing strategies
35+
* in the Spark runner.
36+
*
37+
* <p>The class extends Guava's {@link AbstractIterator} and provides common functionality for
38+
* iterating through input elements, processing them using a DoFnRunner, and producing output
39+
* elements as tuples of {@link TupleTag} and {@link WindowedValue} pairs.
40+
*
41+
* @param <K> The key type for the processing context
42+
* @param <InputT> The input element type to be processed
43+
* @param <OutputT> The output element type after processing
44+
*/
45+
public abstract class AbstractInOutIterator<K, InputT, OutputT>
46+
extends AbstractIterator<Tuple2<TupleTag<?>, WindowedValue<?>>> {
47+
protected final SparkProcessContext<K, InputT, OutputT> ctx;
48+
49+
protected AbstractInOutIterator(SparkProcessContext<K, InputT, OutputT> ctx) {
50+
this.ctx = ctx;
51+
}
52+
53+
/**
54+
* Fires a timer using the DoFnRunner from the context and performs cleanup afterwards.
55+
*
56+
* <p>After firing the timer, if the timer data iterator is an instance of {@link
57+
* ParDoStateUpdateFn.SparkTimerInternalsIterator}, the fired timer will be deleted as part of
58+
* cleanup to prevent re-firing of the same timer.
59+
*
60+
* @param timer The timer data containing information about the timer to fire
61+
* @throws IllegalArgumentException If the timer namespace is not a {@link
62+
* StateNamespaces.WindowNamespace}
63+
*/
64+
public void fireTimer(TimerInternals.TimerData timer) {
65+
StateNamespace namespace = timer.getNamespace();
66+
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
67+
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
68+
try {
69+
this.ctx
70+
.getDoFnRunner()
71+
.onTimer(
72+
timer.getTimerId(),
73+
timer.getTimerFamilyId(),
74+
this.ctx.getKey(),
75+
window,
76+
timer.getTimestamp(),
77+
timer.getOutputTimestamp(),
78+
timer.getDomain());
79+
} finally {
80+
if (this.ctx.getTimerDataIterator()
81+
instanceof ParDoStateUpdateFn.SparkTimerInternalsIterator) {
82+
final ParDoStateUpdateFn.SparkTimerInternalsIterator timerDataIterator =
83+
(ParDoStateUpdateFn.SparkTimerInternalsIterator) this.ctx.getTimerDataIterator();
84+
timerDataIterator.deleteTimer(timer);
85+
}
86+
}
87+
}
88+
}

runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessor.java

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.beam.runners.spark.translation;
1919

20-
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21-
2220
import java.util.ArrayDeque;
2321
import java.util.Iterator;
2422
import java.util.NoSuchElementException;
@@ -28,16 +26,11 @@
2826
import java.util.concurrent.LinkedBlockingQueue;
2927
import java.util.concurrent.TimeUnit;
3028
import javax.annotation.CheckForNull;
31-
import org.apache.beam.runners.core.StateNamespace;
32-
import org.apache.beam.runners.core.StateNamespaces;
33-
import org.apache.beam.runners.core.TimerInternals;
3429
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
35-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
3630
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
3731
import org.apache.beam.sdk.values.TupleTag;
3832
import org.apache.beam.sdk.values.WindowedValue;
3933
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
40-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
4134
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
4235
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
4336
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -124,20 +117,18 @@ public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
124117
}
125118
}
126119

127-
private class UnboundedInOutIterator<K>
128-
extends AbstractIterator<Tuple2<TupleTag<?>, WindowedValue<?>>> {
120+
private class UnboundedInOutIterator<K> extends AbstractInOutIterator<K, FnInputT, FnOutputT> {
129121

130122
private final Iterator<WindowedValue<FnInputT>> inputIterator;
131-
private final SparkProcessContext<K, FnInputT, FnOutputT> ctx;
132-
private Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> outputIterator;
123+
private volatile Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> outputIterator;
133124
private boolean isBundleStarted;
134125
private boolean isBundleFinished;
135126

136127
UnboundedInOutIterator(
137128
Iterator<WindowedValue<FnInputT>> iterator,
138129
SparkProcessContext<K, FnInputT, FnOutputT> ctx) {
130+
super(ctx);
139131
this.inputIterator = iterator;
140-
this.ctx = ctx;
141132
this.outputIterator = outputManager.iterator();
142133
}
143134

@@ -185,21 +176,6 @@ private class UnboundedInOutIterator<K>
185176
throw re;
186177
}
187178
}
188-
189-
private void fireTimer(TimerInternals.TimerData timer) {
190-
StateNamespace namespace = timer.getNamespace();
191-
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
192-
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
193-
ctx.getDoFnRunner()
194-
.onTimer(
195-
timer.getTimerId(),
196-
timer.getTimerFamilyId(),
197-
ctx.getKey(),
198-
window,
199-
timer.getTimestamp(),
200-
timer.getOutputTimestamp(),
201-
timer.getDomain());
202-
}
203179
}
204180
}
205181

@@ -281,9 +257,8 @@ public <OutputT> void output(TupleTag<OutputT> tag, WindowedValue<OutputT> outpu
281257
}
282258

283259
private class BoundedInOutIterator<K, InputT, OutputT>
284-
extends AbstractIterator<Tuple2<TupleTag<?>, WindowedValue<?>>> {
260+
extends AbstractInOutIterator<K, InputT, OutputT> {
285261

286-
private final SparkProcessContext<K, InputT, OutputT> ctx;
287262
private final Iterator<WindowedValue<InputT>> inputIterator;
288263
private final Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> outputIterator;
289264
private final ExecutorService executorService;
@@ -293,8 +268,8 @@ private class BoundedInOutIterator<K, InputT, OutputT>
293268

294269
BoundedInOutIterator(
295270
Iterator<WindowedValue<InputT>> iterator, SparkProcessContext<K, InputT, OutputT> ctx) {
271+
super(ctx);
296272
this.inputIterator = iterator;
297-
this.ctx = ctx;
298273
this.outputIterator = outputManager.iterator();
299274
this.executorService =
300275
Executors.newSingleThreadScheduledExecutor(
@@ -325,21 +300,6 @@ private class BoundedInOutIterator<K, InputT, OutputT>
325300
}
326301
}
327302

328-
private void fireTimer(TimerInternals.TimerData timer) {
329-
StateNamespace namespace = timer.getNamespace();
330-
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
331-
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
332-
ctx.getDoFnRunner()
333-
.onTimer(
334-
timer.getTimerId(),
335-
timer.getTimerFamilyId(),
336-
ctx.getKey(),
337-
window,
338-
timer.getTimestamp(),
339-
timer.getOutputTimestamp(),
340-
timer.getDomain());
341-
}
342-
343303
private Future<?> startOutputProducerTask() {
344304
return executorService.submit(
345305
() -> {

runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.beam.runners.spark.util.ByteArray;
3434
import org.apache.beam.runners.spark.util.SideInputBroadcast;
3535
import org.apache.beam.sdk.coders.Coder;
36+
import org.apache.beam.sdk.state.TimeDomain;
37+
import org.apache.beam.sdk.state.TimerSpec;
3638
import org.apache.beam.sdk.transforms.DoFn;
3739
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
3840
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -274,22 +276,30 @@ public static Long getBatchDuration(final SerializablePipelineOptions options) {
274276
}
275277

276278
/**
277-
* Reject timers {@link DoFn}.
279+
* Checks if the given DoFn uses any timers.
278280
*
279-
* @param doFn the {@link DoFn} to possibly reject.
281+
* @param doFn the DoFn to check for timer usage
282+
* @return true if the DoFn uses timers, false otherwise
280283
*/
281-
public static void rejectTimers(DoFn<?, ?> doFn) {
282-
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
283-
if (signature.timerDeclarations().size() > 0
284-
|| signature.timerFamilyDeclarations().size() > 0) {
285-
throw new UnsupportedOperationException(
286-
String.format(
287-
"Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
288-
DoFn.TimerId.class.getSimpleName(),
289-
doFn.getClass().getName(),
290-
DoFn.class.getSimpleName(),
291-
SparkRunner.class.getSimpleName()));
284+
public static boolean hasTimers(DoFn<?, ?> doFn) {
285+
final DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
286+
return signature.usesTimers();
287+
}
288+
289+
/**
290+
* Checks if the given DoFn uses event time timers.
291+
*
292+
* @param doFn the DoFn to check for event time timer usage
293+
* @return true if the DoFn uses event time timers, false otherwise. Note: Returns false if the
294+
* DoFn has no timers at all.
295+
*/
296+
public static boolean hasEventTimers(DoFn<?, ?> doFn) {
297+
for (DoFnSignature.TimerDeclaration timerDeclaration :
298+
DoFnSignatures.signatureForDoFn(doFn).timerDeclarations().values()) {
299+
final TimerSpec timerSpec = DoFnSignatures.getTimerSpecOrThrow(timerDeclaration, doFn);
300+
return timerSpec.getTimeDomain().equals(TimeDomain.EVENT_TIME);
292301
}
302+
return false;
293303
}
294304

295305
/**

0 commit comments

Comments
 (0)