Skip to content

Commit 15b11ed

Browse files
swegner authored and dhalperi committed
Register DisplayData from composite IO transforms
1 parent ebc7035 commit 15b11ed

File tree

12 files changed

+392
-4
lines changed

12 files changed

+392
-4
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.beam.sdk.options.PipelineOptions;
2727
import org.apache.beam.sdk.runners.PipelineRunner;
2828
import org.apache.beam.sdk.transforms.PTransform;
29+
import org.apache.beam.sdk.transforms.display.DisplayData;
2930
import org.apache.beam.sdk.util.IOChannelUtils;
3031
import org.apache.beam.sdk.util.MimeTypes;
3132
import org.apache.beam.sdk.values.PCollection;
@@ -326,6 +327,14 @@ public PCollection<T> apply(PInput input) {
326327
return pcol;
327328
}
328329

330+
/**
 * Registers this transform's display data: the superclass's items first, then
 * {@code filePattern} (through {@code addIfNotNull}) and {@code validation}
 * (through {@code addIfNotDefault}, with {@code true} passed as the default).
 *
 * @param builder the display data builder that receives the items
 */
@Override
331+
public void populateDisplayData(DisplayData.Builder builder) {
332+
super.populateDisplayData(builder);
333+
builder
334+
.addIfNotNull("filePattern", filepattern)
335+
.addIfNotDefault("validation", validate, true);
336+
}
337+
329338
@Override
330339
protected Coder<T> getDefaultOutputCoder() {
331340
return AvroCoder.of(type, schema);
@@ -467,6 +476,8 @@ public static Bound<GenericRecord> withoutValidation() {
467476
* @param <T> the type of each of the elements of the input PCollection
468477
*/
469478
public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
479+
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
480+
470481
/** The filename to write to. */
471482
@Nullable
472483
final String filenamePrefix;
@@ -485,7 +496,7 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
485496
final boolean validate;
486497

487498
Bound(Class<T> type) {
488-
this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, true);
499+
this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, type, null, true);
489500
}
490501

491502
Bound(
@@ -679,6 +690,18 @@ public PDone apply(PCollection<T> input) {
679690
filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))));
680691
}
681692

693+
/**
 * Registers this write transform's display data after delegating to the superclass.
 *
 * <p>{@code schema} is always added; the remaining items go through
 * {@code addIfNotNull} / {@code addIfNotDefault}, using the same default values the
 * single-argument constructor passes ({@code DEFAULT_SHARD_TEMPLATE}, {@code ""},
 * {@code 0} and {@code true}).
 *
 * @param builder the display data builder that receives the items
 */
@Override
694+
public void populateDisplayData(DisplayData.Builder builder) {
695+
super.populateDisplayData(builder);
696+
builder
697+
// NOTE(review): the "schema" item is populated from the `type` field, not `schema`.
.add("schema", type)
698+
.addIfNotNull("filePrefix", filenamePrefix)
699+
.addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
700+
.addIfNotDefault("fileSuffix", filenameSuffix, "")
701+
.addIfNotDefault("numShards", numShards, 0)
702+
.addIfNotDefault("validation", validate, true);
703+
}
704+
682705
/**
683706
* Returns the current shard name template string.
684707
*/

sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.beam.sdk.transforms.ParDo;
4545
import org.apache.beam.sdk.transforms.SerializableFunction;
4646
import org.apache.beam.sdk.transforms.Sum;
47+
import org.apache.beam.sdk.transforms.display.DisplayData;
4748
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4849
import org.apache.beam.sdk.util.BigQueryServices;
4950
import org.apache.beam.sdk.util.BigQueryServices.LoadService;
@@ -80,7 +81,6 @@
8081

8182
import com.fasterxml.jackson.annotation.JsonCreator;
8283
import com.fasterxml.jackson.annotation.JsonProperty;
83-
8484
import org.slf4j.Logger;
8585
import org.slf4j.LoggerFactory;
8686

@@ -517,6 +517,20 @@ protected Coder<TableRow> getDefaultOutputCoder() {
517517
return TableRowJsonCoder.of();
518518
}
519519

520+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code table} (only when a table is set), {@code query}, {@code flattenResults}
 * and {@code validation}.
 *
 * @param builder the display data builder that receives the items
 */
