2424import org .apache .beam .sdk .coders .KvCoder ;
2525import org .apache .beam .sdk .coders .SerializableCoder ;
2626import org .apache .beam .sdk .transforms .Combine .CombineFn ;
27+ import org .apache .beam .sdk .transforms .display .DisplayData ;
2728import org .apache .beam .sdk .values .KV ;
2829import org .apache .beam .sdk .values .PCollection ;
2930
3031import com .google .common .hash .Hashing ;
3132import com .google .common .hash .HashingOutputStream ;
3233import com .google .common .io .ByteStreams ;
3334
35+ import org .apache .avro .reflect .Nullable ;
36+
3437import java .io .IOException ;
3538import java .io .Serializable ;
3639import java .util .Arrays ;
@@ -167,6 +170,12 @@ static class Globally<T> extends PTransform<PCollection<T>, PCollection<Long>> {
167170 */
168171 private final long sampleSize ;
169172
173+ /**
174+ * The desired maximum estimation error or null if not specified.
175+ */
176+ @ Nullable
177+ private final Double maximumEstimationError ;
178+
170179 /**
171180 * @see ApproximateUnique#globally(int)
172181 */
@@ -178,7 +187,9 @@ public Globally(int sampleSize) {
178187 + "In general, the estimation "
179188 + "error is about 2 / sqrt(sampleSize)." );
180189 }
190+
181191 this .sampleSize = sampleSize ;
192+ this .maximumEstimationError = null ;
182193 }
183194
184195 /**
@@ -190,7 +201,9 @@ public Globally(double maximumEstimationError) {
190201 "ApproximateUnique needs an "
191202 + "estimation error between 1% (0.01) and 50% (0.5)." );
192203 }
204+
193205 this .sampleSize = sampleSizeFromEstimationError (maximumEstimationError );
206+ this .maximumEstimationError = maximumEstimationError ;
194207 }
195208
196209 @ Override
@@ -200,6 +213,11 @@ public PCollection<Long> apply(PCollection<T> input) {
200213 Combine .globally (
201214 new ApproximateUniqueCombineFn <>(sampleSize , coder )));
202215 }
216+
217+ @ Override
218+ public void populateDisplayData (DisplayData .Builder builder ) {
219+ ApproximateUnique .populateDisplayData (builder , sampleSize , maximumEstimationError );
220+ }
203221 }
204222
205223 /**
@@ -213,8 +231,18 @@ public PCollection<Long> apply(PCollection<T> input) {
213231 static class PerKey <K , V >
214232 extends PTransform <PCollection <KV <K , V >>, PCollection <KV <K , Long >>> {
215233
234+ /**
235+ * The number of entries in the statistical sample; the higher this number,
236+ * the more accurate the estimate will be.
237+ */
216238 private final long sampleSize ;
217239
240+ /**
241+ * The the desired maximum estimation error or null if not specified.
242+ */
243+ @ Nullable
244+ private final Double maximumEstimationError ;
245+
218246 /**
219247 * @see ApproximateUnique#perKey(int)
220248 */
@@ -225,7 +253,9 @@ public PerKey(int sampleSize) {
225253 + "sampleSize >= 16 for an estimation error <= 50%. In general, "
226254 + "the estimation error is about 2 / sqrt(sampleSize)." );
227255 }
256+
228257 this .sampleSize = sampleSize ;
258+ this .maximumEstimationError = null ;
229259 }
230260
231261 /**
@@ -237,7 +267,9 @@ public PerKey(double estimationError) {
237267 "ApproximateUnique.PerKey needs an "
238268 + "estimation error between 1% (0.01) and 50% (0.5)." );
239269 }
270+
240271 this .sampleSize = sampleSizeFromEstimationError (estimationError );
272+ this .maximumEstimationError = estimationError ;
241273 }
242274
243275 @ Override
@@ -254,6 +286,11 @@ public PCollection<KV<K, Long>> apply(PCollection<KV<K, V>> input) {
254286 Combine .perKey (new ApproximateUniqueCombineFn <>(
255287 sampleSize , coder ).<K >asKeyedFn ()));
256288 }
289+
290+ @ Override
291+ public void populateDisplayData (DisplayData .Builder builder ) {
292+ ApproximateUnique .populateDisplayData (builder , sampleSize , maximumEstimationError );
293+ }
257294 }
258295
259296
@@ -418,4 +455,11 @@ static <T> long hash(T element, Coder<T> coder) throws CoderException, IOExcepti
418455 static long sampleSizeFromEstimationError (double estimationError ) {
419456 return Math .round (Math .ceil (4.0 / Math .pow (estimationError , 2.0 )));
420457 }
458+
459+ private static void populateDisplayData (
460+ DisplayData .Builder builder , long sampleSize , Double maxEstimationError ) {
461+ builder
462+ .add ("sampleSize" , sampleSize )
463+ .addIfNotNull ("maximumEstimationError" , maxEstimationError );
464+ }
421465}
0 commit comments