Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit cc662f2

Browse files
authored
Merge 149aad0 into 95a7dab
2 parents 95a7dab + 149aad0 commit cc662f2

13 files changed

Lines changed: 1488 additions & 20 deletions

gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,13 @@
3737
import com.google.api.core.BetaApi;
3838
import com.google.api.core.InternalApi;
3939
import com.google.api.core.SettableApiFuture;
40+
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
41+
import com.google.api.gax.batching.FlowController.FlowControlException;
42+
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
4043
import com.google.api.gax.rpc.UnaryCallable;
4144
import com.google.common.annotations.VisibleForTesting;
4245
import com.google.common.base.Preconditions;
46+
import com.google.common.base.Stopwatch;
4347
import com.google.common.util.concurrent.Futures;
4448
import java.lang.ref.Reference;
4549
import java.lang.ref.ReferenceQueue;
@@ -55,6 +59,7 @@
5559
import java.util.concurrent.atomic.AtomicInteger;
5660
import java.util.logging.Level;
5761
import java.util.logging.Logger;
62+
import javax.annotation.Nullable;
5863

5964
/**
6065
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
@@ -87,6 +92,8 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
8792
private final Future<?> scheduledFuture;
8893
private volatile boolean isClosed = false;
8994
private final BatcherStats batcherStats = new BatcherStats();
95+
private final FlowController flowController;
96+
private final FlowControlEventStats flowControlEventStats;
9097

9198
/**
9299
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
@@ -102,15 +109,46 @@ public BatcherImpl(
102109
BatchingSettings batchingSettings,
103110
ScheduledExecutorService executor) {
104111

112+
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null);
113+
}
114+
115+
public BatcherImpl(
116+
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
117+
UnaryCallable<RequestT, ResponseT> unaryCallable,
118+
RequestT prototype,
119+
BatchingSettings batchingSettings,
120+
ScheduledExecutorService executor,
121+
@Nullable FlowController flowControllerToUse,
122+
@Nullable FlowControlEventStats flowControlEventStatsToUse) {
123+
105124
this.batchingDescriptor =
106125
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
107126
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
108127
this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null");
109128
this.batchingSettings =
110129
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
111130
Preconditions.checkNotNull(executor, "executor cannot be null");
131+
if (flowControllerToUse == null) {
132+
flowControllerToUse = new FlowController(batchingSettings.getFlowControlSettings());
133+
}
134+
// If throttling is enabled, make sure flow control limits are greater or equal to batch sizes
135+
// to avoid deadlocking
136+
if (flowControllerToUse.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
137+
Preconditions.checkArgument(
138+
flowControllerToUse.getMinOutstandingElementCount() == null
139+
|| batchingSettings.getElementCountThreshold() == null
140+
|| flowControllerToUse.getMinOutstandingElementCount()
141+
>= batchingSettings.getElementCountThreshold(),
142+
"if throttling and batching on element count are enabled, FlowController#minOutstandingElementCount must be greater or equal to elementCountThreshold");
143+
Preconditions.checkArgument(
144+
flowControllerToUse.getMinOutstandingRequestBytes() == null
145+
|| batchingSettings.getRequestByteThreshold() == null
146+
|| flowControllerToUse.getMinOutstandingRequestBytes()
147+
>= batchingSettings.getRequestByteThreshold(),
148+
"if throttling and batching on request bytes are enabled, FlowController#minOutstandingRequestBytes must be greater or equal to requestByteThreshold");
149+
}
150+
this.flowController = flowControllerToUse;
112151
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
113-
114152
if (batchingSettings.getDelayThreshold() != null) {
115153
long delay = batchingSettings.getDelayThreshold().toMillis();
116154
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
@@ -121,14 +159,45 @@ public BatcherImpl(
121159
scheduledFuture = Futures.immediateCancelledFuture();
122160
}
123161
currentBatcherReference = new BatcherReference(this);
162+
if (flowControlEventStatsToUse == null) {
163+
flowControlEventStatsToUse = new FlowControlEventStats();
164+
}
165+
this.flowControlEventStats = flowControlEventStatsToUse;
124166
}
125167

126168
/** {@inheritDoc} */
127169
@Override
128170
public ApiFuture<ElementResultT> add(ElementT element) {
129171
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
130-
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
172+
// This is not the optimal way of throttling. It does not send out partial batches, which
173+
// means that the Batcher might not use up all the resources allowed by FlowController.
174+
// The more efficient implementation should look like:
175+
// if (!flowController.tryReserve(1, bytes)) {
176+
// sendOutstanding();
177+
// reserve(1, bytes);
178+
// }
179+
// where tryReserve() will return false if there isn't enough resources, or reserve and return
180+
// true.
181+
// However, with the current FlowController implementation, adding a tryReserve() could be
182+
// confusing. FlowController will end up having 3 different reserve behaviors: blocking,
183+
// non blocking and try reserve. And we'll also need to add a tryAcquire() to the Semaphore64
184+
// class, which makes it seemed unnecessary to have blocking and non-blocking semaphore
185+
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
186+
// defer it till we decide on if refactoring FlowController is necessary.
187+
try {
188+
Stopwatch stopwatch = Stopwatch.createStarted();
189+
flowController.reserve(1, batchingDescriptor.countBytes(element));
190+
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
191+
if (elapsed >= TimeUnit.MILLISECONDS.toNanos(1)) {
192+
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.create(elapsed));
193+
}
194+
} catch (FlowControlException e) {
195+
// This exception will only be thrown if the FlowController is set to ThrowException behavior
196+
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.create(e));
197+
throw new RuntimeException(e);
198+
}
131199