@Override
521+
public void populateDisplayData(DisplayData.Builder builder) {
522+
super.populateDisplayData(builder);
523+
524+
// The table is converted with toTableSpec before being added, so it needs its own
// null guard rather than addIfNotNull.
if (table != null) {
525+
builder.add("table", toTableSpec(table));
526+
}
527+
528+
builder
529+
.addIfNotNull("query", query)
530+
.addIfNotNull("flattenResults", flattenResults)
531+
.addIfNotDefault("validation", validate, true);
532+
}
533+
520534
static {
521535
DirectPipelineRunner.registerDefaultTransformEvaluator(
522536
Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
@@ -1048,6 +1062,24 @@ protected Coder<Void> getDefaultOutputCoder() {
10481062
return VoidCoder.of();
10491063
}
10501064

1065+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code table}, {@code schema}, {@code tableFn} (only when a table function is set),
 * {@code createDisposition}, {@code writeDisposition} and {@code validation}.
 *
 * @param builder the display data builder that receives the items
 */
@Override
1066+
public void populateDisplayData(DisplayData.Builder builder) {
1067+
super.populateDisplayData(builder);
1068+
1069+
builder
1070+
.addIfNotNull("table", jsonTableRef)
1071+
.addIfNotNull("schema", jsonSchema);
1072+
1073+
// Only the class of the table function is recorded, not the function itself.
if (tableRefFunction != null) {
1074+
builder.add("tableFn", tableRefFunction.getClass());
1075+
}
1076+
1077+
// NOTE(review): toString() is called unguarded here, so both dispositions are
// assumed to be non-null - confirm against the constructors.
builder
1078+
.add("createDisposition", createDisposition.toString())
1079+
.add("writeDisposition", writeDisposition.toString())
1080+
.addIfNotDefault("validation", validate, true);
1081+
}
1082+
10511083
/** Returns the create disposition. */
10521084
public CreateDisposition getCreateDisposition() {
10531085
return createDisposition;

sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.beam.sdk.io.Read.Unbounded;
2525
import org.apache.beam.sdk.transforms.PTransform;
2626
import org.apache.beam.sdk.transforms.SerializableFunction;
27+
import org.apache.beam.sdk.transforms.display.DisplayData;
2728
import org.apache.beam.sdk.values.PBegin;
2829
import org.apache.beam.sdk.values.PCollection;
2930
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -114,6 +115,12 @@ private BoundedCountingInput(long numElements) {
114115
public PCollection<Long> apply(PBegin begin) {
115116
return begin.apply(Read.from(CountingSource.upTo(numElements)));
116117
}
118+
119+
/**
 * Registers this transform's display data: the superclass's items, then
 * {@code numElements} under the key {@code upTo}.
 *
 * @param builder the display data builder that receives the items
 */
@Override
120+
public void populateDisplayData(DisplayData.Builder builder) {
121+
super.populateDisplayData(builder);
122+
builder.add("upTo", numElements);
123+
}
117124
}
118125

119126
/**
@@ -221,5 +228,20 @@ public PCollection<Long> apply(PBegin begin) {
221228
read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
222229
}
223230
}
231+
232+
/**
 * Registers this transform's display data after delegating to the superclass: the
 * class of {@code timestampFn}, plus {@code maxReadTime} and {@code maxRecords} when
 * the corresponding optional values are present.
 *
 * @param builder the display data builder that receives the items
 */
@Override
233+
public void populateDisplayData(DisplayData.Builder builder) {
234+
super.populateDisplayData(builder);
235+
236+
builder.add("timestampFn", timestampFn.getClass());
237+
238+
// maxReadTime and maxNumRecords are optional wrappers; unwrap only when present.
if (maxReadTime.isPresent()) {
239+
builder.add("maxReadTime", maxReadTime.get());
240+
}
241+
242+
// Note the display key is "maxRecords" while the field is named maxNumRecords.
if (maxNumRecords.isPresent()) {
243+
builder.add("maxRecords", maxNumRecords.get());
244+
}
245+
}
224246
}
225247
}

sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.beam.sdk.transforms.DoFn;
3131
import org.apache.beam.sdk.transforms.PTransform;
3232
import org.apache.beam.sdk.transforms.ParDo;
33+
import org.apache.beam.sdk.transforms.display.DisplayData;
3334
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
3435
import org.apache.beam.sdk.util.CoderUtils;
3536
import org.apache.beam.sdk.util.Transport;
@@ -688,6 +689,25 @@ public PCollection<T> apply(PInput input) {
688689
}
689690
}
690691

692+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code timestampLabel}, {@code idLabel}, {@code maxReadTime}, {@code maxNumRecords}
 * (through {@code addIfNotDefault} with {@code 0} as the default), and the paths of
 * {@code topic} and {@code subscription} when they are set.
 *
 * @param builder the display data builder that receives the items
 */
