Skip to content

Commit 9459eef

Browse files
authored
Adding percentiles to distribution metric (apache#16)
1 parent 2e00168 commit 9459eef

23 files changed

Lines changed: 689 additions & 182 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ class BeamModulePlugin implements Plugin<Project> {
546546
commons_io : "commons-io:commons-io:2.6",
547547
commons_lang3 : "org.apache.commons:commons-lang3:3.9",
548548
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
549+
datasketches : "org.apache.datasketches:datasketches-java:3.0.0",
549550
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
550551
flogger_system_backend : "com.google.flogger:flogger-system-backend:0.6",
551552
gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version

runners/core-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies {
3838
compile project(path: ":model:job-management", configuration: "shadow")
3939
compile project(":runners:core-construction-java")
4040
compile project(":sdks:java:fn-execution")
41+
compile library.java.datasketches
4142
compile library.java.vendored_guava_26_0_jre
4243
compile library.java.joda_time
4344
compile library.java.vendored_grpc_1_36_0

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
3333
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
3434
import org.apache.beam.runners.core.TimerInternals.TimerData;
35-
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
3635
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
3736
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
3837
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@
3434
*/
3535
public class DistributionCell implements Distribution, MetricCell<DistributionData> {
3636

37-
private final DirtyState dirty = new DirtyState();
38-
private final AtomicReference<DistributionData> value =
39-
new AtomicReference<>(DistributionData.EMPTY);
37+
private final DirtyState dirty;
38+
private final AtomicReference<DistributionData> value;
4039
private final MetricName name;
4140

4241
/**
@@ -45,31 +44,36 @@ public class DistributionCell implements Distribution, MetricCell<DistributionDa
4544
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
4645
*/
4746
public DistributionCell(MetricName name) {
47+
this.dirty = new DirtyState();
48+
this.value = new AtomicReference<>(DistributionData.empty());
4849
this.name = name;
4950
}
5051

52+
public DistributionCell(DistributionMetricKey metricKey) {
53+
this.dirty = new DirtyState();
54+
this.value =
55+
new AtomicReference<>(DistributionData.withPercentiles(metricKey.getPercentiles()));
56+
this.name = metricKey.getMetricName();
57+
}
58+
5159
@Override
5260
public void reset() {
5361
dirty.afterModification();
54-
value.set(DistributionData.EMPTY);
62+
value.get().reset();
5563
}
5664

5765
/** Increment the distribution by the given amount. */
5866
@Override
5967
public void update(long n) {
60-
update(DistributionData.singleton(n));
61-
}
62-
63-
@Override
64-
public void update(long sum, long count, long min, long max) {
65-
update(DistributionData.create(sum, count, min, max));
68+
value.get().update(n);
69+
dirty.afterModification();
6670
}
6771

68-
void update(DistributionData data) {
72+
void update(DistributionData other) {
6973
DistributionData original;
7074
do {
7175
original = value.get();
72-
} while (!value.compareAndSet(original, original.combine(data)));
76+
} while (!value.compareAndSet(original, original.combine(other)));
7377
dirty.afterModification();
7478
}
7579

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java

Lines changed: 202 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,231 @@
1717
*/
1818
package org.apache.beam.runners.core.metrics;
1919

20-
import com.google.auto.value.AutoValue;
20+
import java.io.IOException;
21+
import java.io.ObjectInputStream;
22+
import java.io.ObjectOutputStream;
2123
import java.io.Serializable;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
import java.util.Optional;
27+
import java.util.Set;
2228
import org.apache.beam.sdk.metrics.DistributionResult;
29+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
30+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
31+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
32+
import org.apache.datasketches.memory.Memory;
33+
import org.apache.datasketches.quantiles.DoublesSketchBuilder;
34+
import org.apache.datasketches.quantiles.DoublesUnion;
35+
import org.apache.datasketches.quantiles.DoublesUnionBuilder;
36+
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
2337

2438
/**
2539
* Data describing the the distribution. This should retain enough detail that it can be combined
2640
* with other {@link DistributionData}.
2741
*
42+
* <p>Datasketch library is used to compute percentiles. See {@linktourl
43+
* https://datasketches.apache.org/}.
44+
*
2845
* <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
2946
* data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
3047
* the approximate value of those quantiles.
3148
*/
32-
@AutoValue
33-
public abstract class DistributionData implements Serializable {
34-
35-
public abstract long sum();
36-
37-
public abstract long count();
49+
public class DistributionData implements Serializable {
50+
// k = 256 should yield an approximate error ε of less than 1%
51+
private static final int SKETCH_SUMMARY_SIZE = 256;
3852

39-
public abstract long min();
53+
private final Set<Double> percentiles;
54+
private long sum;
55+
private long count;
56+
private long min;
57+
private long max;
58+
private transient Optional<UpdateDoublesSketch> sketch;
4059

41-
public abstract long max();
60+
/** Creates an instance of DistributionData with custom percentiles. */
61+
public static DistributionData withPercentiles(Set<Double> percentiles) {
62+
return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, percentiles);
63+
}
4264

43-
public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
65+
/** Backward compatible static factory method. */
66+
public static DistributionData empty() {
67+
return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, ImmutableSet.of());
68+
}
4469

70+
/** Static factory method primary used for testing. */
71+
@VisibleForTesting
4572
public static DistributionData create(long sum, long count, long min, long max) {
46-
return new AutoValue_DistributionData(sum, count, min, max);
73+
return new DistributionData(sum, count, min, max, ImmutableSet.of());
74+
}
75+
76+
private DistributionData(long sum, long count, long min, long max, Set<Double> percentiles) {
77+
this.sum = sum;
78+
this.count = count;
79+
this.min = min;
80+
this.max = max;
81+
this.percentiles = percentiles;
82+
if (!percentiles.isEmpty()) {
83+
final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder();
84+
this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build());
85+
} else {
86+
this.sketch = Optional.empty();
87+
}
4788
}
4889

4990
public static DistributionData singleton(long value) {
50-
return create(value, 1, value, value);
91+
final DistributionData distributionData = empty();
92+
distributionData.update(value);
93+
return distributionData;
94+
}
95+
96+
////////////////////////////////////////////////////////////////////////////////////////////////////////
97+
// Getters
98+
99+
public long sum() {
100+
return sum;
101+
}
102+
103+
public long count() {
104+
return count;
105+
}
106+
107+
public long min() {
108+
return min;
51109
}
52110

53-
public DistributionData combine(DistributionData value) {
54-
return create(
55-
sum() + value.sum(),
56-
count() + value.count(),
57-
Math.min(value.min(), min()),
58-
Math.max(value.max(), max()));
111+
public long max() {
112+
return max;
113+
}
114+
115+
/** Gets the percentiles and the percentiles values as a map. */
116+
public Map<Double, Double> percentiles() {
117+
if (!sketch.isPresent() || sketch.get().getN() == 0) {
118+
// if the sketch is not present or is empty, do not compute the percentile
119+
return ImmutableMap.of();
120+
}
121+
122+
double[] quantiles = percentiles.stream().mapToDouble(i -> i / 100).toArray();
123+
double[] quantileResults = sketch.get().getQuantiles(quantiles);
124+
125+
final ImmutableMap.Builder<Double, Double> resultBuilder = ImmutableMap.builder();
126+
for (int k = 0; k < quantiles.length; k++) {
127+
resultBuilder.put(quantiles[k] * 100, quantileResults[k]);
128+
}
129+
return resultBuilder.build();
59130
}
60131

132+
////////////////////////////////////////////////////////////////////////////////////////////////////////
133+
134+
/**
135+
* Updates the distribution with a value. For percentiles, only add the value to the sketch.
136+
* Percentile will be computed prior to calling {@link DistributionCell#getCumulative()} or in
137+
* {@link #extractResult()}.
138+
*
139+
* @param value value to update the distribution with.
140+
*/
141+
public void update(long value) {
142+
++count;
143+
min = Math.min(min, value);
144+
max = Math.max(max, value);
145+
sum += value;
146+
sketch.ifPresent(currSketch -> currSketch.update(value));
147+
}
148+
149+
/** Merges two distributions. */
150+
public DistributionData combine(DistributionData other) {
151+
if (sketch.isPresent()
152+
&& other.sketch.isPresent()
153+
&& sketch.get().getN() > 0
154+
&& other.sketch.get().getN() > 0) {
155+
final DoublesUnion union = new DoublesUnionBuilder().build();
156+
union.update(sketch.get());
157+
union.update(other.sketch.get());
158+
sketch = Optional.of(union.getResult());
159+
} else if (other.sketch.isPresent() && other.sketch.get().getN() > 0) {
160+
sketch = other.sketch;
161+
}
162+
sum += other.sum;
163+
count += other.count;
164+
max = Math.max(max, other.max);
165+
min = Math.min(min, other.min);
166+
return this;
167+
}
168+
169+
public DistributionData reset() {
170+
this.sum = 0L;
171+
this.count = 0L;
172+
this.min = Long.MAX_VALUE;
173+
this.max = Long.MIN_VALUE;
174+
if (!this.percentiles.isEmpty()) {
175+
final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder();
176+
this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build());
177+
} else {
178+
this.sketch = Optional.empty();
179+
}
180+
return this;
181+
}
182+
183+
/** Generates DistributionResult from DistributionData. */
61184
public DistributionResult extractResult() {
62-
return DistributionResult.create(sum(), count(), min(), max());
185+
return DistributionResult.create(sum(), count(), min(), max(), percentiles());
186+
}
187+
188+
@Override
189+
public boolean equals(Object object) {
190+
if (object instanceof DistributionData) {
191+
DistributionData other = (DistributionData) object;
192+
return Objects.equals(max, other.max())
193+
&& Objects.equals(min, other.min())
194+
&& Objects.equals(count, other.count())
195+
&& Objects.equals(sum, other.sum())
196+
&& Objects.equals(percentiles(), other.percentiles());
197+
}
198+
return false;
199+
}
200+
201+
@Override
202+
public int hashCode() {
203+
return Objects.hash(min, max, sum, count, percentiles());
204+
}
205+
206+
@Override
207+
public String toString() {
208+
return "DistributionData{"
209+
+ "sum="
210+
+ sum
211+
+ ", "
212+
+ "count="
213+
+ count
214+
+ ", "
215+
+ "min="
216+
+ min
217+
+ ", "
218+
+ "max="
219+
+ max
220+
+ ", "
221+
+ "percentiles="
222+
+ percentiles()
223+
+ "}";
224+
}
225+
226+
private void writeObject(ObjectOutputStream out) throws IOException {
227+
out.defaultWriteObject();
228+
if (sketch.isPresent()) {
229+
byte[] bytes = sketch.get().toByteArray();
230+
out.writeInt(bytes.length);
231+
out.write(bytes);
232+
}
233+
}
234+
235+
@SuppressWarnings("ResultOfMethodCallIgnored")
236+
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
237+
in.defaultReadObject();
238+
if (!this.percentiles.isEmpty()) {
239+
int len = in.readInt();
240+
byte[] bytes = new byte[len];
241+
in.read(bytes);
242+
this.sketch = Optional.of(UpdateDoublesSketch.heapify(Memory.wrap(bytes)));
243+
} else {
244+
this.sketch = Optional.empty();
245+
}
63246
}
64247
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core.metrics;
19+
20+
import com.google.auto.value.AutoValue;
21+
import java.io.Serializable;
22+
import java.util.Set;
23+
import org.apache.beam.sdk.metrics.MetricName;
24+
25+
/**
26+
* Value class to represent Metric name and percentiles together. {@link MetricsContainerImpl} uses
27+
* a map of this key and the Distribution Metric.
28+
*/
29+
@AutoValue
30+
public abstract class DistributionMetricKey implements Serializable {
31+
public abstract MetricName getMetricName();
32+
33+
public abstract Set<Double> getPercentiles();
34+
35+
public static DistributionMetricKey create(MetricName metricName, Set<Double> percentiles) {
36+
return new AutoValue_DistributionMetricKey(metricName, percentiles);
37+
}
38+
}

0 commit comments

Comments
 (0)