200+
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
132201
synchronized (elementLock) {
133202
currentOpenBatch.add(element, result);
134203
}
@@ -158,7 +227,6 @@ public void sendOutstanding() {
158227
accumulatedBatch = currentOpenBatch;
159228
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
160229
}
161-
162230
final ApiFuture<ResponseT> batchResponse =
163231
unaryCallable.futureCall(accumulatedBatch.builder.build());
164232

@@ -169,6 +237,7 @@ public void sendOutstanding() {
169237
@Override
170238
public void onSuccess(ResponseT response) {
171239
try {
240+
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
172241
accumulatedBatch.onBatchSuccess(response);
173242
} finally {
174243
onBatchCompletion();
@@ -178,6 +247,7 @@ public void onSuccess(ResponseT response) {
178247
@Override
179248
public void onFailure(Throwable throwable) {
180249
try {
250+
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
181251
accumulatedBatch.onBatchFailure(throwable);
182252
} finally {
183253
onBatchCompletion();
@@ -224,6 +294,16 @@ public void close() throws InterruptedException {
224294
}
225295
}
226296

297+
@InternalApi
298+
public FlowController getFlowController() {
299+
return flowController;
300+
}
301+
302+
@InternalApi
303+
public FlowControlEventStats getFlowControlEventStats() {
304+
return flowControlEventStats;
305+
}
306+
227307
/**
228308
* This class represent one logical Batch. It accumulates all the elements and their corresponding
229309
* future results for one batch.

gax/src/main/java/com/google/api/gax/batching/BatchingSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,22 @@ public BatchingSettings build() {
174174
settings.getDelayThreshold() == null
175175
|| settings.getDelayThreshold().compareTo(Duration.ZERO) > 0,
176176
"delayThreshold must be either unset or positive");
177+
if (settings.getFlowControlSettings().getLimitExceededBehavior()
178+
!= LimitExceededBehavior.Ignore) {
179+
FlowControlSettings flowControlSettings = settings.getFlowControlSettings();
180+
Preconditions.checkArgument(
181+
flowControlSettings.getMaxOutstandingElementCount() == null
182+
|| settings.getElementCountThreshold() == null
183+
|| flowControlSettings.getMaxOutstandingElementCount()
184+
>= settings.getElementCountThreshold(),
185+
"if throttling and batching on element count are enabled, FlowController#minOutstandingElementCount must be greater or equal to elementCountThreshold");
186+
Preconditions.checkArgument(
187+
flowControlSettings.getMaxOutstandingRequestBytes() == null
188+
|| settings.getRequestByteThreshold() == null
189+
|| flowControlSettings.getMaxOutstandingRequestBytes()
190+
>= settings.getRequestByteThreshold(),
191+
"if throttling and batching on request bytes are enabled, FlowController#minOutstandingRequestBytes must be greater or equal to requestByteThreshold");
192+
}
177193
return settings;
178194
}
179195
}

gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,10 @@ public synchronized boolean acquire(long permits) {
7070
}
7171
return true;
7272
}
73+
74+
public synchronized void reducePermits(long reduction) {
75+
checkNotNegative(reduction);
76+
77+
currentPermits -= reduction;
78+
}
7379
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.batching;
31+
32+
import com.google.api.core.InternalApi;
33+
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
34+
import com.google.auto.value.AutoValue;
35+
import com.google.common.base.Preconditions;
36+
import javax.annotation.Nullable;
37+
38+
/** Settings for dynamic flow control */
39+
@AutoValue
40+
@InternalApi
41+
public abstract class DynamicFlowControlSettings {
42+
43+
@Nullable
44+
public abstract Long getInitialOutstandingElementCount();
45+
46+
@Nullable
47+
public abstract Long getInitialOutstandingRequestBytes();
48+
49+
@Nullable
50+
public abstract Long getMaxOutstandingElementCount();
51+
52+
@Nullable
53+
public abstract Long getMaxOutstandingRequestBytes();
54+
55+
@Nullable
56+
public abstract Long getMinOutstandingElementCount();
57+
58+
@Nullable
59+
public abstract Long getMinOutstandingRequestBytes();
60+
61+
public abstract LimitExceededBehavior getLimitExceededBehavior();
62+
63+
public abstract Builder toBuilder();
64+
65+
public static Builder newBuilder() {
66+
return new AutoValue_DynamicFlowControlSettings.Builder()
67+
.setLimitExceededBehavior(LimitExceededBehavior.Block);
68+
}
69+
70+
@AutoValue.Builder
71+
public abstract static class Builder {
72+
73+
public abstract Builder setInitialOutstandingElementCount(Long value);
74+
75+
public abstract Builder setInitialOutstandingRequestBytes(Long value);
76+
77+
public abstract Builder setMaxOutstandingElementCount(Long value);
78+
79+
public abstract Builder setMaxOutstandingRequestBytes(Long value);
80+
81+
public abstract Builder setMinOutstandingElementCount(Long value);
82+
83+
public abstract Builder setMinOutstandingRequestBytes(Long value);
84+
85+
public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);
86+
87+
abstract DynamicFlowControlSettings autoBuild();
88+
89+
public DynamicFlowControlSettings build() {
90+
DynamicFlowControlSettings settings = autoBuild();
91+
Preconditions.checkArgument(
92+
(settings.getInitialOutstandingElementCount() != null
93+
&& settings.getMinOutstandingElementCount() != null
94+
&& settings.getMaxOutstandingElementCount() != null)
95+
|| (settings.getInitialOutstandingElementCount() == null
96+
&& settings.getMinOutstandingElementCount() == null
97+
&& settings.getMaxOutstandingElementCount() == null),
98+
"Throttling on element count is disabled by default, to enable this setting, all the initial, min and max thresholds must be set");
99+
Preconditions.checkArgument(
100+
(settings.getInitialOutstandingRequestBytes() != null
101+
&& settings.getMinOutstandingRequestBytes() != null
102+
&& settings.getMaxOutstandingRequestBytes() != null)
103+
|| (settings.getInitialOutstandingRequestBytes() == null
104+
&& settings.getMinOutstandingRequestBytes() == null
105+
&& settings.getMaxOutstandingRequestBytes() == null),
106+
"Throttling on number of bytes is disabled by default, to enable this setting, all the initial, min and max thresholds must be set");
107+
if (settings.getInitialOutstandingElementCount() != null) {
108+
Preconditions.checkArgument(
109+
settings.getMinOutstandingElementCount() > 0
110+
&& settings.getInitialOutstandingElementCount()
111+
<= settings.getMaxOutstandingElementCount()
112+
&& settings.getInitialOutstandingElementCount()
113+
>= settings.getMinOutstandingElementCount(),
114+
"If throttling on element count is set, the thresholds must be greater than 0, and min <= initial <= max");
115+
}
116+
if (settings.getInitialOutstandingRequestBytes() != null) {
117+
Preconditions.checkArgument(
118+
settings.getMinOutstandingRequestBytes() > 0
119+
&& settings.getInitialOutstandingRequestBytes()
120+
<= settings.getMaxOutstandingRequestBytes()
121+
&& settings.getInitialOutstandingRequestBytes()
122+
>= settings.getMinOutstandingRequestBytes(),
123+
"If throttling on number of bytes is set, the thresholds must be greater than 0, and min <= initial <= max");
124+
}
125+
return settings;
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)