@Override
693+
public void populateDisplayData(DisplayData.Builder builder) {
694+
super.populateDisplayData(builder);
695+
696+
builder
697+
.addIfNotNull("timestampLabel", timestampLabel)
698+
.addIfNotNull("idLabel", idLabel)
699+
.addIfNotNull("maxReadTime", maxReadTime)
700+
.addIfNotDefault("maxNumRecords", maxNumRecords, 0);
701+
702+
// topic and subscription are rendered via asPath(), so each is null-checked
// explicitly before the call.
if (topic != null) {
703+
builder.add("topic", topic.asPath());
704+
}
705+
706+
if (subscription != null) {
707+
builder.add("subscription", subscription.asPath());
708+
}
709+
}
710+
691711
@Override
692712
protected Coder<T> getDefaultOutputCoder() {
693713
return coder;
@@ -973,6 +993,19 @@ public PDone apply(PCollection<T> input) {
973993
return PDone.in(input.getPipeline());
974994
}
975995

996+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code timestampLabel}, {@code idLabel}, and the path of {@code topic} when a topic
 * is set.
 *
 * @param builder the display data builder that receives the items
 */
@Override
997+
public void populateDisplayData(DisplayData.Builder builder) {
998+
super.populateDisplayData(builder);
999+
1000+
builder
1001+
.addIfNotNull("timestampLabel", timestampLabel)
1002+
.addIfNotNull("idLabel", idLabel);
1003+
1004+
// The topic is rendered via asPath(), so it is null-checked explicitly first.
if (topic != null) {
1005+
builder.add("topic", topic.asPath());
1006+
}
1007+
}
1008+
9761009
@Override
9771010
protected Coder<Void> getDefaultOutputCoder() {
9781011
return VoidCoder.of();

sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.beam.sdk.options.PipelineOptions;
2828
import org.apache.beam.sdk.runners.DirectPipelineRunner;
2929
import org.apache.beam.sdk.transforms.PTransform;
30+
import org.apache.beam.sdk.transforms.display.DisplayData;
3031
import org.apache.beam.sdk.util.IOChannelUtils;
3132
import org.apache.beam.sdk.util.MimeTypes;
3233
import org.apache.beam.sdk.values.PCollection;
@@ -338,6 +339,16 @@ public PCollection<T> apply(PInput input) {
338339
return pcol;
339340
}
340341

342+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code compressionType} (always, as a string), {@code validation} (through
 * {@code addIfNotDefault} with {@code true} as the default) and {@code filePattern}
 * (through {@code addIfNotNull}).
 *
 * @param builder the display data builder that receives the items
 */
@Override
343+
public void populateDisplayData(DisplayData.Builder builder) {
344+
super.populateDisplayData(builder);
345+
346+
// NOTE(review): toString() is called unguarded, so compressionType is assumed to be
// non-null - confirm against the constructors.
builder
347+
.add("compressionType", compressionType.toString())
348+
.addIfNotDefault("validation", validate, true)
349+
.addIfNotNull("filePattern", filepattern);
350+
}
351+
341352
@Override
342353
protected Coder<T> getDefaultOutputCoder() {
343354
return coder;
@@ -467,6 +478,8 @@ public static Bound<String> withoutValidation() {
467478
* @param <T> the type of the elements of the input PCollection
468479
*/
469480
public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
481+
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
482+
470483
/** The prefix of each file written, combined with suffix and shardTemplate. */
471484
@Nullable private final String filenamePrefix;
472485
/** The suffix of each file written, combined with prefix and shardTemplate. */
@@ -485,7 +498,7 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
485498
private final boolean validate;
486499

487500
Bound(Coder<T> coder) {
488-
this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
501+
this(null, null, "", coder, 0, DEFAULT_SHARD_TEMPLATE, true);
489502
}
490503

491504
private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
@@ -631,6 +644,18 @@ public PDone apply(PCollection<T> input) {
631644
filenamePrefix, filenameSuffix, shardTemplate, coder)));
632645
}
633646

647+
/**
 * Registers this write transform's display data after delegating to the superclass.
 *
 * <p>Every item goes through {@code addIfNotNull} / {@code addIfNotDefault}, using the
 * same default values the single-argument constructor passes ({@code ""},
 * {@code DEFAULT_SHARD_TEMPLATE}, {@code true} and {@code 0}).
 *
 * @param builder the display data builder that receives the items
 */
@Override
648+
public void populateDisplayData(DisplayData.Builder builder) {
649+
super.populateDisplayData(builder);
650+
651+
builder
652+
.addIfNotNull("filePrefix", filenamePrefix)
653+
.addIfNotDefault("fileSuffix", filenameSuffix, "")
654+
.addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
655+
.addIfNotDefault("validation", validate, true)
656+
.addIfNotDefault("numShards", numShards, 0);
657+
}
658+
634659
/**
635660
* Returns the current shard name template string.
636661
*/

sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.beam.sdk.options.PipelineOptions;
3636
import org.apache.beam.sdk.runners.PipelineRunner;
3737
import org.apache.beam.sdk.transforms.PTransform;
38+
import org.apache.beam.sdk.transforms.display.DisplayData;
3839
import org.apache.beam.sdk.util.ReleaseInfo;
3940
import org.apache.beam.sdk.values.KV;
4041
import org.apache.beam.sdk.values.PBegin;
@@ -261,6 +262,21 @@ public void validate(PBegin input) {
261262
}
262263
}
263264

265+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code tableId} (always), plus the string forms of {@code options} and
 * {@code filter} when they are non-null.
 *
 * @param builder the display data builder that receives the items
 */
@Override
266+
public void populateDisplayData(DisplayData.Builder builder) {
267+
super.populateDisplayData(builder);
268+
269+
builder.add("tableId", tableId);
270+
271+
// options and filter are added as their toString() output, so each is guarded
// against null before the call.
if (options != null) {
272+
builder.add("bigtableOptions", options.toString());
273+
}
274+
275+
if (filter != null) {
276+
builder.add("rowFilter", filter.toString());
277+
}
278+
}
279+
264280
@Override
265281
public String toString() {
266282
return MoreObjects.toStringHelper(Read.class)
@@ -428,6 +444,17 @@ Write withBigtableService(BigtableService bigtableService) {
428444
return new Write(options, tableId, bigtableService);
429445
}
430446

447+
/**
 * Registers this transform's display data after delegating to the superclass:
 * {@code tableId} (always), plus the string form of {@code options} when it is
 * non-null.
 *
 * @param builder the display data builder that receives the items
 */
@Override
448+
public void populateDisplayData(DisplayData.Builder builder) {
449+
super.populateDisplayData(builder);
450+
451+
builder.add("tableId", tableId);
452+
453+
// options is added as its toString() output, so it is guarded against null first.
if (options != null) {
454+
builder.add("bigtableOptions", options.toString());
455+
}
456+
}
457+
431458
@Override
432459
public String toString() {
433460
return MoreObjects.toStringHelper(Write.class)

sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io;
1919

20+
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
21+
2022
import static org.hamcrest.Matchers.containsInAnyOrder;
2123
import static org.junit.Assert.assertEquals;
2224
import static org.junit.Assert.assertFalse;
@@ -29,6 +31,7 @@
2931
import org.apache.beam.sdk.testing.PAssert;
3032
import org.apache.beam.sdk.testing.TestPipeline;
3133
import org.apache.beam.sdk.transforms.Create;
34+
import org.apache.beam.sdk.transforms.display.DisplayData;
3235
import org.apache.beam.sdk.util.IOChannelUtils;
3336
import org.apache.beam.sdk.values.PCollection;
3437

@@ -256,4 +259,34 @@ public void testAvroSinkShardedWrite() throws Exception {
256259
}
257260
// TODO: for Write only, test withSuffix,
258261
// withShardNameTemplate and withoutSharding.
262+
263+
/**
 * Checks that an {@code AvroIO.Read} built from the pattern {@code "foo.*"} with
 * validation disabled exposes {@code filePattern} and {@code validation} display items.
 */
@Test
264+
public void testReadDisplayData() {
265+
AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
266+
.withoutValidation();
267+
268+
DisplayData displayData = DisplayData.from(read);
269+
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
270+
// Validation was turned off above, so the item is expected with value false.
assertThat(displayData, hasDisplayItem("validation", false));
271+
}
272+
273+
/**
 * Checks that an {@code AvroIO.Write} configured with a prefix, shard name template,
 * suffix, schema class, shard count and disabled validation exposes each of those
 * settings as a display item.
 */
@Test
274+
public void testWriteDisplayData() {
275+
// Every setting below differs from the value the display data code passes as the
// default, so each one is expected to show up as an item.
AvroIO.Write.Bound<?> write = AvroIO.Write
276+
.to("foo")
277+
.withShardNameTemplate("-SS-of-NN-")
278+
.withSuffix("bar")
279+
.withSchema(GenericClass.class)
280+
.withNumShards(100)
281+
.withoutValidation();
282+
283+
DisplayData displayData = DisplayData.from(write);
284+
285+
assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
286+
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
287+
assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
288+
// The "schema" item is expected to carry the class that was passed to withSchema.
assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
289+
assertThat(displayData, hasDisplayItem("numShards", 100));
290+
assertThat(displayData, hasDisplayItem("validation", false));
291+
}
259292
}

0 commit comments

Comments
 (0)