Skip to content

Commit 4c74107

Browse files
authored
fix: abstract batch resource and add method to determine if batch should be flushed (#1790)
1 parent 596ebbd commit 4c74107

6 files changed

Lines changed: 213 additions & 31 deletions

File tree

gax-java/gax/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<!-- Ignore any files marked as @InternalApi -->
8989
<excludes>
9090
<exclude>com/google/api/gax/rpc/RequestUrlParamsEncoder</exclude>
91+
<exclude>com/google/api/gax/batching/BatchingDescriptor</exclude>
9192
</excludes>
9293
</configuration>
9394
</plugin>
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import com.google.api.core.InternalApi;
33+
34+
/**
35+
* Represent the resource used by a batch including element and byte. It can also be extended to
36+
* other things to determine if adding a new element needs to be flow controlled or if the current
37+
* batch needs to be flushed.
38+
*/
39+
@InternalApi("For google-cloud-java client use only.")
40+
public interface BatchResource {
41+
42+
/** Adds the additional resource. */
43+
BatchResource add(BatchResource resource);
44+
45+
/** Returns the element count of this resource. */
46+
long getElementCount();
47+
48+
/** Returns the byte count of this resource. */
49+
long getByteCount();
50+
51+
/**
52+
* Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold
53+
* and maxBytesThreshold.
54+
*/
55+
boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold);
56+
}

gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
9595
private final FlowController flowController;
9696
private final ApiCallContext callContext;
9797

98+
// If element threshold or bytes threshold is 0, it means that it'll always flush every element
99+
// without batching
100+
private final long elementThreshold;
101+
private final long bytesThreshold;
102+
98103
/**
99104
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
100105
* into wrappers request and response
@@ -192,7 +197,7 @@ public BatcherImpl(
192197
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
193198
}
194199
this.flowController = flowController;
195-
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
200+
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
196201
if (batchingSettings.getDelayThreshold() != null) {
197202
long delay = batchingSettings.getDelayThreshold().toMillis();
198203
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
@@ -204,6 +209,11 @@ public BatcherImpl(
204209
}
205210
currentBatcherReference = new BatcherReference(this);
206211
this.callContext = callContext;
212+
213+
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
214+
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
215+
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
216+
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
207217
}
208218

209219
/** {@inheritDoc} */
@@ -213,7 +223,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
213223
// will only be done from a single calling thread.
214224
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");
215225

216-
long bytesSize = batchingDescriptor.countBytes(element);
226+
BatchResource newResource = batchingDescriptor.createResource(element);
217227

218228
// This is not the optimal way of throttling. It does not send out partial batches, which
219229
// means that the Batcher might not use up all the resources allowed by FlowController.
@@ -232,7 +242,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
232242
// defer it till we decide on if refactoring FlowController is necessary.
233243
Stopwatch stopwatch = Stopwatch.createStarted();
234244
try {
235-
flowController.reserve(1, bytesSize);
245+
flowController.reserve(newResource.getElementCount(), newResource.getByteCount());
236246
} catch (FlowControlException e) {
237247
// This exception will only be thrown if the FlowController is set to ThrowException behavior
238248
throw FlowControlRuntimeException.fromFlowControlException(e);
@@ -241,12 +251,16 @@ public ApiFuture<ElementResultT> add(ElementT element) {
241251

242252
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
243253
synchronized (elementLock) {
244-
currentOpenBatch.add(element, result, throttledTimeMs);
245-
}
254+
if (currentOpenBatch
255+
.resource
256+
.add(newResource)
257+
.shouldFlush(elementThreshold, bytesThreshold)) {
258+
sendOutstanding();
259+
}
246260

247-
if (currentOpenBatch.hasAnyThresholdReached()) {
248-
sendOutstanding();
261+
currentOpenBatch.add(element, newResource, result, throttledTimeMs);
249262
}
263+
250264
return result;
251265
}
252266

@@ -267,7 +281,7 @@ public void sendOutstanding() {
267281
return;
268282
}
269283
accumulatedBatch = currentOpenBatch;
270-
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
284+
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
271285
}
272286

