3737import com .google .api .core .BetaApi ;
3838import com .google .api .core .InternalApi ;
3939import com .google .api .core .SettableApiFuture ;
40+ import com .google .api .gax .batching .FlowControlEventStats .FlowControlEvent ;
41+ import com .google .api .gax .batching .FlowController .FlowControlException ;
42+ import com .google .api .gax .batching .FlowController .LimitExceededBehavior ;
4043import com .google .api .gax .rpc .UnaryCallable ;
4144import com .google .common .annotations .VisibleForTesting ;
4245import com .google .common .base .Preconditions ;
46+ import com .google .common .base .Stopwatch ;
4347import com .google .common .util .concurrent .Futures ;
4448import java .lang .ref .Reference ;
4549import java .lang .ref .ReferenceQueue ;
5559import java .util .concurrent .atomic .AtomicInteger ;
5660import java .util .logging .Level ;
5761import java .util .logging .Logger ;
62+ import javax .annotation .Nullable ;
5863
5964/**
6065 * Queues up the elements until {@link #flush()} is called; once batching is over, returned future
@@ -87,6 +92,8 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
8792 private final Future <?> scheduledFuture ;
8893 private volatile boolean isClosed = false ;
8994 private final BatcherStats batcherStats = new BatcherStats ();
95+ private final FlowController flowController ;
96+ private final FlowControlEventStats flowControlEventStats ;
9097
9198 /**
9299 * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
@@ -102,15 +109,46 @@ public BatcherImpl(
102109 BatchingSettings batchingSettings ,
103110 ScheduledExecutorService executor ) {
104111
112+ this (batchingDescriptor , unaryCallable , prototype , batchingSettings , executor , null , null );
113+ }
114+
115+ public BatcherImpl (
116+ BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > batchingDescriptor ,
117+ UnaryCallable <RequestT , ResponseT > unaryCallable ,
118+ RequestT prototype ,
119+ BatchingSettings batchingSettings ,
120+ ScheduledExecutorService executor ,
121+ @ Nullable FlowController flowControllerToUse ,
122+ @ Nullable FlowControlEventStats flowControlEventStatsToUse ) {
123+
105124 this .batchingDescriptor =
106125 Preconditions .checkNotNull (batchingDescriptor , "batching descriptor cannot be null" );
107126 this .unaryCallable = Preconditions .checkNotNull (unaryCallable , "callable cannot be null" );
108127 this .prototype = Preconditions .checkNotNull (prototype , "request prototype cannot be null" );
109128 this .batchingSettings =
110129 Preconditions .checkNotNull (batchingSettings , "batching setting cannot be null" );
111130 Preconditions .checkNotNull (executor , "executor cannot be null" );
131+ if (flowControllerToUse == null ) {
132+ flowControllerToUse = new FlowController (batchingSettings .getFlowControlSettings ());
133+ }
134+ // If throttling is enabled, make sure flow control limits are greater or equal to batch sizes
135+ // to avoid deadlocking
136+ if (flowControllerToUse .getLimitExceededBehavior () != LimitExceededBehavior .Ignore ) {
137+ Preconditions .checkArgument (
138+ flowControllerToUse .getMinOutstandingElementCount () == null
139+ || batchingSettings .getElementCountThreshold () == null
140+ || flowControllerToUse .getMinOutstandingElementCount ()
141+ >= batchingSettings .getElementCountThreshold (),
142+ "if throttling and batching on element count are enabled, FlowController#minOutstandingElementCount must be greater or equal to elementCountThreshold" );
143+ Preconditions .checkArgument (
144+ flowControllerToUse .getMinOutstandingRequestBytes () == null
145+ || batchingSettings .getRequestByteThreshold () == null
146+ || flowControllerToUse .getMinOutstandingRequestBytes ()
147+ >= batchingSettings .getRequestByteThreshold (),
148+ "if throttling and batching on request bytes are enabled, FlowController#minOutstandingRequestBytes must be greater or equal to requestByteThreshold" );
149+ }
150+ this .flowController = flowControllerToUse ;
112151 currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batchingSettings , batcherStats );
113-
114152 if (batchingSettings .getDelayThreshold () != null ) {
115153 long delay = batchingSettings .getDelayThreshold ().toMillis ();
116154 PushCurrentBatchRunnable <ElementT , ElementResultT , RequestT , ResponseT > runnable =
@@ -121,14 +159,45 @@ public BatcherImpl(
121159 scheduledFuture = Futures .immediateCancelledFuture ();
122160 }
123161 currentBatcherReference = new BatcherReference (this );
162+ if (flowControlEventStatsToUse == null ) {
163+ flowControlEventStatsToUse = new FlowControlEventStats ();
164+ }
165+ this .flowControlEventStats = flowControlEventStatsToUse ;
124166 }
125167
126168 /** {@inheritDoc} */
127169 @ Override
128170 public ApiFuture <ElementResultT > add (ElementT element ) {
129171 Preconditions .checkState (!isClosed , "Cannot add elements on a closed batcher" );
130- SettableApiFuture <ElementResultT > result = SettableApiFuture .create ();
172+ // This is not the optimal way of throttling. It does not send out partial batches, which
173+ // means that the Batcher might not use up all the resources allowed by FlowController.
174+ // The more efficient implementation should look like:
175+ // if (!flowController.tryReserve(1, bytes)) {
176+ // sendOutstanding();
177+ // reserve(1, bytes);
178+ // }
179+ // where tryReserve() will return false if there isn't enough resources, or reserve and return
180+ // true.
181+ // However, with the current FlowController implementation, adding a tryReserve() could be
182+ // confusing. FlowController will end up having 3 different reserve behaviors: blocking,
183+ // non blocking and try reserve. And we'll also need to add a tryAcquire() to the Semaphore64
184+ // class, which makes it seemed unnecessary to have blocking and non-blocking semaphore
185+ // implementations. Some refactoring may be needed for the optimized implementation. So we'll
186+ // defer it till we decide on if refactoring FlowController is necessary.
187+ try {
188+ Stopwatch stopwatch = Stopwatch .createStarted ();
189+ flowController .reserve (1 , batchingDescriptor .countBytes (element ));
190+ long elapsed = stopwatch .elapsed (TimeUnit .NANOSECONDS );
191+ if (elapsed >= TimeUnit .MILLISECONDS .toNanos (1 )) {
192+ flowControlEventStats .recordFlowControlEvent (FlowControlEvent .create (elapsed ));
193+ }
194+ } catch (FlowControlException e ) {
195+ // This exception will only be thrown if the FlowController is set to ThrowException behavior
196+ flowControlEventStats .recordFlowControlEvent (FlowControlEvent .create (e ));
197+ throw new RuntimeException (e );
198+ }
131199
200+ SettableApiFuture <ElementResultT > result = SettableApiFuture .create ();
132201 synchronized (elementLock ) {
133202 currentOpenBatch .add (element , result );
134203 }
@@ -158,7 +227,6 @@ public void sendOutstanding() {
158227 accumulatedBatch = currentOpenBatch ;
159228 currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batchingSettings , batcherStats );
160229 }
161-
162230 final ApiFuture <ResponseT > batchResponse =
163231 unaryCallable .futureCall (accumulatedBatch .builder .build ());
164232
@@ -169,6 +237,7 @@ public void sendOutstanding() {
169237 @ Override
170238 public void onSuccess (ResponseT response ) {
171239 try {
240+ flowController .release (accumulatedBatch .elementCounter , accumulatedBatch .byteCounter );
172241 accumulatedBatch .onBatchSuccess (response );
173242 } finally {
174243 onBatchCompletion ();
@@ -178,6 +247,7 @@ public void onSuccess(ResponseT response) {
178247 @ Override
179248 public void onFailure (Throwable throwable ) {
180249 try {
250+ flowController .release (accumulatedBatch .elementCounter , accumulatedBatch .byteCounter );
181251 accumulatedBatch .onBatchFailure (throwable );
182252 } finally {
183253 onBatchCompletion ();
@@ -224,6 +294,16 @@ public void close() throws InterruptedException {
224294 }
225295 }
226296
297+ @ InternalApi
298+ public FlowController getFlowController () {
299+ return flowController ;
300+ }
301+
302+ @ InternalApi
303+ public FlowControlEventStats getFlowControlEventStats () {
304+ return flowControlEventStats ;
305+ }
306+
227307 /**
228308 * This class represent one logical Batch. It accumulates all the elements and their corresponding
229309 * future results for one batch.
0 commit comments