Skip to content

Commit 77a77c9

Browse files
swegner authored and bchambers committed
Add display data to ParDo transforms
1 parent 9ff4269 commit 77a77c9

16 files changed

Lines changed: 341 additions & 68 deletions

File tree

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java

Lines changed: 53 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -72,9 +72,9 @@
7272
import com.google.api.services.dataflow.model.WorkerPool;
7373
import com.google.common.collect.ImmutableList;
7474
import com.google.common.collect.ImmutableMap;
75+
import com.google.common.collect.ImmutableSet;
7576
import com.google.common.collect.Iterables;
7677

77-
import org.hamcrest.Matcher;
7878
import org.hamcrest.Matchers;
7979
import org.junit.Assert;
8080
import org.junit.Rule;
@@ -890,10 +890,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
890890
}
891891
};
892892

893+
ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1);
894+
ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2);
893895
pipeline
894896
.apply(Create.of(1, 2, 3))
895-
.apply(ParDo.of(fn1))
896-
.apply(ParDo.of(fn2));
897+
.apply(parDo1)
898+
.apply(parDo2);
897899

898900
Job job =
899901
translator
@@ -910,43 +912,53 @@ public void populateDisplayData(DisplayData.Builder builder) {
910912
Map<String, Object> parDo2Properties = steps.get(2).getProperties();
911913
assertThat(parDo1Properties, hasKey("display_data"));
912914

913-
914-
@SuppressWarnings("unchecked")
915-
Collection<Map<String, Object>> fn1displayData =
916-
(Collection<Map<String, Object>>) parDo1Properties.get("display_data");
917-
@SuppressWarnings("unchecked")
918-
Collection<Map<String, Object>> fn2displayData =
919-
(Collection<Map<String, Object>>) parDo2Properties.get("display_data");
920-
921-
@SuppressWarnings("unchecked")
922-
Matcher<Iterable<? extends Map<String, Object>>> fn1expectedData =
923-
Matchers.<Map<String, Object>>containsInAnyOrder(
924-
ImmutableMap.<String, Object>builder()
925-
.put("namespace", fn1.getClass().getName())
926-
.put("key", "foo")
927-
.put("type", "STRING")
928-
.put("value", "bar")
929-
.build(),
930-
ImmutableMap.<String, Object>builder()
931-
.put("namespace", fn1.getClass().getName())
932-
.put("key", "foo2")
933-
.put("type", "JAVA_CLASS")
934-
.put("value", DataflowPipelineTranslatorTest.class.getName())
935-
.put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
936-
.put("label", "Test Class")
937-
.put("linkUrl", "http://www.google.com")
938-
.build());
939-
940-
@SuppressWarnings("unchecked")
941-
Matcher<Iterable<? extends Map<String, Object>>> fn2expectedData =
942-
Matchers.<Map<String, Object>>contains(
943-
ImmutableMap.<String, Object>builder()
944-
.put("namespace", fn2.getClass().getName())
945-
.put("key", "foo3")
946-
.put("type", "INTEGER")
947-
.put("value", 1234L)
948-
.build());
949-
assertThat(fn1displayData, fn1expectedData);
950-
assertThat(fn2displayData, fn2expectedData);
915+
Collection<Map<String, String>> fn1displayData =
916+
(Collection<Map<String, String>>) parDo1Properties.get("display_data");
917+
Collection<Map<String, String>> fn2displayData =
918+
(Collection<Map<String, String>>) parDo2Properties.get("display_data");
919+
920+
ImmutableSet<ImmutableMap<String, Object>> expectedFn1DisplayData = ImmutableSet.of(
921+
ImmutableMap.<String, Object>builder()
922+
.put("key", "foo")
923+
.put("type", "STRING")
924+
.put("value", "bar")
925+
.put("namespace", fn1.getClass().getName())
926+
.build(),
927+
ImmutableMap.<String, Object>builder()
928+
.put("key", "fn")
929+
.put("type", "JAVA_CLASS")
930+
.put("value", fn1.getClass().getName())
931+
.put("shortValue", fn1.getClass().getSimpleName())
932+
.put("namespace", parDo1.getClass().getName())
933+
.build(),
934+
ImmutableMap.<String, Object>builder()
935+
.put("key", "foo2")
936+
.put("type", "JAVA_CLASS")
937+
.put("value", DataflowPipelineTranslatorTest.class.getName())
938+
.put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
939+
.put("namespace", fn1.getClass().getName())
940+
.put("label", "Test Class")
941+
.put("linkUrl", "http://www.google.com")
942+
.build()
943+
);
944+
945+
ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of(
946+
ImmutableMap.<String, Object>builder()
947+
.put("key", "fn")
948+
.put("type", "JAVA_CLASS")
949+
.put("value", fn2.getClass().getName())
950+
.put("shortValue", fn2.getClass().getSimpleName())
951+
.put("namespace", parDo2.getClass().getName())
952+
.build(),
953+
ImmutableMap.<String, Object>builder()
954+
.put("key", "foo3")
955+
.put("type", "INTEGER")
956+
.put("value", 1234L)
957+
.put("namespace", fn2.getClass().getName())
958+
.build()
959+
);
960+
961+
assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
962+
assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
951963
}
952964
}

sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import org.apache.beam.sdk.coders.CannotProvideCoderException;
2121
import org.apache.beam.sdk.coders.Coder;
2222
import org.apache.beam.sdk.transforms.PTransform;
23+
import org.apache.beam.sdk.transforms.display.DisplayData;
2324
import org.apache.beam.sdk.values.PInput;
2425
import org.apache.beam.sdk.values.POutput;
2526
import org.apache.beam.sdk.values.TypedPValue;
@@ -53,4 +54,9 @@ public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unuse
5354
TypedPValue<T> output) throws CannotProvideCoderException {
5455
return delegate().getDefaultOutputCoder(input, output);
5556
}
57+
58+
@Override
59+
public void populateDisplayData(DisplayData.Builder builder) {
60+
delegate().populateDisplayData(builder);
61+
}
5662
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.apache.beam.sdk.transforms.DoFnWithContext.FinishBundle;
2323
import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
2424
import org.apache.beam.sdk.transforms.DoFnWithContext.StartBundle;
25+
import org.apache.beam.sdk.transforms.display.DisplayData;
2526
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2627
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2728
import org.apache.beam.sdk.util.UserCodeException;
@@ -653,6 +654,11 @@ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
653654
return fn.getOutputTypeDescriptor();
654655
}
655656

657+
@Override
658+
public void populateDisplayData(DisplayData.Builder builder) {
659+
fn.populateDisplayData(builder);
660+
}
661+
656662
private void readObject(java.io.ObjectInputStream in)
657663
throws IOException, ClassNotFoundException {
658664
in.defaultReadObject();

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@
2525
import org.apache.beam.sdk.options.PipelineOptions;
2626
import org.apache.beam.sdk.transforms.Combine.CombineFn;
2727
import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator;
28+
import org.apache.beam.sdk.transforms.display.DisplayData;
29+
import org.apache.beam.sdk.transforms.display.HasDisplayData;
2830
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2931
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3032
import org.apache.beam.sdk.util.WindowingInternals;
@@ -82,7 +84,7 @@
8284
* @param <OutputT> the type of the (main) output elements
8385
*/
8486
@Experimental
85-
public abstract class DoFnWithContext<InputT, OutputT> implements Serializable {
87+
public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, HasDisplayData {
8688

8789
/** Information accessible to all methods in this {@code DoFnWithContext}. */
8890
public abstract class Context {
@@ -414,4 +416,14 @@ public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
414416
void prepareForProcessing() {
415417
aggregatorsAreFinal = true;
416418
}
419+
420+
/**
421+
* {@inheritDoc}
422+
*
423+
* <p>By default, does not register any display data. Implementors may override this method
424+
* to provide their own display metadata.
425+
*/
426+
@Override
427+
public void populateDisplayData(DisplayData.Builder builder) {
428+
}
417429
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.transforms;
1919

2020
import org.apache.beam.sdk.coders.Coder;
21+
import org.apache.beam.sdk.transforms.display.DisplayData;
2122
import org.apache.beam.sdk.values.PCollection;
2223

2324
/**
@@ -99,9 +100,15 @@ public void processElement(ProcessContext c) {
99100
c.output(c.element());
100101
}
101102
}
103+
104+
@Override
105+
public void populateDisplayData(DisplayData.Builder builder) {
106+
Filter.populateDisplayData(builder, String.format("x < %s", value));
107+
}
102108
});
103109
}
104110

111+
105112
/**
106113
* Returns a {@code PTransform} that takes an input
107114
* {@code PCollection<T>} and returns a {@code PCollection<T>} with
@@ -131,6 +138,11 @@ public void processElement(ProcessContext c) {
131138
c.output(c.element());
132139
}
133140
}
141+
142+
@Override
143+
public void populateDisplayData(DisplayData.Builder builder) {
144+
Filter.populateDisplayData(builder, String.format("x > %s", value));
145+
}
134146
});
135147
}
136148

@@ -163,6 +175,11 @@ public void processElement(ProcessContext c) {
163175
c.output(c.element());
164176
}
165177
}
178+
179+
@Override
180+
public void populateDisplayData(DisplayData.Builder builder) {
181+
Filter.populateDisplayData(builder, String.format("x ≤ %s", value));
182+
}
166183
});
167184
}
168185

@@ -195,6 +212,11 @@ public void processElement(ProcessContext c) {
195212
c.output(c.element());
196213
}
197214
}
215+
216+
@Override
217+
public void populateDisplayData(DisplayData.Builder builder) {
218+
Filter.populateDisplayData(builder, String.format("x ≥ %s", value));
219+
}
198220
});
199221
}
200222

@@ -232,4 +254,9 @@ public void processElement(ProcessContext c) {
232254
protected Coder<T> getDefaultOutputCoder(PCollection<T> input) {
233255
return input.getCoder();
234256
}
257+
258+
private static void populateDisplayData(
259+
DisplayData.Builder builder, String predicateDescription) {
260+
builder.add("predicate", predicateDescription);
261+
}
235262
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
2222
import org.apache.beam.sdk.coders.IterableCoder;
2323
import org.apache.beam.sdk.coders.KvCoder;
24+
import org.apache.beam.sdk.transforms.display.DisplayData;
2425
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
2526
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
2627
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -273,4 +274,12 @@ static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder)
273274
public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
274275
return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
275276
}
277+
278+
@Override
279+
public void populateDisplayData(DisplayData.Builder builder) {
280+
if (fewKeys) {
281+
builder.add("fewKeys", true);
282+
}
283+
}
284+
276285
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import org.apache.beam.sdk.options.GcsOptions;
2121
import org.apache.beam.sdk.options.PipelineOptions;
2222
import org.apache.beam.sdk.transforms.Combine.CombineFn;
23+
import org.apache.beam.sdk.transforms.display.DisplayData;
2324
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2425
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2526
import org.apache.beam.sdk.util.WindowingInternals;
@@ -172,6 +173,14 @@ public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
172173
return input.apply(
173174
ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, maxParallelism)));
174175
}
176+
177+
@Override
178+
public void populateDisplayData(DisplayData.Builder builder) {
179+
builder
180+
.add("maxParallelism", maxParallelism)
181+
.add("fn", doFn.getClass())
182+
.include(doFn);
183+
}
175184
}
176185

177186
/**

0 commit comments

Comments
 (0)