273287
// This check is for old clients that instantiated the batcher without ApiCallContext
@@ -291,7 +305,9 @@ public void sendOutstanding() {
291305
@Override
292306
public void onSuccess(ResponseT response) {
293307
try {
294-
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
308+
flowController.release(
309+
accumulatedBatch.resource.getElementCount(),
310+
accumulatedBatch.resource.getByteCount());
295311
accumulatedBatch.onBatchSuccess(response);
296312
} finally {
297313
onBatchCompletion();
@@ -301,7 +317,9 @@ public void onSuccess(ResponseT response) {
301317
@Override
302318
public void onFailure(Throwable throwable) {
303319
try {
304-
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
320+
flowController.release(
321+
accumulatedBatch.resource.getElementCount(),
322+
accumulatedBatch.resource.getByteCount());
305323
accumulatedBatch.onBatchFailure(throwable);
306324
} finally {
307325
onBatchCompletion();
@@ -412,34 +430,30 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
412430
private final BatchingRequestBuilder<ElementT, RequestT> builder;
413431
private final List<BatchEntry<ElementT, ElementResultT>> entries;
414432
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
415-
private final BatcherStats batcherStats;
416-
private final long elementThreshold;
417-
private final long bytesThreshold;
418433

419-
private long elementCounter = 0;
420-
private long byteCounter = 0;
434+
private final BatcherStats batcherStats;
421435
private long totalThrottledTimeMs = 0;
436+
private BatchResource resource;
422437

423438
private Batch(
424439
RequestT prototype,
425440
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
426-
BatchingSettings batchingSettings,
427441
BatcherStats batcherStats) {
428442
this.descriptor = descriptor;
429443
this.builder = descriptor.newRequestBuilder(prototype);
430444
this.entries = new ArrayList<>();
431-
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
432-
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
433-
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
434-
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
435445
this.batcherStats = batcherStats;
446+
this.resource = descriptor.createEmptyResource();
436447
}
437448

438-
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
449+
void add(
450+
ElementT element,
451+
BatchResource newResource,
452+
SettableApiFuture<ElementResultT> result,
453+
long throttledTimeMs) {
439454
builder.add(element);
440455
entries.add(BatchEntry.create(element, result));
441-
elementCounter++;
442-
byteCounter += descriptor.countBytes(element);
456+
resource = resource.add(newResource);
443457
totalThrottledTimeMs += throttledTimeMs;
444458
}
445459

@@ -464,11 +478,7 @@ void onBatchFailure(Throwable throwable) {
464478
}
465479

466480
boolean isEmpty() {
467-
return elementCounter == 0;
468-
}
469-
470-
boolean hasAnyThresholdReached() {
471-
return elementCounter >= elementThreshold || byteCounter >= bytesThreshold;
481+
return resource.getElementCount() == 0;
472482
}
473483
}
474484

gax-java/gax/src/main/java/com/google/api/gax/batching/BatchingDescriptor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,17 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response
9696

9797
/** Returns the size of the passed element object in bytes. */
9898
long countBytes(ElementT element);
99+
100+
/** Creates a new {@link BatchResource} with ElementT. */
101+
default BatchResource createResource(ElementT element) {
102+
return DefaultBatchResource.builder()
103+
.setElementCount(1)
104+
.setByteCount(countBytes(element))
105+
.build();
106+
}
107+
108+
/** Create an empty {@link BatchResource}. */
109+
default BatchResource createEmptyResource() {
110+
return DefaultBatchResource.builder().setElementCount(0).setByteCount(0).build();
111+
}
99112
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import com.google.auto.value.AutoValue;
33+
import com.google.common.base.Preconditions;
34+
35+
/**
36+
* The default implementation of {@link BatchResource} which tracks the elementCount and byteCount.
37+
*/
38+
@AutoValue
39+
abstract class DefaultBatchResource implements BatchResource {
40+
41+
static DefaultBatchResource.Builder builder() {
42+
return new AutoValue_DefaultBatchResource.Builder();
43+
}
44+
45+
@Override
46+
public BatchResource add(BatchResource resource) {
47+
Preconditions.checkArgument(
48+
resource instanceof DefaultBatchResource,
49+
"Expect an instance of DefaultBatchResource, got " + resource.getClass());
50+
DefaultBatchResource defaultResource = (DefaultBatchResource) resource;
51+
return new AutoValue_DefaultBatchResource.Builder()
52+
.setElementCount(getElementCount() + defaultResource.getElementCount())
53+
.setByteCount(getByteCount() + defaultResource.getByteCount())
54+
.build();
55+
}
56+
57+
@Override
58+
public abstract long getElementCount();
59+
60+
@Override
61+
public abstract long getByteCount();
62+
63+
@Override
64+
public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) {
65+
return getElementCount() > maxElementThreshold || getByteCount() > maxBytesThreshold;
66+
}
67+
68+
@AutoValue.Builder
69+
abstract static class Builder {
70+
abstract Builder setElementCount(long elementCount);
71+
72+
abstract Builder setByteCount(long byteCount);
73+
74+
abstract DefaultBatchResource build();
75+
}
76+
}

gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public class BatcherImplTest {
9393
BatchingSettings.newBuilder()
9494
.setElementCountThreshold(1000L)
9595
.setRequestByteThreshold(1000L)
96-
.setDelayThreshold(Duration.ofSeconds(1))
96+
.setDelayThreshold(Duration.ofSeconds(1000))
9797
.build();
9898

9999
@After
@@ -376,6 +376,7 @@ public void testWhenThresholdIsDisabled() throws Exception {
376376
.build();
377377
underTest = createDefaultBatcherImpl(settings, null);
378378
Future<Integer> result = underTest.add(2);
379+
underTest.add(3);
379380
assertThat(result.isDone()).isTrue();
380381
assertThat(result.get()).isEqualTo(4);
381382
}
@@ -895,7 +896,7 @@ public void run() {
895896

896897
// Mockito recommends using verify() as the ONLY way to interact with Argument
897898
// captors - otherwise it may incur in unexpected behaviour
898-
Mockito.verify(callContext).withOption(key.capture(), value.capture());
899+
Mockito.verify(callContext, Mockito.timeout(100)).withOption(key.capture(), value.capture());
899900

900901
// Verify that throttled time is recorded in ApiCallContext
901902
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
@@ -1008,12 +1009,37 @@ public ApiFuture<Object> futureCall(Object o, ApiCallContext apiCallContext) {
10081009
Assert.assertThrows(RuntimeException.class, batcher::close);
10091010
}
10101011

1012+
@Test
1013+
public void testDefaultShouldFlush() {
1014+
BatchResource resource =
1015+
DefaultBatchResource.builder().setElementCount(2).setByteCount(2).build();
1016+
1017+
assertThat(resource.shouldFlush(2, 2)).isFalse();
1018+
assertThat(resource.shouldFlush(1, 1)).isTrue();
1019+
}
1020+
1021+
@Test
1022+
public void testDefaultBatchResourceAdd() {
1023+
BatchResource resource =
1024+
DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build();
1025+
1026+
BatchResource newResource =
1027+
resource.add(DefaultBatchResource.builder().setElementCount(1).setByteCount(1).build());
1028+
1029+
// Make sure add doesn't modify the old object
1030+
assertThat(resource.getElementCount()).isEqualTo(1);
1031+
assertThat(resource.getByteCount()).isEqualTo(1);
1032+
assertThat(newResource.getElementCount()).isEqualTo(2);
1033+
assertThat(newResource.getByteCount()).isEqualTo(2);
1034+
}
1035+
10111036
private void testElementTriggers(BatchingSettings settings) throws Exception {
10121037
underTest = createDefaultBatcherImpl(settings, null);
10131038
Future<Integer> result = underTest.add(4);
10141039
assertThat(result.isDone()).isFalse();
1015-
// After this element is added, the batch triggers sendOutstanding().
10161040
Future<Integer> anotherResult = underTest.add(5);
1041+
// After this element is added, the batch triggers sendOutstanding().
1042+
underTest.add(6);
10171043
// Both the elements should be resolved now.
10181044
assertThat(result.isDone()).isTrue();
10191045
assertThat(result.get()).isEqualTo(16);

0 commit comments

Comments
 (0)