Skip to content

Commit 2c16bdd

Browse files
swegnerlukecwik
authored andcommitted
Publish DisplayData for PipelineOptions.
1 parent 692f3a1 commit 2c16bdd

10 files changed

Lines changed: 979 additions & 98 deletions

File tree

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,9 @@ public void testSettingOfSdkPipelineOptions() throws IOException {
204204
settings.put("numberOfWorkerHarnessThreads", 0);
205205
settings.put("experiments", null);
206206

207-
assertEquals(ImmutableMap.of("options", settings),
208-
job.getEnvironment().getSdkPipelineOptions());
207+
Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
208+
assertThat(sdkPipelineOptions, hasKey("options"));
209+
assertEquals(settings, sdkPipelineOptions.get("options"));
209210
}
210211

211212
@Test
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.options;
19+
20+
import com.google.common.base.MoreObjects;
21+
import com.google.common.base.Objects;
22+
23+
import java.lang.reflect.Method;
24+
25+
/**
26+
* For internal use. Specification for an option defined in a {@link PipelineOptions} interface.
27+
*/
28+
class PipelineOptionSpec {
29+
private final Class<? extends PipelineOptions> clazz;
30+
private final String name;
31+
private final Method getter;
32+
33+
static PipelineOptionSpec of(Class<? extends PipelineOptions> clazz, String name, Method getter) {
34+
return new PipelineOptionSpec(clazz, name, getter);
35+
}
36+
37+
private PipelineOptionSpec(Class<? extends PipelineOptions> clazz, String name, Method getter) {
38+
this.clazz = clazz;
39+
this.name = name;
40+
this.getter = getter;
41+
}
42+
43+
/**
44+
* The {@link PipelineOptions} interface which defines this {@link PipelineOptionSpec}.
45+
*/
46+
Class<? extends PipelineOptions> getDefiningInterface() {
47+
return clazz;
48+
}
49+
50+
/**
51+
* Name of the property.
52+
*/
53+
String getName() {
54+
return name;
55+
}
56+
57+
/**
58+
* The getter method for this property.
59+
*/
60+
Method getGetterMethod() {
61+
return getter;
62+
}
63+
64+
@Override
65+
public String toString() {
66+
return MoreObjects.toStringHelper(this)
67+
.add("definingInterface", getDefiningInterface())
68+
.add("name", getName())
69+
.add("getterMethod", getGetterMethod())
70+
.toString();
71+
}
72+
73+
@Override
74+
public int hashCode() {
75+
return Objects.hashCode(getDefiningInterface(), getName(), getGetterMethod());
76+
}
77+
78+
@Override
79+
public boolean equals(Object obj) {
80+
if (!(obj instanceof PipelineOptionSpec)) {
81+
return false;
82+
}
83+
84+
PipelineOptionSpec that = (PipelineOptionSpec) obj;
85+
return Objects.equal(this.getDefiningInterface(), that.getDefiningInterface())
86+
&& Objects.equal(this.getName(), that.getName())
87+
&& Objects.equal(this.getGetterMethod(), that.getGetterMethod());
88+
}
89+
}

sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.beam.sdk.runners.PipelineRunner;
2626
import org.apache.beam.sdk.transforms.DoFn;
2727
import org.apache.beam.sdk.transforms.DoFn.Context;
28-
28+
import org.apache.beam.sdk.transforms.display.HasDisplayData;
2929
import com.google.auto.service.AutoService;
3030

