@@ -95,6 +95,11 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
9595 private final FlowController flowController ;
9696 private final ApiCallContext callContext ;
9797
98+ // If element threshold or bytes threshold is 0, it means that it'll always flush every element
99+ // without batching
100+ private final long elementThreshold ;
101+ private final long bytesThreshold ;
102+
98103 /**
99104 * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
100105 * into wrappers request and response
@@ -192,7 +197,7 @@ public BatcherImpl(
192197 + "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold" );
193198 }
194199 this .flowController = flowController ;
195- currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batchingSettings , batcherStats );
200+ currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batcherStats );
196201 if (batchingSettings .getDelayThreshold () != null ) {
197202 long delay = batchingSettings .getDelayThreshold ().toMillis ();
198203 PushCurrentBatchRunnable <ElementT , ElementResultT , RequestT , ResponseT > runnable =
@@ -204,6 +209,11 @@ public BatcherImpl(
204209 }
205210 currentBatcherReference = new BatcherReference (this );
206211 this .callContext = callContext ;
212+
213+ Long elementCountThreshold = batchingSettings .getElementCountThreshold ();
214+ this .elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold ;
215+ Long requestByteThreshold = batchingSettings .getRequestByteThreshold ();
216+ this .bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold ;
207217 }
208218
209219 /** {@inheritDoc} */
@@ -213,7 +223,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
213223 // will only be done from a single calling thread.
214224 Preconditions .checkState (closeFuture == null , "Cannot add elements on a closed batcher" );
215225
216- long bytesSize = batchingDescriptor .countBytes (element );
226+ BatchResource newResource = batchingDescriptor .createResource (element );
217227
218228 // This is not the optimal way of throttling. It does not send out partial batches, which
219229 // means that the Batcher might not use up all the resources allowed by FlowController.
@@ -232,7 +242,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
232242 // defer it till we decide on if refactoring FlowController is necessary.
233243 Stopwatch stopwatch = Stopwatch .createStarted ();
234244 try {
235- flowController .reserve (1 , bytesSize );
245+ flowController .reserve (newResource . getElementCount (), newResource . getByteCount () );
236246 } catch (FlowControlException e ) {
237247 // This exception will only be thrown if the FlowController is set to ThrowException behavior
238248 throw FlowControlRuntimeException .fromFlowControlException (e );
@@ -241,12 +251,16 @@ public ApiFuture<ElementResultT> add(ElementT element) {
241251
242252 SettableApiFuture <ElementResultT > result = SettableApiFuture .create ();
243253 synchronized (elementLock ) {
244- currentOpenBatch .add (element , result , throttledTimeMs );
245- }
254+ if (currentOpenBatch
255+ .resource
256+ .add (newResource )
257+ .shouldFlush (elementThreshold , bytesThreshold )) {
258+ sendOutstanding ();
259+ }
246260
247- if (currentOpenBatch .hasAnyThresholdReached ()) {
248- sendOutstanding ();
261+ currentOpenBatch .add (element , newResource , result , throttledTimeMs );
249262 }
263+
250264 return result ;
251265 }
252266
@@ -267,7 +281,7 @@ public void sendOutstanding() {
267281 return ;
268282 }
269283 accumulatedBatch = currentOpenBatch ;
270- currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batchingSettings , batcherStats );
284+ currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batcherStats );
271285 }
272286
273287 // This check is for old clients that instantiated the batcher without ApiCallContext
@@ -291,7 +305,9 @@ public void sendOutstanding() {
291305 @ Override
292306 public void onSuccess (ResponseT response ) {
293307 try {
294- flowController .release (accumulatedBatch .elementCounter , accumulatedBatch .byteCounter );
308+ flowController .release (
309+ accumulatedBatch .resource .getElementCount (),
310+ accumulatedBatch .resource .getByteCount ());
295311 accumulatedBatch .onBatchSuccess (response );
296312 } finally {
297313 onBatchCompletion ();
@@ -301,7 +317,9 @@ public void onSuccess(ResponseT response) {
301317 @ Override
302318 public void onFailure (Throwable throwable ) {
303319 try {
304- flowController .release (accumulatedBatch .elementCounter , accumulatedBatch .byteCounter );
320+ flowController .release (
321+ accumulatedBatch .resource .getElementCount (),
322+ accumulatedBatch .resource .getByteCount ());
305323 accumulatedBatch .onBatchFailure (throwable );
306324 } finally {
307325 onBatchCompletion ();
@@ -412,34 +430,30 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
412430 private final BatchingRequestBuilder <ElementT , RequestT > builder ;
413431 private final List <BatchEntry <ElementT , ElementResultT >> entries ;
414432 private final BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > descriptor ;
415- private final BatcherStats batcherStats ;
416- private final long elementThreshold ;
417- private final long bytesThreshold ;
418433
419- private long elementCounter = 0 ;
420- private long byteCounter = 0 ;
434+ private final BatcherStats batcherStats ;
421435 private long totalThrottledTimeMs = 0 ;
436+ private BatchResource resource ;
422437
423438 private Batch (
424439 RequestT prototype ,
425440 BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > descriptor ,
426- BatchingSettings batchingSettings ,
427441 BatcherStats batcherStats ) {
428442 this .descriptor = descriptor ;
429443 this .builder = descriptor .newRequestBuilder (prototype );
430444 this .entries = new ArrayList <>();
431- Long elementCountThreshold = batchingSettings .getElementCountThreshold ();
432- this .elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold ;
433- Long requestByteThreshold = batchingSettings .getRequestByteThreshold ();
434- this .bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold ;
435445 this .batcherStats = batcherStats ;
446+ this .resource = descriptor .createEmptyResource ();
436447 }
437448
438- void add (ElementT element , SettableApiFuture <ElementResultT > result , long throttledTimeMs ) {
449+ void add (
450+ ElementT element ,
451+ BatchResource newResource ,
452+ SettableApiFuture <ElementResultT > result ,
453+ long throttledTimeMs ) {
439454 builder .add (element );
440455 entries .add (BatchEntry .create (element , result ));
441- elementCounter ++;
442- byteCounter += descriptor .countBytes (element );
456+ resource = resource .add (newResource );
443457 totalThrottledTimeMs += throttledTimeMs ;
444458 }
445459
@@ -464,11 +478,7 @@ void onBatchFailure(Throwable throwable) {
464478 }
465479
466480 boolean isEmpty () {
467- return elementCounter == 0 ;
468- }
469-
470- boolean hasAnyThresholdReached () {
471- return elementCounter >= elementThreshold || byteCounter >= bytesThreshold ;
481+ return resource .getElementCount () == 0 ;
472482 }
473483 }
474484
0 commit comments