Skip to content

Commit 3cf8cf3

Browse files
committed
[BEAM-13614] Add OnWindowExpiration support to the Java SDK harness and proto translation.
This implementation adds a timer family spec in the event time domain and adds the field to the ParDoPayload mentioning which timer family spec represents the on window expiration callback.
1 parent fea9c4b commit 3cf8cf3

9 files changed

Lines changed: 385 additions & 51 deletions

File tree

model/pipeline/src/main/proto/beam_runner_api.proto

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,12 @@ message ParDoPayload {
524524
// be placed in the pipeline requirements.
525525
bool requires_stable_input = 11;
526526

527+
// If populated, the name of the timer family spec which should be notified
528+
// on each window expiry.
529+
// If this is set, the corresponding standard requirement should also
530+
// be placed in the pipeline requirements.
531+
string on_window_expiration_timer_family_spec = 12;
532+
527533
reserved 6;
528534
}
529535

@@ -1601,7 +1607,7 @@ message StandardRunnerProtocols {
16011607
// to be added in a forwards-compatible way).
16021608
message StandardRequirements {
16031609
enum Enum {
1604-
// This requirement indicates the state_spec and time_spec fields of ParDo
1610+
// This requirement indicates the state_specs and timer_family_specs fields of ParDo
16051611
// transform payloads must be inspected.
16061612
REQUIRES_STATEFUL_PROCESSING = 0 [(beam_urn) = "beam:requirement:pardo:stateful:v1"];
16071613

@@ -1620,6 +1626,10 @@ message StandardRequirements {
16201626
// This requirement indicates the restriction_coder_id field of ParDo
16211627
// transform payloads must be inspected.
16221628
REQUIRES_SPLITTABLE_DOFN = 4 [(beam_urn) = "beam:requirement:pardo:splittable_dofn:v1"];
1629+
1630+
// This requirement indicates that the on_window_expiration_timer_family_spec field
1631+
// of ParDo transform payloads must be inspected.
1632+
REQUIRES_ON_WINDOW_EXPIRATION = 5 [(beam_urn) = "beam:requirement:pardo:on_window_expiration:v1"];
16231633
}
16241634
}
16251635

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
3131
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
3232

33+
import com.google.auto.value.AutoValue;
3334
import java.io.IOException;
3435
import java.util.ArrayList;
3536
import java.util.Collections;
@@ -38,6 +39,7 @@
3839
import java.util.Map;
3940
import java.util.Set;
4041
import java.util.stream.Collectors;
42+
import javax.annotation.Nullable;
4143
import org.apache.beam.model.pipeline.v1.RunnerApi;
4244
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
4345
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
@@ -116,6 +118,9 @@ public class ParDoTranslation {
116118
*/
117119
public static final String REQUIRES_SPLITTABLE_DOFN_URN =
118120
"beam:requirement:pardo:splittable_dofn:v1";
121+
/** This requirement indicates that the ParDo requires a callback on each window expiration. */
122+
public static final String REQUIRES_ON_WINDOW_EXPIRATION_URN =
123+
"beam:requirement:pardo:on_window_expiration:v1";
119124

120125
static {
121126
checkState(
@@ -132,6 +137,9 @@ public class ParDoTranslation {
132137
checkState(
133138
REQUIRES_SPLITTABLE_DOFN_URN.equals(
134139
getUrn(StandardRequirements.Enum.REQUIRES_SPLITTABLE_DOFN)));
140+
checkState(
141+
REQUIRES_ON_WINDOW_EXPIRATION_URN.equals(
142+
getUrn(StandardRequirements.Enum.REQUIRES_ON_WINDOW_EXPIRATION)));
135143
}
136144

137145
/** The URN for an unknown Java {@link DoFn}. */
@@ -281,8 +289,7 @@ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents compon
281289
}
282290

283291
@Override
284-
public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
285-
SdkComponents newComponents) {
292+
public ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(SdkComponents newComponents) {
286293
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs = new HashMap<>();
287294

288295
for (Map.Entry<String, TimerDeclaration> timer :
@@ -306,14 +313,34 @@ public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
306313
windowCoder);
307314
timerFamilySpecs.put(timerFamily.getKey(), spec);
308315
}
309-
return timerFamilySpecs;
316+
317+
String onWindowExpirationTimerFamilySpec = null;
318+
if (signature.onWindowExpiration() != null) {
319+
RunnerApi.TimerFamilySpec spec =
320+
RunnerApi.TimerFamilySpec.newBuilder()
321+
.setTimeDomain(translateTimeDomain(TimeDomain.EVENT_TIME))
322+
.setTimerFamilyCoderId(
323+
registerCoderOrThrow(components, Timer.Coder.of(keyCoder, windowCoder)))
324+
.build();
325+
for (int i = 0; i < Integer.MAX_VALUE; ++i) {
326+
onWindowExpirationTimerFamilySpec = "onWindowExpiration" + i;
327+
if (!timerFamilySpecs.containsKey(onWindowExpirationTimerFamilySpec)) {
328+
break;
329+
}
330+
}
331+
timerFamilySpecs.put(onWindowExpirationTimerFamilySpec, spec);
332+
}
333+
334+
return ParDoLikeTimerFamilySpecs.create(
335+
timerFamilySpecs, onWindowExpirationTimerFamilySpec);
310336
}
311337

312338
@Override
313339
public boolean isStateful() {
314340
return !signature.stateDeclarations().isEmpty()
315341
|| !signature.timerDeclarations().isEmpty()
316-
|| !signature.timerFamilyDeclarations().isEmpty();
342+
|| !signature.timerFamilyDeclarations().isEmpty()
343+
|| signature.onWindowExpiration() != null;
317344
}
318345

319346
@Override
@@ -645,7 +672,7 @@ static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RehydratedComponent
645672
}
646673
}
647674

648-
private static String registerCoderOrThrow(SdkComponents components, Coder coder) {
675+
public static String registerCoderOrThrow(SdkComponents components, Coder coder) {
649676
try {
650677
return components.registerCoder(coder);
651678
} catch (IOException exc) {
@@ -665,7 +692,7 @@ public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
665692
.build();
666693
}
667694

668-
private static RunnerApi.TimeDomain.Enum translateTimeDomain(TimeDomain timeDomain) {
695+
public static RunnerApi.TimeDomain.Enum translateTimeDomain(TimeDomain timeDomain) {
669696
switch (timeDomain) {
670697
case EVENT_TIME:
671698
return RunnerApi.TimeDomain.Enum.EVENT_TIME;
@@ -769,6 +796,22 @@ public static FunctionSpec translateWindowMappingFn(
769796
.build();
770797
}
771798

799+
@AutoValue
800+
public abstract static class ParDoLikeTimerFamilySpecs {
801+
802+
public static ParDoLikeTimerFamilySpecs create(
803+
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs,
804+
@Nullable String onWindowExpirationTimerFamilySpec) {
805+
return new AutoValue_ParDoTranslation_ParDoLikeTimerFamilySpecs(
806+
timerFamilySpecs, onWindowExpirationTimerFamilySpec);
807+
}
808+
809+
abstract Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs();
810+
811+
@Nullable
812+
abstract String onWindowExpirationTimerFamilySpec();
813+
}
814+
772815
/** These methods drive to-proto translation from Java and from rehydrated ParDos. */
773816
public interface ParDoLike {
774817
FunctionSpec translateDoFn(SdkComponents newComponents);
@@ -778,7 +821,7 @@ public interface ParDoLike {
778821
Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
779822
throws IOException;
780823

781-
Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(SdkComponents newComponents);
824+
ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(SdkComponents newComponents);
782825

783826
boolean isStateful();
784827

@@ -812,15 +855,24 @@ public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents co
812855
components.addRequirement(REQUIRES_TIME_SORTED_INPUT_URN);
813856
}
814857

815-
return ParDoPayload.newBuilder()
816-
.setDoFn(parDo.translateDoFn(components))
817-
.putAllStateSpecs(parDo.translateStateSpecs(components))
818-
.putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
819-
.putAllSideInputs(parDo.translateSideInputs(components))
820-
.setRequiresStableInput(parDo.isRequiresStableInput())
821-
.setRequiresTimeSortedInput(parDo.isRequiresTimeSortedInput())
822-
.setRestrictionCoderId(parDo.translateRestrictionCoderId(components))
823-
.setRequestsFinalization(parDo.requestsFinalization())
824-
.build();
858+
ParDoLikeTimerFamilySpecs timerFamilySpecs = parDo.translateTimerFamilySpecs(components);
859+
ParDoPayload.Builder builder =
860+
ParDoPayload.newBuilder()
861+
.setDoFn(parDo.translateDoFn(components))
862+
.putAllStateSpecs(parDo.translateStateSpecs(components))
863+
.putAllTimerFamilySpecs(timerFamilySpecs.timerFamilySpecs())
864+
.putAllSideInputs(parDo.translateSideInputs(components))
865+
.setRequiresStableInput(parDo.isRequiresStableInput())
866+
.setRequiresTimeSortedInput(parDo.isRequiresTimeSortedInput())
867+
.setRestrictionCoderId(parDo.translateRestrictionCoderId(components))
868+
.setRequestsFinalization(parDo.requestsFinalization());
869+
870+
if (timerFamilySpecs.onWindowExpirationTimerFamilySpec() != null) {
871+
components.addRequirement(REQUIRES_ON_WINDOW_EXPIRATION_URN);
872+
builder.setOnWindowExpirationTimerFamilySpec(
873+
timerFamilySpecs.onWindowExpirationTimerFamilySpec());
874+
}
875+
876+
return builder.build();
825877
}
826878
}

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
3030
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
3131
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
32-
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
3332
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
3433
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
34+
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLikeTimerFamilySpecs;
3535
import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
3636
import org.apache.beam.runners.core.construction.ReadTranslation.UnboundedReadPayloadTranslator;
3737
import org.apache.beam.sdk.Pipeline;
@@ -435,17 +435,16 @@ public Map<String, StateSpec> translateStateSpecs(SdkComponents components) {
435435
}
436436

437437
@Override
438-
public Map<String, TimerFamilySpec> translateTimerFamilySpecs(
438+
public ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(
439439
SdkComponents newComponents) {
440440
// SDFs don't have timers.
441-
return ImmutableMap.of();
441+
return ParDoLikeTimerFamilySpecs.create(ImmutableMap.of(), null);
442442
}
443443

444444
@Override
445445
public boolean isStateful() {
446-
return !signature.stateDeclarations().isEmpty()
447-
|| !signature.timerDeclarations().isEmpty()
448-
|| !signature.timerFamilyDeclarations().isEmpty();
446+
// SDFs don't have state or timers.
447+
return false;
449448
}
450449

451450
@Override

runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ public void testToProto() throws Exception {
150150
assertEquals(
151151
parDo.getFn() instanceof StateTimerDropElementsFn,
152152
components.requirements().contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
153+
assertEquals(
154+
parDo.getFn() instanceof StateTimerDropElementsFn,
155+
components.requirements().contains(ParDoTranslation.REQUIRES_ON_WINDOW_EXPIRATION_URN));
156+
assertEquals(
157+
parDo.getFn() instanceof StateTimerDropElementsFn ? "onWindowExpiration0" : "",
158+
payload.getOnWindowExpirationTimerFamilySpec());
153159
}
154160

155161
@Test
@@ -339,6 +345,9 @@ public void onEventTime(OnTimerContext context) {}
339345
@OnTimer(PROCESSING_TIMER_ID)
340346
public void onProcessingTime(OnTimerContext context) {}
341347

348+
@OnWindowExpiration
349+
public void onWindowExpiration() {}
350+
342351
@Override
343352
public boolean equals(@Nullable Object other) {
344353
return other instanceof StateTimerDropElementsFn;

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package org.apache.beam.runners.dataflow;
1919

2020
import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
21+
import static org.apache.beam.runners.core.construction.ParDoTranslation.registerCoderOrThrow;
22+
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimeDomain;
2123
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerFamilySpec;
2224
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
23-
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow;
2425
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerSpecOrThrow;
2526
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
2627

@@ -37,14 +38,17 @@
3738
import org.apache.beam.runners.core.construction.PTransformReplacements;
3839
import org.apache.beam.runners.core.construction.PTransformTranslation;
3940
import org.apache.beam.runners.core.construction.ParDoTranslation;
41+
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLikeTimerFamilySpecs;
4042
import org.apache.beam.runners.core.construction.SdkComponents;
4143
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
44+
import org.apache.beam.runners.core.construction.Timer;
4245
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
4346
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
4447
import org.apache.beam.sdk.coders.Coder;
4548
import org.apache.beam.sdk.coders.KvCoder;
4649
import org.apache.beam.sdk.runners.AppliedPTransform;
4750
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
51+
import org.apache.beam.sdk.state.TimeDomain;
4852
import org.apache.beam.sdk.transforms.DoFn;
4953
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
5054
import org.apache.beam.sdk.transforms.PTransform;
@@ -54,6 +58,7 @@
5458
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
5559
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
5660
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
61+
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
5762
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
5863
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
5964
import org.apache.beam.sdk.values.PCollection;
@@ -245,37 +250,59 @@ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents compon
245250
}
246251

247252
@Override
248-
public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
253+
public ParDoLikeTimerFamilySpecs translateTimerFamilySpecs(
249254
SdkComponents newComponents) {
250255
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs = new HashMap<>();
251-
for (Map.Entry<String, DoFnSignature.TimerFamilyDeclaration> timerFamily :
252-
signature.timerFamilyDeclarations().entrySet()) {
256+
257+
for (Map.Entry<String, TimerDeclaration> timer :
258+
signature.timerDeclarations().entrySet()) {
253259
RunnerApi.TimerFamilySpec spec =
254260
translateTimerFamilySpec(
255-
getTimerFamilySpecOrThrow(timerFamily.getValue(), doFn),
261+
getTimerSpecOrThrow(timer.getValue(), doFn),
256262
newComponents,
257263
keyCoder,
258264
windowCoder);
259-
timerFamilySpecs.put(timerFamily.getKey(), spec);
265+
timerFamilySpecs.put(timer.getKey(), spec);
260266
}
261-
for (Map.Entry<String, DoFnSignature.TimerDeclaration> timer :
262-
signature.timerDeclarations().entrySet()) {
267+
268+
for (Map.Entry<String, DoFnSignature.TimerFamilyDeclaration> timerFamily :
269+
signature.timerFamilyDeclarations().entrySet()) {
263270
RunnerApi.TimerFamilySpec spec =
264271
translateTimerFamilySpec(
265-
getTimerSpecOrThrow(timer.getValue(), doFn),
272+
DoFnSignatures.getTimerFamilySpecOrThrow(timerFamily.getValue(), doFn),
266273
newComponents,
267274
keyCoder,
268275
windowCoder);
269-
timerFamilySpecs.put(timer.getKey(), spec);
276+
timerFamilySpecs.put(timerFamily.getKey(), spec);
270277
}
271-
return timerFamilySpecs;
278+
279+
String onWindowExpirationTimerFamilySpec = null;
280+
if (signature.onWindowExpiration() != null) {
281+
RunnerApi.TimerFamilySpec spec =
282+
RunnerApi.TimerFamilySpec.newBuilder()
283+
.setTimeDomain(translateTimeDomain(TimeDomain.EVENT_TIME))
284+
.setTimerFamilyCoderId(
285+
registerCoderOrThrow(components, Timer.Coder.of(keyCoder, windowCoder)))
286+
.build();
287+
for (int i = 0; i < Integer.MAX_VALUE; ++i) {
288+
onWindowExpirationTimerFamilySpec = "onWindowExpiration" + i;
289+
if (!timerFamilySpecs.containsKey(onWindowExpirationTimerFamilySpec)) {
290+
break;
291+
}
292+
}
293+
timerFamilySpecs.put(onWindowExpirationTimerFamilySpec, spec);
294+
}
295+
296+
return ParDoLikeTimerFamilySpecs.create(
297+
timerFamilySpecs, onWindowExpirationTimerFamilySpec);
272298
}
273299

274300
@Override
275301
public boolean isStateful() {
276302
return !signature.stateDeclarations().isEmpty()
277303
|| !signature.timerDeclarations().isEmpty()
278-
|| !signature.timerFamilyDeclarations().isEmpty();
304+
|| !signature.timerFamilyDeclarations().isEmpty()
305+
|| signature.onWindowExpiration() != null;
279306
}
280307

281308
@Override

0 commit comments

Comments
 (0)