Skip to content

Commit 493bf20

Browse files
committed
Revert "Adding percentiles to distribution metric (apache#16)"
This reverts commit 9459eef.
1 parent 9459eef commit 493bf20

23 files changed

Lines changed: 182 additions & 689 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,6 @@ class BeamModulePlugin implements Plugin<Project> {
546546
commons_io : "commons-io:commons-io:2.6",
547547
commons_lang3 : "org.apache.commons:commons-lang3:3.9",
548548
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
549-
datasketches : "org.apache.datasketches:datasketches-java:3.0.0",
550549
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
551550
flogger_system_backend : "com.google.flogger:flogger-system-backend:0.6",
552551
gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version

runners/core-java/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ dependencies {
3838
compile project(path: ":model:job-management", configuration: "shadow")
3939
compile project(":runners:core-construction-java")
4040
compile project(":sdks:java:fn-execution")
41-
compile library.java.datasketches
4241
compile library.java.vendored_guava_26_0_jre
4342
compile library.java.joda_time
4443
compile library.java.vendored_grpc_1_36_0

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
3333
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
3434
import org.apache.beam.runners.core.TimerInternals.TimerData;
35+
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
3536
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
3637
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
3738
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
*/
3535
public class DistributionCell implements Distribution, MetricCell<DistributionData> {
3636

37-
private final DirtyState dirty;
38-
private final AtomicReference<DistributionData> value;
37+
private final DirtyState dirty = new DirtyState();
38+
private final AtomicReference<DistributionData> value =
39+
new AtomicReference<>(DistributionData.EMPTY);
3940
private final MetricName name;
4041

4142
/**
@@ -44,36 +45,31 @@ public class DistributionCell implements Distribution, MetricCell<DistributionDa
4445
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
4546
*/
4647
public DistributionCell(MetricName name) {
47-
this.dirty = new DirtyState();
48-
this.value = new AtomicReference<>(DistributionData.empty());
4948
this.name = name;
5049
}
5150

52-
public DistributionCell(DistributionMetricKey metricKey) {
53-
this.dirty = new DirtyState();
54-
this.value =
55-
new AtomicReference<>(DistributionData.withPercentiles(metricKey.getPercentiles()));
56-
this.name = metricKey.getMetricName();
57-
}
58-
5951
@Override
6052
public void reset() {
6153
dirty.afterModification();
62-
value.get().reset();
54+
value.set(DistributionData.EMPTY);
6355
}
6456

6557
/** Increment the distribution by the given amount. */
6658
@Override
6759
public void update(long n) {
68-
value.get().update(n);
69-
dirty.afterModification();
60+
update(DistributionData.singleton(n));
61+
}
62+
63+
@Override
64+
public void update(long sum, long count, long min, long max) {
65+
update(DistributionData.create(sum, count, min, max));
7066
}
7167

72-
void update(DistributionData other) {
68+
void update(DistributionData data) {
7369
DistributionData original;
7470
do {
7571
original = value.get();
76-
} while (!value.compareAndSet(original, original.combine(other)));
72+
} while (!value.compareAndSet(original, original.combine(data)));
7773
dirty.afterModification();
7874
}
7975

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java

Lines changed: 19 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -17,231 +17,48 @@
1717
*/
1818
package org.apache.beam.runners.core.metrics;
1919

20-
import java.io.IOException;
21-
import java.io.ObjectInputStream;
22-
import java.io.ObjectOutputStream;
20+
import com.google.auto.value.AutoValue;
2321
import java.io.Serializable;
24-
import java.util.Map;
25-
import java.util.Objects;
26-
import java.util.Optional;
27-
import java.util.Set;
2822
import org.apache.beam.sdk.metrics.DistributionResult;
29-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
30-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
31-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
32-
import org.apache.datasketches.memory.Memory;
33-
import org.apache.datasketches.quantiles.DoublesSketchBuilder;
34-
import org.apache.datasketches.quantiles.DoublesUnion;
35-
import org.apache.datasketches.quantiles.DoublesUnionBuilder;
36-
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
3723

3824
/**
3925
* Data describing the the distribution. This should retain enough detail that it can be combined
4026
* with other {@link DistributionData}.
4127
*
42-
* <p>Datasketch library is used to compute percentiles. See {@linktourl
43-
* https://datasketches.apache.org/}.
44-
*
4528
* <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
4629
* data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
4730
* the approximate value of those quantiles.
4831
*/
49-
public class DistributionData implements Serializable {
50-
// k = 256 should yield an approximate error ε of less than 1%
51-
private static final int SKETCH_SUMMARY_SIZE = 256;
52-
53-
private final Set<Double> percentiles;
54-
private long sum;
55-
private long count;
56-
private long min;
57-
private long max;
58-
private transient Optional<UpdateDoublesSketch> sketch;
59-
60-
/** Creates an instance of DistributionData with custom percentiles. */
61-
public static DistributionData withPercentiles(Set<Double> percentiles) {
62-
return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, percentiles);
63-
}
64-
65-
/** Backward compatible static factory method. */
66-
public static DistributionData empty() {
67-
return new DistributionData(0L, 0L, Long.MAX_VALUE, Long.MIN_VALUE, ImmutableSet.of());
68-
}
69-
70-
/** Static factory method primary used for testing. */
71-
@VisibleForTesting
72-
public static DistributionData create(long sum, long count, long min, long max) {
73-
return new DistributionData(sum, count, min, max, ImmutableSet.of());
74-
}
32+
@AutoValue
33+
public abstract class DistributionData implements Serializable {
7534

76-
private DistributionData(long sum, long count, long min, long max, Set<Double> percentiles) {
77-
this.sum = sum;
78-
this.count = count;
79-
this.min = min;
80-
this.max = max;
81-
this.percentiles = percentiles;
82-
if (!percentiles.isEmpty()) {
83-
final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder();
84-
this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build());
85-
} else {
86-
this.sketch = Optional.empty();
87-
}
88-
}
35+
public abstract long sum();
8936

90-
public static DistributionData singleton(long value) {
91-
final DistributionData distributionData = empty();
92-
distributionData.update(value);
93-
return distributionData;
94-
}
37+
public abstract long count();
9538

96-
////////////////////////////////////////////////////////////////////////////////////////////////////////
97-
// Getters
39+
public abstract long min();
9840

99-
public long sum() {
100-
return sum;
101-
}
41+
public abstract long max();
10242

103-
public long count() {
104-
return count;
105-
}
106-
107-
public long min() {
108-
return min;
109-
}
110-
111-
public long max() {
112-
return max;
113-
}
43+
public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
11444

115-
/** Gets the percentiles and the percentiles values as a map. */
116-
public Map<Double, Double> percentiles() {
117-
if (!sketch.isPresent() || sketch.get().getN() == 0) {
118-
// if the sketch is not present or is empty, do not compute the percentile
119-
return ImmutableMap.of();
120-
}
121-
122-
double[] quantiles = percentiles.stream().mapToDouble(i -> i / 100).toArray();
123-
double[] quantileResults = sketch.get().getQuantiles(quantiles);
124-
125-
final ImmutableMap.Builder<Double, Double> resultBuilder = ImmutableMap.builder();
126-
for (int k = 0; k < quantiles.length; k++) {
127-
resultBuilder.put(quantiles[k] * 100, quantileResults[k]);
128-
}
129-
return resultBuilder.build();
130-
}
131-
132-
////////////////////////////////////////////////////////////////////////////////////////////////////////
133-
134-
/**
135-
* Updates the distribution with a value. For percentiles, only add the value to the sketch.
136-
* Percentile will be computed prior to calling {@link DistributionCell#getCumulative()} or in
137-
* {@link #extractResult()}.
138-
*
139-
* @param value value to update the distribution with.
140-
*/
141-
public void update(long value) {
142-
++count;
143-
min = Math.min(min, value);
144-
max = Math.max(max, value);
145-
sum += value;
146-
sketch.ifPresent(currSketch -> currSketch.update(value));
45+
public static DistributionData create(long sum, long count, long min, long max) {
46+
return new AutoValue_DistributionData(sum, count, min, max);
14747
}
14848

149-
/** Merges two distributions. */
150-
public DistributionData combine(DistributionData other) {
151-
if (sketch.isPresent()
152-
&& other.sketch.isPresent()
153-
&& sketch.get().getN() > 0
154-
&& other.sketch.get().getN() > 0) {
155-
final DoublesUnion union = new DoublesUnionBuilder().build();
156-
union.update(sketch.get());
157-
union.update(other.sketch.get());
158-
sketch = Optional.of(union.getResult());
159-
} else if (other.sketch.isPresent() && other.sketch.get().getN() > 0) {
160-
sketch = other.sketch;
161-
}
162-
sum += other.sum;
163-
count += other.count;
164-
max = Math.max(max, other.max);
165-
min = Math.min(min, other.min);
166-
return this;
49+
public static DistributionData singleton(long value) {
50+
return create(value, 1, value, value);
16751
}
16852

169-
public DistributionData reset() {
170-
this.sum = 0L;
171-
this.count = 0L;
172-
this.min = Long.MAX_VALUE;
173-
this.max = Long.MIN_VALUE;
174-
if (!this.percentiles.isEmpty()) {
175-
final DoublesSketchBuilder doublesSketchBuilder = new DoublesSketchBuilder();
176-
this.sketch = Optional.of(doublesSketchBuilder.setK(SKETCH_SUMMARY_SIZE).build());
177-
} else {
178-
this.sketch = Optional.empty();
179-
}
180-
return this;
53+
public DistributionData combine(DistributionData value) {
54+
return create(
55+
sum() + value.sum(),
56+
count() + value.count(),
57+
Math.min(value.min(), min()),
58+
Math.max(value.max(), max()));
18159
}
18260

183-
/** Generates DistributionResult from DistributionData. */
18461
public DistributionResult extractResult() {
185-
return DistributionResult.create(sum(), count(), min(), max(), percentiles());
186-
}
187-
188-
@Override
189-
public boolean equals(Object object) {
190-
if (object instanceof DistributionData) {
191-
DistributionData other = (DistributionData) object;
192-
return Objects.equals(max, other.max())
193-
&& Objects.equals(min, other.min())
194-
&& Objects.equals(count, other.count())
195-
&& Objects.equals(sum, other.sum())
196-
&& Objects.equals(percentiles(), other.percentiles());
197-
}
198-
return false;
199-
}
200-
201-
@Override
202-
public int hashCode() {
203-
return Objects.hash(min, max, sum, count, percentiles());
204-
}
205-
206-
@Override
207-
public String toString() {
208-
return "DistributionData{"
209-
+ "sum="
210-
+ sum
211-
+ ", "
212-
+ "count="
213-
+ count
214-
+ ", "
215-
+ "min="
216-
+ min
217-
+ ", "
218-
+ "max="
219-
+ max
220-
+ ", "
221-
+ "percentiles="
222-
+ percentiles()
223-
+ "}";
224-
}
225-
226-
private void writeObject(ObjectOutputStream out) throws IOException {
227-
out.defaultWriteObject();
228-
if (sketch.isPresent()) {
229-
byte[] bytes = sketch.get().toByteArray();
230-
out.writeInt(bytes.length);
231-
out.write(bytes);
232-
}
233-
}
234-
235-
@SuppressWarnings("ResultOfMethodCallIgnored")
236-
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
237-
in.defaultReadObject();
238-
if (!this.percentiles.isEmpty()) {
239-
int len = in.readInt();
240-
byte[] bytes = new byte[len];
241-
in.read(bytes);
242-
this.sketch = Optional.of(UpdateDoublesSketch.heapify(Memory.wrap(bytes)));
243-
} else {
244-
this.sketch = Optional.empty();
245-
}
62+
return DistributionResult.create(sum(), count(), min(), max());
24663
}
24764
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionMetricKey.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)