Skip to content

Commit 2b24c3f

Browse files
authored
---
yaml --- r: 9143 b: refs/heads/batching-expr c: 35d59a6 h: refs/heads/master i: 9141: 5cbb6be 9139: 09e57b1 9135: 3724b53
1 parent 01fac63 commit 2b24c3f

4 files changed

Lines changed: 348 additions & 178 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ refs/tags/v0.27.0: 015e56421f67042037ed276353d3c0852483d570
8484
refs/tags/v0.28.0: 0d4c4daeda94e17ab70577334ae4a228b8774742
8585
refs/tags/v0.29.0: d702b33b70f2e81853a142b4158227c5a88c8781
8686
refs/tags/v0.30.0: 9ea3a32084835d0499934e7c070af9f120747e88
87-
refs/heads/batching-expr: 95bc1ac20addaeb25ef65f62450593ed600484b9
87+
refs/heads/batching-expr: 35d59a668289b45745e012f0ab3a0ea227bf5984
8888
"refs/heads/datastore_orderby_tostring": 24a5a107969714ebb69a5a098f47658f7b1e8689
8989
"refs/heads/mrschmidt-__name___": effe27458f7d31b1d5ae1af6a5a228ce89c33462
9090
refs/heads/mrschmidt-emptymerge: 447dbddebbcb8b88fe5dc480d16c0c7697712d11
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.batchingexperimental;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.api.core.SettableApiFuture;
21+
import com.google.common.base.Preconditions;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
/**
26+
* Accumulates requests, used to batch many requests into one RPC call.
27+
*
28+
* <p>Experimental and only meant to be used by other packages in google-cloud-java.
29+
*
30+
* <p>Methods are not thread-safe.
31+
*
32+
* <p>Sample usage:
33+
*
34+
* <pre>{@code
35+
* synchronized(accumulator) {
36+
* accumulator.add(item, future);
37+
* while (accumulator.hasBatch()) {
38+
* // If items and futures must be retained, they must be copied.
39+
* List<ElementT> items = new ArrayList<>(accumulator.batch());
40+
* List<SettableApiFuture<ResponseT>> futures = new ArrayList<>(accumulator.futureBatch());
41+
*
42+
* // ...
43+
*
44+
* accumulator.next();
45+
* }
46+
* }
47+
*
48+
* @param <ElementT> The elements that, when batched, make up RPC calls.
49+
* @param <ResponseT> The RPC results for the individual elements.
50+
* }</pre>
51+
*/
52+
@InternalApi
53+
public class RequestAccumulator<ElementT, ResponseT> {
54+
private final ArrayList<ElementT> requests = new ArrayList<>();
55+
private final ArrayList<SettableApiFuture<ResponseT>> futures = new ArrayList<>();
56+
private long curBytes = 0;
57+
58+
// If not 0, the last element of requests is oversized and
59+
// must be sent in its own batch.
60+
private long oversizedBytes = 0;
61+
62+
private final int batchCount;
63+
private final long batchBytes;
64+
65+
/**
66+
* Creates a new accumulator, holding {@code batchCount} items and {@code batchBytes} bytes in a
67+
* batch.
68+
*
69+
* <p>{@code batchBytes} is a soft limit. Once the current batch is larger than {@code
70+
* batchBytes}, no new items will be added to the batch. {@code batchCount} is a hard limit. Once
71+
* reached, no new items will be added.
72+
*/
73+
public RequestAccumulator(int batchCount, long batchBytes) {
74+
Preconditions.checkArgument(batchCount > 0, "batchCount must be positive");
75+
Preconditions.checkArgument(batchBytes > 0, "batchBytes must be positive");
76+
this.batchCount = batchCount;
77+
this.batchBytes = batchBytes;
78+
}
79+
80+
/**
81+
* Add a new item {@code e} with size {@code bytes} into the current batch. The item is associated
82+
* with a {@code future}.
83+
*
84+
* <p>Given a call {@code add(e, f)}, {@code e} and {@code f} are in the same index in {@link
85+
* #batch()} and {@link #futureBatch()}.
86+
*
87+
* @throws IllegalArgumentException if {@code bytes} is not positive.
88+
* @throws IllegalStateException if {@link #hasBatch()} would return true immediately before the
89+
* call.
90+
*/
91+
public void add(ElementT e, long bytes, SettableApiFuture<ResponseT> future) {
92+
Preconditions.checkArgument(bytes >= 0, "size of element must not be negative");
93+
Preconditions.checkState(!hasBatch(), "invalid use of add; there's already a batch waiting");
94+
95+
if (!requests.isEmpty() && bytes >= batchBytes) {
96+
oversizedBytes = bytes;
97+
}
98+
requests.add(e);
99+
futures.add(future);
100+
curBytes += bytes;
101+
}
102+
103+
/**
104+
* Reports whether the current batch is ready to be sent. If this method returns true, it is
105+
* invalid to call {@link #add()}. See the class documentation for example usage.
106+
*/
107+
public boolean hasBatch() {
108+
return requests.size() >= batchCount || curBytes >= batchBytes;
109+
}
110+
111+
/**
112+
* Returns the previously added items. The returned list must not be modified and will become
113+
* invalid after the next call to {@link #next()}.
114+
*/
115+
public List<ElementT> batch() {
116+
if (oversizedBytes > 0) {
117+
return requests.subList(0, requests.size() - 1);
118+
}
119+
return requests;
120+
}
121+
122+
/**
123+
* Returns the previously added futures. The returned list must not be modified and will become
124+
* invalid after the next call to {@link #next()}.
125+
*/
126+
public List<SettableApiFuture<ResponseT>> futureBatch() {
127+
if (oversizedBytes > 0) {
128+
return futures.subList(0, futures.size() - 1);
129+
}
130+
return futures;
131+
}
132+
133+
/** Consumes the current batch. */
134+
public void next() {
135+
batch().clear();
136+
futureBatch().clear();
137+
curBytes = oversizedBytes;
138+
oversizedBytes = 0;
139+
}
140+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.batchingexperimental;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import org.junit.Test;
22+
23+
public class RequestAccumulatorTest {
24+
@Test
25+
public void testCount() {
26+
RequestAccumulator<String, Void> acc = new RequestAccumulator<>(2, Long.MAX_VALUE);
27+
assertThat(acc.hasBatch()).isFalse();
28+
29+
acc.add("a", 1, null);
30+
assertThat(acc.hasBatch()).isFalse();
31+
32+
acc.add("b", 1, null);
33+
assertThat(acc.hasBatch()).isTrue();
34+
assertThat(acc.batch()).containsExactly("a", "b");
35+
36+
acc.next();
37+
assertThat(acc.hasBatch()).isFalse();
38+
39+
acc.add("c", 1, null);
40+
assertThat(acc.hasBatch()).isFalse();
41+
42+
acc.add("d", 1, null);
43+
assertThat(acc.hasBatch()).isTrue();
44+
assertThat(acc.batch()).containsExactly("c", "d");
45+
46+
acc.next();
47+
assertThat(acc.hasBatch()).isFalse();
48+
}
49+
50+
@Test
51+
public void testSize() {
52+
RequestAccumulator<String, Void> acc = new RequestAccumulator<>(Integer.MAX_VALUE, 3);
53+
assertThat(acc.hasBatch()).isFalse();
54+
55+
acc.add("a", 1, null);
56+
assertThat(acc.hasBatch()).isFalse();
57+
58+
acc.add("bc", 2, null);
59+
assertThat(acc.hasBatch()).isTrue();
60+
assertThat(acc.batch()).containsExactly("a", "bc");
61+
62+
acc.next();
63+
assertThat(acc.hasBatch()).isFalse();
64+
65+
acc.add("de", 2, null);
66+
assertThat(acc.hasBatch()).isFalse();
67+
68+
acc.add("fg", 2, null);
69+
assertThat(acc.hasBatch()).isTrue();
70+
assertThat(acc.batch()).containsExactly("de", "fg");
71+
72+
acc.next();
73+
assertThat(acc.hasBatch()).isFalse();
74+
}
75+
76+
@Test
77+
public void testOversizeAlone() {
78+
RequestAccumulator<String, Void> acc = new RequestAccumulator<>(Integer.MAX_VALUE, 2);
79+
acc.add("abc", 3, null);
80+
assertThat(acc.hasBatch()).isTrue();
81+
assertThat(acc.batch()).containsExactly("abc");
82+
}
83+
84+
@Test
85+
public void testOversize() {
86+
RequestAccumulator<String, Void> acc = new RequestAccumulator<>(Integer.MAX_VALUE, 2);
87+
acc.add("a", 1, null);
88+
assertThat(acc.hasBatch()).isFalse();
89+
90+
acc.add("xyz", 3, null);
91+
assertThat(acc.hasBatch()).isTrue();
92+
assertThat(acc.batch()).containsExactly("a");
93+
94+
acc.next();
95+
assertThat(acc.hasBatch()).isTrue();
96+
assertThat(acc.batch()).containsExactly("xyz");
97+
98+
acc.next();
99+
assertThat(acc.hasBatch()).isFalse();
100+
}
101+
102+
@Test(expected = IllegalStateException.class)
103+
public void testOverflow() {
104+
RequestAccumulator<String, Void> acc = new RequestAccumulator<>(1, Long.MAX_VALUE);
105+
acc.add("a", 1, null);
106+
assertThat(acc.hasBatch()).isTrue();
107+
acc.add("b", 1, null);
108+
}
109+
}

0 commit comments

Comments
 (0)