3131
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -194,7 +194,7 @@
194194
@JsonSerialize(using = Serializer.class)
195195
@JsonDeserialize(using = Deserializer.class)
196196
@ThreadSafe
197-
public interface PipelineOptions {
197+
public interface PipelineOptions extends HasDisplayData {
198198
/**
199199
* Transforms this object into an object of type {@code <T>} saving each property
200200
* that has been manipulated. {@code <T>} must extend {@link PipelineOptions}.

sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java

Lines changed: 61 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@
2222
import org.apache.beam.sdk.options.Validation.Required;
2323
import org.apache.beam.sdk.runners.PipelineRunner;
2424
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
25+
import org.apache.beam.sdk.transforms.display.DisplayData;
2526
import org.apache.beam.sdk.util.StringUtils;
2627
import org.apache.beam.sdk.util.common.ReflectHelpers;
27-
28+
import com.google.common.annotations.VisibleForTesting;
2829
import com.google.common.base.Function;
2930
import com.google.common.base.Joiner;
3031
import com.google.common.base.Optional;
3132
import com.google.common.base.Preconditions;
3233
import com.google.common.base.Predicate;
3334
import com.google.common.base.Strings;
3435
import com.google.common.base.Throwables;
35-
import com.google.common.collect.ArrayListMultimap;
3636
import com.google.common.collect.Collections2;
3737
import com.google.common.collect.FluentIterable;
3838
import com.google.common.collect.ImmutableListMultimap;
@@ -43,8 +43,11 @@
4343
import com.google.common.collect.ListMultimap;
4444
import com.google.common.collect.Lists;
4545
import com.google.common.collect.Maps;
46+
import com.google.common.collect.Ordering;
47+
import com.google.common.collect.RowSortedTable;
4648
import com.google.common.collect.Sets;
4749
import com.google.common.collect.SortedSetMultimap;
50+
import com.google.common.collect.TreeBasedTable;
4851
import com.google.common.collect.TreeMultimap;
4952

5053
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -77,6 +80,7 @@
7780
import java.util.Set;
7881
import java.util.SortedMap;
7982
import java.util.SortedSet;
83+
import java.util.TreeMap;
8084
import java.util.TreeSet;
8185

8286
import javax.annotation.Nullable;
@@ -444,6 +448,7 @@ Class<T> getProxyClass() {
444448
@SuppressWarnings("rawtypes")
445449
private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0];
446450
private static final ObjectMapper MAPPER = new ObjectMapper();
451+
private static final ClassLoader CLASS_LOADER;
447452
private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS;
448453

449454
/** Classes that are used as the boundary in the stack trace to find the callers class name. */
@@ -510,33 +515,22 @@ static ClassLoader findClassLoader() {
510515
throw new ExceptionInInitializerError(e);
511516
}
512517

513-
ClassLoader classLoader = findClassLoader();
518+
CLASS_LOADER = findClassLoader();
514519

515520
// Store the list of all available pipeline runners.
516521
ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder =
517522
ImmutableMap.builder();
518523
Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
519524
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
520525
pipelineRunnerRegistrars.addAll(
521-
Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, classLoader)));
526+
Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
522527
for (PipelineRunnerRegistrar registrar : pipelineRunnerRegistrars) {
523528
for (Class<? extends PipelineRunner<?>> klass : registrar.getPipelineRunners()) {
524529
builder.put(klass.getSimpleName(), klass);
525530
}
526531
}
527532
SUPPORTED_PIPELINE_RUNNERS = builder.build();
528-
529-
// Load and register the list of all classes that extend PipelineOptions.
530-
register(PipelineOptions.class);
531-
Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
532-
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
533-
pipelineOptionsRegistrars.addAll(
534-
Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, classLoader)));
535-
for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
536-
for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
537-
register(klass);
538-
}
539-
}
533+
initializeRegistry();
540534
}
541535

