Skip to content

Commit b0baa4c

Browse files
swegner bchambers
authored and committed
Add DisplayData for combine transforms
If more than one combineFn has the same namespace, add a sequential suffix. This is necessary because each namespace/key pair must be unique within the transform. Add a `JavaClass` wrapper around a name/simple-name for a class. This is necessary in cases where the class may be serialized to support accessing `DisplayData` since `Class` is not serializable in some cases.
1 parent d440d94 commit b0baa4c

26 files changed

Lines changed: 878 additions & 75 deletions

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.coders.ListCoder;
2626
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
2727
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
28+
import org.apache.beam.sdk.transforms.display.DisplayData;
2829
import org.apache.beam.sdk.util.WeightedValue;
2930
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
3031
import org.apache.beam.sdk.values.KV;
@@ -359,6 +360,13 @@ public Coder<QuantileState<T, ComparatorT>> getAccumulatorCoder(
359360
CoderRegistry registry, Coder<T> elementCoder) {
360361
return new QuantileStateCoder<>(compareFn, elementCoder);
361362
}
363+
364+
@Override
365+
public void populateDisplayData(DisplayData.Builder builder) {
366+
builder
367+
.add("numQuantiles", numQuantiles)
368+
.add("comparer", compareFn.getClass());
369+
}
362370
}
363371

364372
/**

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
import org.apache.beam.sdk.coders.KvCoder;
2525
import org.apache.beam.sdk.coders.SerializableCoder;
2626
import org.apache.beam.sdk.transforms.Combine.CombineFn;
27+
import org.apache.beam.sdk.transforms.display.DisplayData;
2728
import org.apache.beam.sdk.values.KV;
2829
import org.apache.beam.sdk.values.PCollection;
2930

3031
import com.google.common.hash.Hashing;
3132
import com.google.common.hash.HashingOutputStream;
3233
import com.google.common.io.ByteStreams;
3334

35+
import org.apache.avro.reflect.Nullable;
36+
3437
import java.io.IOException;
3538
import java.io.Serializable;
3639
import java.util.Arrays;
@@ -167,6 +170,12 @@ static class Globally<T> extends PTransform<PCollection<T>, PCollection<Long>> {
167170
*/
168171
private final long sampleSize;
169172

173+
/**
174+
* The desired maximum estimation error or null if not specified.
175+
*/
176+
@Nullable
177+
private final Double maximumEstimationError;
178+
170179
/**
171180
* @see ApproximateUnique#globally(int)
172181
*/
@@ -178,7 +187,9 @@ public Globally(int sampleSize) {
178187
+ "In general, the estimation "
179188
+ "error is about 2 / sqrt(sampleSize).");
180189
}
190+
181191
this.sampleSize = sampleSize;
192+
this.maximumEstimationError = null;
182193
}
183194

184195
/**
@@ -190,7 +201,9 @@ public Globally(double maximumEstimationError) {
190201
"ApproximateUnique needs an "
191202
+ "estimation error between 1% (0.01) and 50% (0.5).");
192203
}
204+
193205
this.sampleSize = sampleSizeFromEstimationError(maximumEstimationError);
206+
this.maximumEstimationError = maximumEstimationError;
194207
}
195208

196209
@Override
@@ -200,6 +213,11 @@ public PCollection<Long> apply(PCollection<T> input) {
200213
Combine.globally(
201214
new ApproximateUniqueCombineFn<>(sampleSize, coder)));
202215
}
216+
217+
@Override
218+
public void populateDisplayData(DisplayData.Builder builder) {
219+
ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError);
220+
}
203221
}
204222

205223
/**
@@ -213,8 +231,18 @@ public PCollection<Long> apply(PCollection<T> input) {
213231
static class PerKey<K, V>
214232
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {
215233

234+
/**
235+
* The number of entries in the statistical sample; the higher this number,
236+
* the more accurate the estimate will be.
237+
*/
216238
private final long sampleSize;
217239

240+
/**
241+
* The desired maximum estimation error or null if not specified.
242+
*/
243+
@Nullable
244+
private final Double maximumEstimationError;
245+
218246
/**
219247
* @see ApproximateUnique#perKey(int)
220248
*/
@@ -225,7 +253,9 @@ public PerKey(int sampleSize) {
225253
+ "sampleSize >= 16 for an estimation error <= 50%. In general, "
226254
+ "the estimation error is about 2 / sqrt(sampleSize).");
227255
}
256+
228257
this.sampleSize = sampleSize;
258+
this.maximumEstimationError = null;
229259
}
230260

231261
/**
@@ -237,7 +267,9 @@ public PerKey(double estimationError) {
237267
"ApproximateUnique.PerKey needs an "
238268
+ "estimation error between 1% (0.01) and 50% (0.5).");
239269
}
270+
240271
this.sampleSize = sampleSizeFromEstimationError(estimationError);
272+
this.maximumEstimationError = estimationError;
241273
}
242274

243275
@Override
@@ -254,6 +286,11 @@ public PCollection<KV<K, Long>> apply(PCollection<KV<K, V>> input) {
254286
Combine.perKey(new ApproximateUniqueCombineFn<>(
255287
sampleSize, coder).<K>asKeyedFn()));
256288
}
289+
290+
@Override
291+
public void populateDisplayData(DisplayData.Builder builder) {
292+
ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError);
293+
}
257294
}
258295

259296

@@ -418,4 +455,11 @@ static <T> long hash(T element, Coder<T> coder) throws CoderException, IOExcepti
418455
static long sampleSizeFromEstimationError(double estimationError) {
419456
return Math.round(Math.ceil(4.0 / Math.pow(estimationError, 2.0)));
420457
}
458+
459+
private static void populateDisplayData(
460+
DisplayData.Builder builder, long sampleSize, Double maxEstimationError) {
461+
builder
462+
.add("sampleSize", sampleSize)
463+
.addIfNotNull("maximumEstimationError", maxEstimationError);
464+
}
421465
}

0 commit comments

Comments
 (0)