Skip to content

Commit c155a39

Browse files
authored
Element metadata in windowed value for offset deduplication (#35739)
1 parent 00531c6 commit c155a39

File tree

15 files changed, with 770 additions (+770) and 45 deletions (-45).

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 44 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -383,6 +383,16 @@ public PaneInfo pane() {
383383
return element.getPaneInfo();
384384
}
385385

386+
@Override
387+
public String currentRecordId() {
388+
return element.getCurrentRecordId();
389+
}
390+
391+
@Override
392+
public Long currentRecordOffset() {
393+
return element.getCurrentRecordOffset();
394+
}
395+
386396
@Override
387397
public PipelineOptions getPipelineOptions() {
388398
return pipelineOptions;
@@ -411,6 +421,24 @@ public void outputWindowedValue(
411421
outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo));
412422
}
413423

424+
@Override
425+
public void outputWindowedValue(
426+
OutputT value,
427+
Instant timestamp,
428+
Collection<? extends BoundedWindow> windows,
429+
PaneInfo paneInfo,
430+
@Nullable String currentRecordId,
431+
@Nullable Long currentRecordOffset) {
432+
noteOutput();
433+
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
434+
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
435+
}
436+
outputReceiver.output(
437+
mainOutputTag,
438+
WindowedValues.of(
439+
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
440+
}
441+
414442
@Override
415443
public <T> void output(TupleTag<T> tag, T value) {
416444
outputWithTimestamp(tag, value, element.getTimestamp());
@@ -429,11 +457,26 @@ public <T> void outputWindowedValue(
429457
Instant timestamp,
430458
Collection<? extends BoundedWindow> windows,
431459
PaneInfo paneInfo) {
460+
outputWindowedValue(tag, value, timestamp, windows, paneInfo, null, null);
461+
}
462+
463+
@Override
464+
public <T> void outputWindowedValue(
465+
TupleTag<T> tag,
466+
T value,
467+
Instant timestamp,
468+
Collection<? extends BoundedWindow> windows,
469+
PaneInfo paneInfo,
470+
@Nullable String currentRecordId,
471+
@Nullable Long currentRecordOffset) {
432472
noteOutput();
433473
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
434474
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
435475
}
436-
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
476+
outputReceiver.output(
477+
tag,
478+
WindowedValues.of(
479+
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
437480
}
438481

439482
private void noteOutput() {

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,35 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) {
334334
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
335335
outputWindowedValue(tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING));
336336
}
337+
338+
@Override
339+
public void output(
340+
OutputT output,
341+
Instant timestamp,
342+
BoundedWindow window,
343+
@Nullable String currentRecordId,
344+
@Nullable Long currentRecordOffset) {
345+
output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset);
346+
}
347+
348+
@Override
349+
public <T> void output(
350+
TupleTag<T> tag,
351+
T output,
352+
Instant timestamp,
353+
BoundedWindow window,
354+
@Nullable String currentRecordId,
355+
@Nullable Long currentRecordOffset) {
356+
outputWindowedValue(
357+
tag,
358+
WindowedValues.of(
359+
output,
360+
timestamp,
361+
Collections.singletonList(window),
362+
PaneInfo.NO_FIRING,
363+
currentRecordId,
364+
currentRecordOffset));
365+
}
337366
}
338367

339368
private final DoFnFinishBundleArgumentProvider.Context context =
@@ -427,6 +456,24 @@ public void outputWindowedValue(
427456
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
428457
}
429458

