1919
2020import com .google .auto .value .AutoValue ;
2121import java .io .Serializable ;
22+ import java .util .Arrays ;
23+ import java .util .HashSet ;
2224import java .util .Set ;
23- import java .util .stream .Collectors ;
24- import java .util .stream .StreamSupport ;
2525import org .apache .beam .sdk .metrics .StringSetResult ;
26+ import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
2627import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableSet ;
28+ import org .slf4j .Logger ;
29+ import org .slf4j .LoggerFactory ;
2730
2831/**
29- * Data describing the StringSet. The {@link StringSetData} hold an immutable copy of the set from
30- * which it was initially created. This should retain enough detail that it can be combined with
31- * other {@link StringSetData}.
32+ * Data describing the StringSet. The {@link StringSetData} hold a copy of the set from which it was
33+ * initially created. This should retain enough detail that it can be combined with other {@link
34+ * StringSetData}.
35+ *
36+ * <p>The underlying set is mutable for {@link #addAll} operation, otherwise a copy set will be
37+ * generated.
38+ *
39+ * <p>The summation of all string length for a {@code StringSetData} cannot exceed 1 MB. Further
40+ * addition of elements are dropped.
3241 */
3342@ AutoValue
3443public abstract class StringSetData implements Serializable {
44+ private static final Logger LOG = LoggerFactory .getLogger (StringSetData .class );
45+ // 1 MB
46+ @ VisibleForTesting static final long STRING_SET_SIZE_LIMIT = 1_000_000L ;
3547
3648 public abstract Set <String > stringSet ();
3749
50+ public abstract long stringSize ();
51+
3852 /** Returns a {@link StringSetData} which is made from an immutable copy of the given set. */
3953 public static StringSetData create (Set <String > set ) {
40- return new AutoValue_StringSetData (ImmutableSet .copyOf (set ));
54+ if (set .isEmpty ()) {
55+ return empty ();
56+ }
57+ HashSet <String > combined = new HashSet <>();
58+ long stringSize = addUntilCapacity (combined , 0L , set );
59+ return new AutoValue_StringSetData (combined , stringSize );
60+ }
61+
62+ /** Returns a {@link StringSetData} which is made from the given set in place. */
63+ private static StringSetData createInPlace (HashSet <String > set , long stringSize ) {
64+ return new AutoValue_StringSetData (set , stringSize );
4165 }
4266
4367 /** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */
4468 public static StringSetData empty () {
4569 return EmptyStringSetData .INSTANCE ;
4670 }
4771
72+ /**
73+ * Add strings into this {@code StringSetData} and return the result {@code StringSetData}. Reuse
74+ * the original StringSetData's set. As a result, current StringSetData will become invalid.
75+ *
76+ * <p>>Should only be used by {@link StringSetCell#add}.
77+ */
78+ public StringSetData addAll (String ... strings ) {
79+ HashSet <String > combined ;
80+ if (this .stringSet () instanceof HashSet ) {
81+ combined = (HashSet <String >) this .stringSet ();
82+ } else {
83+ combined = new HashSet <>(this .stringSet ());
84+ }
85+ long stringSize = addUntilCapacity (combined , this .stringSize (), Arrays .asList (strings ));
86+ return StringSetData .createInPlace (combined , stringSize );
87+ }
88+
4889 /**
4990 * Combines this {@link StringSetData} with other, both original StringSetData are left intact.
5091 */
@@ -54,30 +95,54 @@ public StringSetData combine(StringSetData other) {
5495 } else if (other .stringSet ().isEmpty ()) {
5596 return this ;
5697 } else {
57- ImmutableSet .Builder <String > combined = ImmutableSet .builder ();
58- combined .addAll (this .stringSet ());
59- combined .addAll (other .stringSet ());
60- return StringSetData .create (combined .build ());
98+ HashSet <String > combined = new HashSet <>(this .stringSet ());
99+ long stringSize = addUntilCapacity (combined , this .stringSize (), other .stringSet ());
100+ return StringSetData .createInPlace (combined , stringSize );
61101 }
62102 }
63103
64104 /**
65105 * Combines this {@link StringSetData} with others, all original StringSetData are left intact.
66106 */
67107 public StringSetData combine (Iterable <StringSetData > others ) {
68- Set <String > combined =
69- StreamSupport . stream ( others . spliterator (), true )
70- . flatMap ( other -> other . stringSet (). stream ())
71- . collect ( Collectors . toSet ());
72- combined . addAll ( this . stringSet ());
73- return StringSetData .create (combined );
108+ HashSet <String > combined = new HashSet <>( this . stringSet ());
109+ long stringSize = this . stringSize ();
110+ for ( StringSetData other : others ) {
111+ stringSize = addUntilCapacity ( combined , stringSize , other . stringSet ());
112+ }
113+ return StringSetData .createInPlace (combined , stringSize );
74114 }
75115
76116 /** Returns a {@link StringSetResult} representing this {@link StringSetData}. */
77117 public StringSetResult extractResult () {
78118 return StringSetResult .create (stringSet ());
79119 }
80120
121+ /** Add strings into set until reach capacity. Return the all string size of added set. */
122+ private static long addUntilCapacity (
123+ HashSet <String > combined , long currentSize , Iterable <String > others ) {
124+ if (currentSize > STRING_SET_SIZE_LIMIT ) {
125+ // already at capacity
126+ return currentSize ;
127+ }
128+ for (String string : others ) {
129+ if (combined .add (string )) {
130+ currentSize += string .length ();
131+
132+ // check capacity both before insert and after insert one, so the warning only emit once.
133+ if (currentSize > STRING_SET_SIZE_LIMIT ) {
134+ LOG .warn (
135+ "StringSet metrics reaches capacity. Further incoming elements won't be recorded."
136+ + " Current size: {}, last element size: {}." ,
137+ currentSize ,
138+ string .length ());
139+ break ;
140+ }
141+ }
142+ }
143+ return currentSize ;
144+ }
145+
81146 /** Empty {@link StringSetData}, representing no values reported and is immutable. */
82147 public static class EmptyStringSetData extends StringSetData {
83148
@@ -91,6 +156,11 @@ public Set<String> stringSet() {
91156 return ImmutableSet .of ();
92157 }
93158
159+ @ Override
160+ public long stringSize () {
161+ return 0L ;
162+ }
163+
94164 /** Return a {@link StringSetResult#empty()} which is immutable empty set. */
95165 @ Override
96166 public StringSetResult extractResult () {
0 commit comments