Skip to content

Commit f61c8e1

Browse files
authored
Enforce a size limit on StringSetData (#32650) (#32707)
* Enforce a size limit on StringSetData * Make StringSetData set mutable. This avoids copy and create new ImutableSet every time * adjust warning log
1 parent 96a96da commit f61c8e1

File tree

9 files changed

+255
-43
lines changed

9 files changed

+255
-43
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.beam.sdk.metrics.MetricName;
2323
import org.apache.beam.sdk.metrics.MetricsContainer;
2424
import org.apache.beam.sdk.metrics.StringSet;
25-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
2625
import org.checkerframework.checker.nullness.qual.Nullable;
2726

2827
/**
@@ -101,11 +100,15 @@ public void add(String value) {
101100
if (this.setValue.get().stringSet().contains(value)) {
102101
return;
103102
}
104-
update(StringSetData.create(ImmutableSet.of(value)));
103+
add(new String[] {value});
105104
}
106105

107106
@Override
108107
public void add(String... values) {
109-
update(StringSetData.create(ImmutableSet.copyOf(values)));
108+
StringSetData original;
109+
do {
110+
original = setValue.get();
111+
} while (!setValue.compareAndSet(original, original.addAll(values)));
112+
dirty.afterModification();
110113
}
111114
}

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java

Lines changed: 86 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,73 @@
1919

2020
import com.google.auto.value.AutoValue;
2121
import java.io.Serializable;
22+
import java.util.Arrays;
23+
import java.util.HashSet;
2224
import java.util.Set;
23-
import java.util.stream.Collectors;
24-
import java.util.stream.StreamSupport;
2525
import org.apache.beam.sdk.metrics.StringSetResult;
26+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2730

2831
/**
29-
* Data describing the StringSet. The {@link StringSetData} hold an immutable copy of the set from
30-
* which it was initially created. This should retain enough detail that it can be combined with
31-
* other {@link StringSetData}.
32+
* Data describing the StringSet. The {@link StringSetData} hold a copy of the set from which it was
33+
* initially created. This should retain enough detail that it can be combined with other {@link
34+
* StringSetData}.
35+
*
36+
* <p>The underlying set is mutable for {@link #addAll} operation, otherwise a copy set will be
37+
* generated.
38+
*
39+
* <p>The summation of all string length for a {@code StringSetData} cannot exceed 1 MB. Further
40+
* addition of elements are dropped.
3241
*/
3342
@AutoValue
3443
public abstract class StringSetData implements Serializable {
44+
private static final Logger LOG = LoggerFactory.getLogger(StringSetData.class);
45+
// 1 MB
46+
@VisibleForTesting static final long STRING_SET_SIZE_LIMIT = 1_000_000L;
3547

3648
public abstract Set<String> stringSet();
3749

50+
public abstract long stringSize();
51+
3852
/** Returns a {@link StringSetData} which is made from an immutable copy of the given set. */
3953
public static StringSetData create(Set<String> set) {
40-
return new AutoValue_StringSetData(ImmutableSet.copyOf(set));
54+
if (set.isEmpty()) {
55+
return empty();
56+
}
57+
HashSet<String> combined = new HashSet<>();
58+
long stringSize = addUntilCapacity(combined, 0L, set);
59+
return new AutoValue_StringSetData(combined, stringSize);
60+
}
61+
62+
/** Returns a {@link StringSetData} which is made from the given set in place. */
63+
private static StringSetData createInPlace(HashSet<String> set, long stringSize) {
64+
return new AutoValue_StringSetData(set, stringSize);
4165
}
4266

4367
/** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */
4468
public static StringSetData empty() {
4569
return EmptyStringSetData.INSTANCE;
4670
}
4771

72+
/**
73+
* Add strings into this {@code StringSetData} and return the result {@code StringSetData}. Reuse
74+
* the original StringSetData's set. As a result, current StringSetData will become invalid.
75+
*
76+
* <p>>Should only be used by {@link StringSetCell#add}.
77+
*/
78+
public StringSetData addAll(String... strings) {
79+
HashSet<String> combined;
80+
if (this.stringSet() instanceof HashSet) {
81+
combined = (HashSet<String>) this.stringSet();
82+
} else {
83+
combined = new HashSet<>(this.stringSet());
84+
}
85+
long stringSize = addUntilCapacity(combined, this.stringSize(), Arrays.asList(strings));
86+
return StringSetData.createInPlace(combined, stringSize);
87+
}
88+
4889
/**
4990
* Combines this {@link StringSetData} with other, both original StringSetData are left intact.
5091
*/
@@ -54,30 +95,54 @@ public StringSetData combine(StringSetData other) {
5495
} else if (other.stringSet().isEmpty()) {
5596
return this;
5697
} else {
57-
ImmutableSet.Builder<String> combined = ImmutableSet.builder();
58-
combined.addAll(this.stringSet());
59-
combined.addAll(other.stringSet());
60-
return StringSetData.create(combined.build());
98+
HashSet<String> combined = new HashSet<>(this.stringSet());
99+
long stringSize = addUntilCapacity(combined, this.stringSize(), other.stringSet());
100+
return StringSetData.createInPlace(combined, stringSize);
61101
}
62102
}
63103

64104
/**
65105
* Combines this {@link StringSetData} with others, all original StringSetData are left intact.
66106
*/
67107
public StringSetData combine(Iterable<StringSetData> others) {
68-
Set<String> combined =
69-
StreamSupport.stream(others.spliterator(), true)
70-
.flatMap(other -> other.stringSet().stream())
71-
.collect(Collectors.toSet());
72-
combined.addAll(this.stringSet());
73-
return StringSetData.create(combined);
108+
HashSet<String> combined = new HashSet<>(this.stringSet());
109+
long stringSize = this.stringSize();
110+
for (StringSetData other : others) {
111+
stringSize = addUntilCapacity(combined, stringSize, other.stringSet());
112+
}
113+
return StringSetData.createInPlace(combined, stringSize);
74114
}
75115

76116
/** Returns a {@link StringSetResult} representing this {@link StringSetData}. */
77117
public StringSetResult extractResult() {
78118
return StringSetResult.create(stringSet());
79119
}
80120

121+
/** Add strings into set until reach capacity. Return the all string size of added set. */
122+
private static long addUntilCapacity(
123+
HashSet<String> combined, long currentSize, Iterable<String> others) {
124+
if (currentSize > STRING_SET_SIZE_LIMIT) {
125+
// already at capacity
126+
return currentSize;
127+
}
128+
for (String string : others) {
129+
if (combined.add(string)) {
130+
currentSize += string.length();
131+
132+
// check capacity both before insert and after insert one, so the warning only emit once.
133+
if (currentSize > STRING_SET_SIZE_LIMIT) {
134+
LOG.warn(
135+
"StringSet metrics reaches capacity. Further incoming elements won't be recorded."
136+
+ " Current size: {}, last element size: {}.",
137+
currentSize,
138+
string.length());
139+
break;
140+
}
141+
}
142+
}
143+
return currentSize;
144+
}
145+
81146
/** Empty {@link StringSetData}, representing no values reported and is immutable. */
82147
public static class EmptyStringSetData extends StringSetData {
83148

@@ -91,6 +156,11 @@ public Set<String> stringSet() {
91156
return ImmutableSet.of();
92157
}
93158

159+
@Override
160+
public long stringSize() {
161+
return 0L;
162+
}
163+
94164
/** Return a {@link StringSetResult#empty()} which is immutable empty set. */
95165
@Override
96166
public StringSetResult extractResult() {

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertTrue;
2323

2424
import java.util.Collections;
25+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
2526
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
2627
import org.junit.Rule;
2728
import org.junit.Test;
@@ -81,6 +82,14 @@ public void testStringSetDataEmptyIsImmutable() {
8182
assertThrows(UnsupportedOperationException.class, () -> empty.stringSet().add("aa"));
8283
}
8384

85+
@Test
86+
public void testStringSetDataEmptyCanAdd() {
87+
ImmutableSet<String> contents = ImmutableSet.of("ab", "cd");
88+
StringSetData stringSetData = StringSetData.empty();
89+
stringSetData = stringSetData.addAll(contents.toArray(new String[] {}));
90+
assertEquals(stringSetData.stringSet(), contents);
91+
}
92+
8493
@Test
8594
public void testEmptyExtract() {
8695
assertTrue(StringSetData.empty().extractResult().getStringSet().isEmpty());
@@ -94,9 +103,26 @@ public void testExtract() {
94103
}
95104

96105
@Test
97-
public void testExtractReturnsImmutable() {
98-
StringSetData stringSetData = StringSetData.create(ImmutableSet.of("ab", "cd"));
99-
// check that immutable copy is returned
100-
assertThrows(UnsupportedOperationException.class, () -> stringSetData.stringSet().add("aa"));
106+
public void testStringSetAddUntilCapacity() {
107+
StringSetData combined = StringSetData.empty();
108+
@SuppressWarnings("InlineMeInliner") // Inline representation is Java11+ only
109+
String commonPrefix = Strings.repeat("*", 1000);
110+
long stringSize = 0;
111+
for (int i = 0; i < 1000; ++i) {
112+
String s = commonPrefix + i;
113+
stringSize += s.length();
114+
combined = combined.addAll(s);
115+
}
116+
assertTrue(combined.stringSize() < stringSize);
117+
assertTrue(combined.stringSize() > StringSetData.STRING_SET_SIZE_LIMIT);
118+
}
119+
120+
@Test
121+
public void testStringSetAddSizeTrackedCorrectly() {
122+
StringSetData combined = StringSetData.empty();
123+
combined = combined.addAll("a", "b", "c", "b");
124+
assertEquals(3, combined.stringSize());
125+
combined = combined.addAll("c", "d", "e");
126+
assertEquals(5, combined.stringSize());
101127
}
102128
}

sdks/python/apache_beam/metrics/cells.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ cdef class GaugeCell(MetricCell):
4545

4646

4747
cdef class StringSetCell(MetricCell):
48-
cdef readonly set data
48+
cdef readonly object data
4949

5050
cdef inline bint _update(self, value) except -1
5151

0 commit comments

Comments
 (0)