542536
/**
@@ -565,6 +559,33 @@ public static synchronized void register(Class<? extends PipelineOptions> iface)
565559
REGISTERED_OPTIONS.add(iface);
566560
}
567561

562+
/**
563+
* Resets the set of interfaces registered with this factory to the default state.
564+
*
565+
* @see PipelineOptionsFactory#register(Class)
566+
*/
567+
@VisibleForTesting
568+
static synchronized void resetRegistry() {
569+
REGISTERED_OPTIONS.clear();
570+
initializeRegistry();
571+
}
572+
573+
/**
574+
* Load and register the list of all classes that extend PipelineOptions.
575+
*/
576+
private static void initializeRegistry() {
577+
register(PipelineOptions.class);
578+
Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
579+
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
580+
pipelineOptionsRegistrars.addAll(
581+
Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
582+
for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
583+
for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
584+
register(klass);
585+
}
586+
}
587+
}
588+
568589
/**
569590
* Validates that the interface conforms to the following:
570591
* <ul>
@@ -674,32 +695,20 @@ public static void printHelp(PrintStream out, Class<? extends PipelineOptions> i
674695
Preconditions.checkNotNull(iface);
675696
validateWellFormed(iface, REGISTERED_OPTIONS);
676697

677-
Iterable<Method> methods =
678-
Iterables.filter(
679-
ReflectHelpers.getClosureOfMethodsOnInterface(iface), NOT_SYNTHETIC_PREDICATE);
680-
ListMultimap<Class<?>, Method> ifaceToMethods = ArrayListMultimap.create();
681-
for (Method method : methods) {
682-
// Process only methods that are not marked as hidden.
683-
if (method.getAnnotation(Hidden.class) == null) {
684-
ifaceToMethods.put(method.getDeclaringClass(), method);
685-
}
698+
Set<PipelineOptionSpec> properties =
699+
PipelineOptionsReflector.getOptionSpecs(iface);
700+
701+
RowSortedTable<Class<?>, String, Method> ifacePropGetterTable = TreeBasedTable.create(
702+
ClassNameComparator.INSTANCE, Ordering.natural());
703+
for (PipelineOptionSpec prop : properties) {
704+
ifacePropGetterTable.put(prop.getDefiningInterface(), prop.getName(), prop.getGetterMethod());
686705
}
687-
SortedSet<Class<?>> ifaces = new TreeSet<>(ClassNameComparator.INSTANCE);
688-
// Keep interfaces that are not marked as hidden.
689-
ifaces.addAll(Collections2.filter(ifaceToMethods.keySet(), new Predicate<Class<?>>() {
690-
@Override
691-
public boolean apply(Class<?> input) {
692-
return input.getAnnotation(Hidden.class) == null;
693-
}
694-
}));
695-
for (Class<?> currentIface : ifaces) {
696-
Map<String, Method> propertyNamesToGetters =
697-
getPropertyNamesToGetters(ifaceToMethods.get(currentIface));
698706

699-
// Don't output anything if there are no defined options
700-
if (propertyNamesToGetters.isEmpty()) {
701-
continue;
702-
}
707+
for (Map.Entry<Class<?>, Map<String, Method>> ifaceToPropertyMap :
708+
ifacePropGetterTable.rowMap().entrySet()) {
709+
Class<?> currentIface = ifaceToPropertyMap.getKey();
710+
Map<String, Method> propertyNamesToGetters = ifaceToPropertyMap.getValue();
711+
703712
SortedSetMultimap<String, String> requiredGroupNameToProperties =
704713
getRequiredGroupNamesToProperties(propertyNamesToGetters);
705714

@@ -838,15 +847,21 @@ static List<PropertyDescriptor> getPropertyDescriptors(
838847
* <p>TODO: Swap back to using Introspector once the proxy class issue with AppEngine is
839848
* resolved.
840849
*/
841-
private static List<PropertyDescriptor> getPropertyDescriptors(Class<?> beanClass)
850+
private static List<PropertyDescriptor> getPropertyDescriptors(
851+
Class<? extends PipelineOptions> beanClass)
842852
throws IntrospectionException {
843853
// The sorting is important to make this method stable.
844854
SortedSet<Method> methods = Sets.newTreeSet(MethodComparator.INSTANCE);
845855
methods.addAll(
846856
Collections2.filter(Arrays.asList(beanClass.getMethods()), NOT_SYNTHETIC_PREDICATE));
847-
SortedMap<String, Method> propertyNamesToGetters = getPropertyNamesToGetters(methods);
848-
List<PropertyDescriptor> descriptors = Lists.newArrayList();
849857

858+
SortedMap<String, Method> propertyNamesToGetters = new TreeMap<>();
859+
for (Map.Entry<String, Method> entry :
860+
PipelineOptionsReflector.getPropertyNamesToGetters(methods).entries()) {
861+
propertyNamesToGetters.put(entry.getKey(), entry.getValue());
862+
}
863+
864+
List<PropertyDescriptor> descriptors = Lists.newArrayList();
850865
List<TypeMismatch> mismatches = new ArrayList<>();
851866
/*
852867
* Add all the getter/setter pairs to the list of descriptors removing the getter once
@@ -918,28 +933,6 @@ private static void throwForTypeMismatches(List<TypeMismatch> mismatches) {
918933
}
919934
}
920935

921-
/**
922-
* Returns a map of the property name to the getter method it represents.
923-
* If there are duplicate methods with the same bean name, then it is indeterminate
924-
* as to which method will be returned.
925-
*/
926-
private static SortedMap<String, Method> getPropertyNamesToGetters(Iterable<Method> methods) {
927-
SortedMap<String, Method> propertyNamesToGetters = Maps.newTreeMap();
928-
for (Method method : methods) {
929-
String methodName = method.getName();
930-
if ((!methodName.startsWith("get")
931-
&& !methodName.startsWith("is"))
932-
|| method.getParameterTypes().length != 0
933-
|| method.getReturnType() == void.class) {
934-
continue;
935-
}
936-
String propertyName = Introspector.decapitalize(
937-
methodName.startsWith("is") ? methodName.substring(2) : methodName.substring(3));
938-
propertyNamesToGetters.put(propertyName, method);
939-
}
940-
return propertyNamesToGetters;
941-
}
942-
943936
/**
944937
* Returns a map of required groups of arguments to the properties that satisfy the requirement.
945938
*/
@@ -981,21 +974,22 @@ private static SortedSetMultimap<String, String> getRequiredGroupNamesToProperti
981974
*/
982975
private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOptions> iface,
983976
Set<Class<? extends PipelineOptions>> validatedPipelineOptionsInterfaces,
984-
Class<?> klass) throws IntrospectionException {
977+
Class<? extends PipelineOptions> klass) throws IntrospectionException {
985978
Set<Method> methods = Sets.newHashSet(IGNORED_METHODS);
986-
// Ignore static methods, "equals", "hashCode", "toString" and "as" on the generated class.
987979
// Ignore synthetic methods
988980
for (Method method : klass.getMethods()) {
989981
if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) {
990982
methods.add(method);
991983
}
992984
}
985+
// Ignore standard infrastructure methods on the generated class.
993986
try {
994987
methods.add(klass.getMethod("equals", Object.class));
995988
methods.add(klass.getMethod("hashCode"));
996989
methods.add(klass.getMethod("toString"));
997990
methods.add(klass.getMethod("as", Class.class));
998991
methods.add(klass.getMethod("cloneAs", Class.class));
992+
methods.add(klass.getMethod("populateDisplayData", DisplayData.Builder.class));
999993
} catch (NoSuchMethodException | SecurityException e) {
1000994
throw Throwables.propagate(e);
1001995
}

0 commit comments

Comments
 (0)