Skip to content

Commit 7611831

Browse files
authored
[BEAM-12724][BEAM-12349] Support user timers in Samza portable runner (#15349)
1 parent 2a6601b commit 7611831

10 files changed

Lines changed: 255 additions & 117 deletions

File tree

runners/samza/job-server/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@ createPortableValidatesRunnerTask(
7070
environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
7171
testCategories: {
7272
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
73-
// TODO: BEAM-12349
74-
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
7573
// TODO: BEAM-12350
7674
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
7775
// TODO: BEAM-12681

runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ public void open(
230230
sideInputMapping,
231231
sideInputHandler,
232232
nonKeyedStateInternalsFactory,
233+
timerInternalsFactory,
233234
samzaPipelineOptions,
234235
outputManagerFactory.create(emitter, outputFutureCollector),
235236
stageBundleFactory,
@@ -415,19 +416,14 @@ private void fireTimer(KeyedTimerData<?> keyedTimerData) {
415416
// NOTE: not sure why this is safe, but DoFnOperator makes this assumption
416417
final BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
417418

418-
if (fnRunner instanceof DoFnRunnerWithKeyedInternals) {
419-
// Need to pass in the keyed TimerData here
420-
((DoFnRunnerWithKeyedInternals) fnRunner).onTimer(keyedTimerData, window);
421-
} else {
422-
pushbackFnRunner.onTimer(
423-
timer.getTimerId(),
424-
timer.getTimerFamilyId(),
425-
null,
426-
window,
427-
timer.getTimestamp(),
428-
timer.getOutputTimestamp(),
429-
timer.getDomain());
430-
}
419+
fnRunner.onTimer(
420+
timer.getTimerId(),
421+
timer.getTimerFamilyId(),
422+
keyedTimerData.getKey(),
423+
window,
424+
timer.getTimestamp(),
425+
timer.getOutputTimestamp(),
426+
timer.getDomain());
431427
}
432428

433429
// todo: should this go through bundle manager to start and finish the bundle?

runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717
*/
1818
package org.apache.beam.runners.samza.runtime;
1919

20-
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
21-
2220
import org.apache.beam.runners.core.DoFnRunner;
2321
import org.apache.beam.runners.core.KeyedWorkItem;
24-
import org.apache.beam.runners.core.TimerInternals;
2522
import org.apache.beam.sdk.state.TimeDomain;
2623
import org.apache.beam.sdk.transforms.DoFn;
2724
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -61,24 +58,6 @@ public void processElement(WindowedValue<InputT> elem) {
6158
}
6259
}
6360

64-
public void onTimer(KeyedTimerData keyedTimerData, BoundedWindow window) {
65-
setKeyedInternals(keyedTimerData);
66-
67-
try {
68-
final TimerInternals.TimerData timer = keyedTimerData.getTimerData();
69-
onTimer(
70-
timer.getTimerId(),
71-
timer.getTimerFamilyId(),
72-
keyedTimerData.getKey(),
73-
window,
74-
timer.getTimestamp(),
75-
timer.getOutputTimestamp(),
76-
timer.getDomain());
77-
} finally {
78-
clearKeyedInternals();
79-
}
80-
}
81-
8261
@Override
8362
public <KeyT> void onTimer(
8463
String timerId,
@@ -88,9 +67,16 @@ public <KeyT> void onTimer(
8867
Instant timestamp,
8968
Instant outputTimestamp,
9069
TimeDomain timeDomain) {
91-
checkState(keyedInternals.getKey() != null, "Key is not set for timer");
70+
// Note: wrap with KV.of(key, null) as a special use case of setKeyedInternals() to set key
71+
// directly.
72+
setKeyedInternals(KV.of(key, null));
9273

93-
underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
74+
try {
75+
underlying.onTimer(
76+
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
77+
} finally {
78+
clearKeyedInternals();
79+
}
9480
}
9581

9682
@Override
@@ -108,7 +94,6 @@ public DoFn<InputT, OutputT> getFn() {
10894
return underlying.getFn();
10995
}
11096

111-
@SuppressWarnings("unchecked")
11297
private void setKeyedInternals(Object value) {
11398
if (value instanceof KeyedWorkItem) {
11499
keyedInternals.setKey(((KeyedWorkItem<?, ?>) value).key());
@@ -117,8 +102,12 @@ private void setKeyedInternals(Object value) {
117102
if (key != null) {
118103
keyedInternals.setKey(key);
119104
}
120-
} else {
105+
} else if (value instanceof KV) {
121106
keyedInternals.setKey(((KV<?, ?>) value).getKey());
107+
} else {
108+
throw new UnsupportedOperationException(
109+
String.format(
110+
"%s is not supported in %s", value.getClass(), DoFnRunnerWithKeyedInternals.class));
122111
}
123112
}
124113

runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
*/
1818
package org.apache.beam.runners.samza.runtime;
1919

20+
import java.util.Collections;
2021
import java.util.List;
22+
import java.util.Locale;
2123
import java.util.Map;
2224
import java.util.concurrent.LinkedBlockingQueue;
2325
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -30,16 +32,19 @@
3032
import org.apache.beam.runners.core.StatefulDoFnRunner;
3133
import org.apache.beam.runners.core.StepContext;
3234
import org.apache.beam.runners.core.TimerInternals;
35+
import org.apache.beam.runners.core.construction.Timer;
3336
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
3437
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
3538
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
3639
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
3740
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
41+
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
3842
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
3943
import org.apache.beam.runners.samza.SamzaExecutionContext;
4044
import org.apache.beam.runners.samza.SamzaPipelineOptions;
4145
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
4246
import org.apache.beam.runners.samza.util.StateUtils;
47+
import org.apache.beam.runners.samza.util.WindowUtils;
4348
import org.apache.beam.sdk.coders.Coder;
4449
import org.apache.beam.sdk.fn.data.FnDataReceiver;
4550
import org.apache.beam.sdk.state.BagState;
@@ -49,6 +54,7 @@
4954
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
5055
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
5156
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
57+
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
5258
import org.apache.beam.sdk.util.WindowedValue;
5359
import org.apache.beam.sdk.values.KV;
5460
import org.apache.beam.sdk.values.PCollectionView;
@@ -184,6 +190,7 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
184190
Map<?, PCollectionView<?>> sideInputMapping,
185191
SideInputHandler sideInputHandler,
186192
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
193+
SamzaTimerInternalsFactory<?> timerInternalsFactory,
187194
SamzaPipelineOptions pipelineOptions,
188195
DoFnRunners.OutputManager outputManager,
189196
StageBundleFactory stageBundleFactory,
@@ -212,6 +219,9 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
212219
(SamzaExecutionContext) context.getApplicationContainerContext();
213220
final DoFnRunner<InT, FnOutT> underlyingRunner =
214221
new SdkHarnessDoFnRunner<>(
222+
timerInternalsFactory,
223+
WindowUtils.getWindowStrategy(
224+
executableStage.getInputPCollection().getId(), executableStage.getComponents()),
215225
outputManager,
216226
stageBundleFactory,
217227
mainOutputTag,
@@ -225,6 +235,8 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
225235
}
226236

227237
private static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT> {
238+
private final SamzaTimerInternalsFactory timerInternalsFactory;
239+
private final WindowingStrategy windowingStrategy;
228240
private final DoFnRunners.OutputManager outputManager;
229241
private final StageBundleFactory stageBundleFactory;
230242
private final TupleTag<FnOutT> mainOutputTag;
@@ -236,12 +248,16 @@ private static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT
236248
private StateRequestHandler stateRequestHandler;
237249

238250
private SdkHarnessDoFnRunner(
251+
SamzaTimerInternalsFactory<?> timerInternalsFactory,
252+
WindowingStrategy windowingStrategy,
239253
DoFnRunners.OutputManager outputManager,
240254
StageBundleFactory stageBundleFactory,
241255
TupleTag<FnOutT> mainOutputTag,
242256
Map<String, TupleTag<?>> idToTupleTagMap,
243257
BagState<WindowedValue<InT>> bundledEventsBag,
244258
StateRequestHandler stateRequestHandler) {
259+
this.timerInternalsFactory = timerInternalsFactory;
260+
this.windowingStrategy = windowingStrategy;
245261
this.outputManager = outputManager;
246262
this.stageBundleFactory = stageBundleFactory;
247263
this.mainOutputTag = mainOutputTag;
@@ -250,6 +266,17 @@ private SdkHarnessDoFnRunner(
250266
this.stateRequestHandler = stateRequestHandler;
251267
}
252268

269+
@SuppressWarnings("unchecked")
270+
private void timerDataConsumer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
271+
TimerInternals timerInternals =
272+
timerInternalsFactory.timerInternalsForKey(timerElement.getUserKey());
273+
if (timerElement.getClearBit()) {
274+
timerInternals.deleteTimer(timerData);
275+
} else {
276+
timerInternals.setTimer(timerData);
277+
}
278+
}
279+
253280
@Override
254281
public void startBundle() {
255282
try {
@@ -264,11 +291,17 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
264291
}
265292
};
266293

294+
final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
295+
final TimerReceiverFactory timerReceiverFactory =
296+
new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
297+
267298
remoteBundle =
268299
stageBundleFactory.getBundle(
269-
receiverFactory, stateRequestHandler, BundleProgressHandler.ignored());
300+
receiverFactory,
301+
timerReceiverFactory,
302+
stateRequestHandler,
303+
BundleProgressHandler.ignored());
270304

271-
// TODO: side input support needs to implement to handle this properly
272305
inputReceiver = Iterables.getOnlyElement(remoteBundle.getInputReceivers().values());
273306
bundledEventsBag
274307
.read()
@@ -312,7 +345,27 @@ public <KeyT> void onTimer(
312345
BoundedWindow window,
313346
Instant timestamp,
314347
Instant outputTimestamp,
315-
TimeDomain timeDomain) {}
348+
TimeDomain timeDomain) {
349+
final KV<String, String> timerReceiverKey =
350+
TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
351+
final FnDataReceiver<Timer> timerReceiver =
352+
remoteBundle.getTimerReceivers().get(timerReceiverKey);
353+
final Timer timerValue =
354+
Timer.of(
355+
key,
356+
timerId,
357+
Collections.singletonList(window),
358+
timestamp,
359+
outputTimestamp,
360+
// TODO: Support propagating the PaneInfo through.
361+
PaneInfo.NO_FIRING);
362+
try {
363+
timerReceiver.accept(timerValue);
364+
} catch (Exception e) {
365+
throw new RuntimeException(
366+
String.format(Locale.ENGLISH, "Failed to process timer %s", timerReceiver), e);
367+
}
368+
}
316369

317370
@Override
318371
public void finishBundle() {

runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.beam.runners.samza.transforms.GroupWithoutRepartition;
3636
import org.apache.beam.runners.samza.util.SamzaCoders;
3737
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
38+
import org.apache.beam.runners.samza.util.WindowUtils;
3839
import org.apache.beam.sdk.Pipeline;
3940
import org.apache.beam.sdk.coders.Coder;
4041
import org.apache.beam.sdk.coders.KvCoder;
@@ -116,9 +117,9 @@ public void translatePortable(
116117
final RunnerApi.PCollection input = pipeline.getComponents().getPcollectionsOrThrow(inputId);
117118
final MessageStream<OpMessage<KV<K, InputT>>> inputStream = ctx.getMessageStreamById(inputId);
118119
final WindowingStrategy<?, BoundedWindow> windowingStrategy =
119-
ctx.getPortableWindowStrategy(inputId, pipeline.getComponents());
120+
WindowUtils.getWindowStrategy(inputId, pipeline.getComponents());
120121
final WindowedValue.WindowedValueCoder<KV<K, InputT>> windowedInputCoder =
121-
ctx.instantiateCoder(inputId, pipeline.getComponents());
122+
WindowUtils.instantiateWindowedCoder(inputId, pipeline.getComponents());
122123
final TupleTag<KV<K, OutputT>> outputTag =
123124
new TupleTag<>(Iterables.getOnlyElement(transform.getTransform().getOutputsMap().keySet()));
124125

runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.runners.samza.runtime.SamzaDoFnInvokerRegistrar;
4646
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
4747
import org.apache.beam.runners.samza.util.StateUtils;
48+
import org.apache.beam.runners.samza.util.WindowUtils;
4849
import org.apache.beam.sdk.coders.Coder;
4950
import org.apache.beam.sdk.coders.IterableCoder;
5051
import org.apache.beam.sdk.coders.KvCoder;
@@ -250,7 +251,7 @@ private static <InT, OutT> void doTranslatePortable(
250251
.getTransformsOrThrow(sideInputId.getTransformId())
251252
.getInputsOrThrow(sideInputId.getLocalName());
252253
final WindowingStrategy<?, BoundedWindow> windowingStrategy =
253-
ctx.getPortableWindowStrategy(sideInputCollectionId, components);
254+
WindowUtils.getWindowStrategy(sideInputCollectionId, components);
254255
final WindowedValue.WindowedValueCoder<?> coder =
255256
(WindowedValue.WindowedValueCoder) instantiateCoder(sideInputCollectionId, components);
256257

@@ -295,7 +296,7 @@ private static <InT, OutT> void doTranslatePortable(
295296
});
296297

297298
WindowedValue.WindowedValueCoder<InT> windowedInputCoder =
298-
ctx.instantiateCoder(inputId, pipeline.getComponents());
299+
WindowUtils.instantiateWindowedCoder(inputId, pipeline.getComponents());
299300

300301
// TODO: support schema and side inputs for portable runner
301302
// Note: transform.getTransform() is an ExecutableStage, not ParDo, so we need to extract
@@ -321,7 +322,7 @@ private static <InT, OutT> void doTranslatePortable(
321322
Collections.emptyMap(), // output coders not in use
322323
new ArrayList<>(sideInputMapping.values()),
323324
new ArrayList<>(idToTupleTagMap.values()), // used by java runner only
324-
ctx.getPortableWindowStrategy(inputId, stagePayload.getComponents()),
325+
WindowUtils.getWindowStrategy(inputId, stagePayload.getComponents()),
325326
idToViewMapping,
326327
new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
327328
ctx.getTransformFullName(),

0 commit comments

Comments
 (0)