459+
@Override
460+
public void outputWindowedValue(
461+
OutputT output,
462+
Instant timestamp,
463+
Collection<? extends BoundedWindow> windows,
464+
PaneInfo paneInfo,
465+
@Nullable String currentRecordId,
466+
@Nullable Long currentRecordOffset) {
467+
outputWindowedValue(
468+
mainOutputTag,
469+
output,
470+
timestamp,
471+
windows,
472+
paneInfo,
473+
currentRecordId,
474+
currentRecordOffset);
475+
}
476+
430477
@Override
431478
public <T> void output(TupleTag<T> tag, T output) {
432479
checkNotNull(tag, "Tag passed to output cannot be null");
@@ -451,11 +498,36 @@ public <T> void outputWindowedValue(
451498
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
452499
}
453500

501+
@Override
502+
public <T> void outputWindowedValue(
503+
TupleTag<T> tag,
504+
T output,
505+
Instant timestamp,
506+
Collection<? extends BoundedWindow> windows,
507+
PaneInfo paneInfo,
508+
@Nullable String currentRecordId,
509+
@Nullable Long currentRecordOffset) {
510+
SimpleDoFnRunner.this.outputWindowedValue(
511+
tag,
512+
WindowedValues.of(
513+
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
514+
}
515+
454516
@Override
455517
public Instant timestamp() {
456518
return elem.getTimestamp();
457519
}
458520

521+
@Override
522+
public String currentRecordId() {
523+
return elem.getCurrentRecordId();
524+
}
525+
526+
@Override
527+
public Long currentRecordOffset() {
528+
return elem.getCurrentRecordOffset();
529+
}
530+
459531
public Collection<? extends BoundedWindow> windows() {
460532
return elem.getWindows();
461533
}
@@ -867,6 +939,24 @@ public void outputWindowedValue(
867939
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
868940
}
869941

942+
@Override
943+
public void outputWindowedValue(
944+
OutputT output,
945+
Instant timestamp,
946+
Collection<? extends BoundedWindow> windows,
947+
PaneInfo paneInfo,
948+
@Nullable String currentRecordId,
949+
@Nullable Long currentRecordOffset) {
950+
outputWindowedValue(
951+
mainOutputTag,
952+
output,
953+
timestamp,
954+
windows,
955+
paneInfo,
956+
currentRecordId,
957+
currentRecordOffset);
958+
}
959+
870960
@Override
871961
public <T> void output(TupleTag<T> tag, T output) {
872962
checkTimestamp(timestamp(), timestamp);
@@ -892,6 +982,22 @@ public <T> void outputWindowedValue(
892982
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
893983
}
894984

985+
@Override
986+
public <T> void outputWindowedValue(
987+
TupleTag<T> tag,
988+
T output,
989+
Instant timestamp,
990+
Collection<? extends BoundedWindow> windows,
991+
PaneInfo paneInfo,
992+
@Nullable String currentRecordId,
993+
@Nullable Long currentRecordOffset) {
994+
checkTimestamp(timestamp(), timestamp);
995+
SimpleDoFnRunner.this.outputWindowedValue(
996+
tag,
997+
WindowedValues.of(
998+
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
999+
}
1000+
8951001
@Override
8961002
public BundleFinalizer bundleFinalizer() {
8971003
throw new UnsupportedOperationException(
@@ -1096,6 +1202,24 @@ public void outputWindowedValue(
10961202
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
10971203
}
10981204

1205+
@Override
1206+
public void outputWindowedValue(
1207+
OutputT output,
1208+
Instant timestamp,
1209+
Collection<? extends BoundedWindow> windows,
1210+
PaneInfo paneInfo,
1211+
@Nullable String currentRecordId,
1212+
@Nullable Long currentRecordOffset) {
1213+
outputWindowedValue(
1214+
mainOutputTag,
1215+
output,
1216+
timestamp,
1217+
windows,
1218+
paneInfo,
1219+
currentRecordId,
1220+
currentRecordOffset);
1221+
}
1222+
10991223
@Override
11001224
public <T> void output(TupleTag<T> tag, T output) {
11011225
checkTimestamp(this.timestamp, timestamp);
@@ -1121,6 +1245,22 @@ public <T> void outputWindowedValue(
11211245
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
11221246
}
11231247

1248+
@Override
1249+
public <T> void outputWindowedValue(
1250+
TupleTag<T> tag,
1251+
T output,
1252+
Instant timestamp,
1253+
Collection<? extends BoundedWindow> windows,
1254+
PaneInfo paneInfo,
1255+
@Nullable String currentRecordId,
1256+
@Nullable Long currentRecordOffset) {
1257+
checkTimestamp(this.timestamp, timestamp);
1258+
SimpleDoFnRunner.this.outputWindowedValue(
1259+
tag,
1260+
WindowedValues.of(
1261+
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
1262+
}
1263+
11241264
@Override
11251265
public BundleFinalizer bundleFinalizer() {
11261266
throw new UnsupportedOperationException(

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,27 @@ public <T> void output(
663663
throwUnsupportedOutput();
664664
}
665665

666+
@Override
667+
public void output(
668+
OutputT output,
669+
Instant timestamp,
670+
BoundedWindow window,
671+
@Nullable String currentRecordId,
672+
@Nullable Long currentRecordOffset) {
673+
throwUnsupportedOutput();
674+
}
675+
676+
@Override
677+
public <T> void output(
678+
TupleTag<T> tag,
679+
T output,
680+
Instant timestamp,
681+
BoundedWindow window,
682+
@Nullable String currentRecordId,
683+
@Nullable Long currentRecordOffset) {
684+
throwUnsupportedOutput();
685+
}
686+
666687
@Override
667688
public PipelineOptions getPipelineOptions() {
668689
return baseContext.getPipelineOptions();

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,16 @@ public PaneInfo getPaneInfo() {
13951395
return PaneInfo.NO_FIRING;
13961396
}
13971397

1398+
@Override
1399+
public @Nullable String getCurrentRecordId() {
1400+
return null;
1401+
}
1402+
1403+
@Override
1404+
public @Nullable Long getCurrentRecordOffset() {
1405+
return null;
1406+
}
1407+
13981408
@Override
13991409
public Iterable<WindowedValue<T>> explodeWindows() {
14001410
return Collections.emptyList();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.auto.service.AutoService;
2424
import java.io.IOException;
2525
import java.io.InputStream;
26+
import java.nio.charset.StandardCharsets;
2627
import java.util.Collection;
2728
import java.util.HashMap;
2829
import java.util.Map;
@@ -43,6 +44,7 @@
4344
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
4445
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
4546
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
47+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
4648
import org.checkerframework.checker.nullness.qual.Nullable;
4749
import org.slf4j.Logger;
4850
import org.slf4j.LoggerFactory;
@@ -221,15 +223,26 @@ public long add(WindowedValue<T> data) throws IOException {
221223
throw new RuntimeException(
222224
"Unexpected record ID via ValueWithRecordIdCoder while offset-based deduplication enabled.");
223225
}
224-
byte[] rawId = context.getCurrentRecordId();
225-
if (rawId.length == 0) {
226+
byte[] rawId = null;
227+
228+
if (data.getCurrentRecordId() != null) {
229+
rawId = data.getCurrentRecordId().getBytes(StandardCharsets.UTF_8);
230+
} else {
231+
rawId = context.getCurrentRecordId();
232+
}
233+
if (rawId == null || rawId.length == 0) {
226234
throw new RuntimeException(
227235
"Unexpected empty record ID while offset-based deduplication enabled.");
228236
}
229237
id = ByteString.copyFrom(rawId);
230238

231-
byte[] rawOffset = context.getCurrentRecordOffset();
232-
if (rawOffset.length == 0) {
239+
byte[] rawOffset = null;
240+
if (data.getCurrentRecordOffset() != null) {
241+
rawOffset = Longs.toByteArray(data.getCurrentRecordOffset());
242+
} else {
243+
rawOffset = context.getCurrentRecordOffset();
244+
}
245+
if (rawOffset == null || rawOffset.length == 0) {
233246
throw new RuntimeException(
234247
"Unexpected empty record offset while offset-based deduplication enabled.");
235248
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public PaneInfo getPaneInfo() {
4949
return PaneInfo.NO_FIRING;
5050
}
5151

52+
@Override
53+
public @Nullable String getCurrentRecordId() {
54+
return null;
55+
}
56+
57+
@Override
58+
public @Nullable Long getCurrentRecordOffset() {
59+
return null;
60+
}
61+
5262
@Override
5363
public Iterable<WindowedValue<T>> explodeWindows() {
5464
return Collections.emptyList();

runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ public PaneInfo getPaneInfo() {
110110
return PaneInfo.NO_FIRING;
111111
}
112112

113+
@Override
114+
public @Nullable String getCurrentRecordId() {
115+
return null;
116+
}
117+
118+
@Override
119+
public @Nullable Long getCurrentRecordOffset() {
120+
return null;
121+
}
122+
113123
@Override
114124
public Iterable<WindowedValue<T>> explodeWindows() {
115125
return Collections.emptyList();

0 commit comments

Comments (0)