2323import com .google .api .gax .grpc .InstantiatingExecutorProvider ;
2424import com .google .auth .Credentials ;
2525import com .google .auth .oauth2 .GoogleCredentials ;
26+ import com .google .common .annotations .VisibleForTesting ;
2627import com .google .common .base .Optional ;
2728import com .google .common .base .Preconditions ;
2829import com .google .common .collect .ImmutableList ;
@@ -125,6 +126,7 @@ public static long getApiMaxRequestBytes() {
125126
126127 private final BundlingSettings bundlingSettings ;
127128 private final RetrySettings retrySettings ;
129+ private final LongRandom longRandom ;
128130
129131 private final FlowController .Settings flowControlSettings ;
130132 private final boolean failOnFlowControlLimits ;
@@ -151,6 +153,7 @@ private Publisher(Builder builder) throws IOException {
151153
152154 this .bundlingSettings = builder .bundlingSettings ;
153155 this .retrySettings = builder .retrySettings ;
156+ this .longRandom = builder .longRandom ;
154157
155158 flowControlSettings = builder .flowControlSettings ;
156159 failOnFlowControlLimits = builder .failOnFlowControlLimits ;
@@ -380,7 +383,8 @@ public void onSuccess(PublishResponse result) {
380383
381384 @ Override
382385 public void onFailure (Throwable t ) {
383- long nextBackoffDelay = computeNextBackoffDelayMs (outstandingBundle , retrySettings );
386+ long nextBackoffDelay =
387+ computeNextBackoffDelayMs (outstandingBundle , retrySettings , longRandom );
384388
385389 if (!isRetryable (t )
386390 || System .currentTimeMillis () + nextBackoffDelay
@@ -494,14 +498,14 @@ private boolean hasBundlingBytes() {
494498 }
495499
496500 private static long computeNextBackoffDelayMs (
497- OutstandingBundle outstandingBundle , RetrySettings retrySettings ) {
501+ OutstandingBundle outstandingBundle , RetrySettings retrySettings , LongRandom longRandom ) {
498502 long delayMillis =
499503 Math .round (
500504 retrySettings .getInitialRetryDelay ().getMillis ()
501505 * Math .pow (retrySettings .getRetryDelayMultiplier (), outstandingBundle .attempt - 1 ));
502506 delayMillis = Math .min (retrySettings .getMaxRetryDelay ().getMillis (), delayMillis );
503507 outstandingBundle .attempt ++;
504- return ThreadLocalRandom . current (). nextLong (delayMillis / 2 , delayMillis );
508+ return longRandom . nextLong (0 , delayMillis );
505509 }
506510
507511 private boolean isRetryable (Throwable t ) {
@@ -520,6 +524,10 @@ private boolean isRetryable(Throwable t) {
520524 }
521525 }
522526
527+ interface LongRandom {
528+ long nextLong (long least , long bound );
529+ }
530+
523531 /** A builder of {@link Publisher}s. */
524532 public static final class Builder {
525533 static final Duration MIN_TOTAL_TIMEOUT = new Duration (10 * 1000 ); // 10 seconds
@@ -547,6 +555,13 @@ public static final class Builder {
547555 .setRpcTimeoutMultiplier (2 )
548556 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
549557 .build ();
558+ static final LongRandom DEFAULT_LONG_RANDOM =
559+ new LongRandom () {
560+ @ Override
561+ public long nextLong (long least , long bound ) {
562+ return ThreadLocalRandom .current ().nextLong (least , bound );
563+ }
564+ };
550565
551566 private static final int THREADS_PER_CPU = 5 ;
552567 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
@@ -564,6 +579,7 @@ public static final class Builder {
564579 boolean failOnFlowControlLimits = false ;
565580
566581 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
582+ LongRandom longRandom = DEFAULT_LONG_RANDOM ;
567583
568584 // Channels and credentials
569585 Optional <Credentials > userCredentials = Optional .absent ();
@@ -659,6 +675,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
659675 return this ;
660676 }
661677
678+ @ VisibleForTesting
679+ Builder setLongRandom (LongRandom longRandom ) {
680+ this .longRandom = Preconditions .checkNotNull (longRandom );
681+ return this ;
682+ }
683+
662684 /** Gives the ability to set a custom executor to be used by the library. */
663685 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
664686 this .executorProvider = Preconditions .checkNotNull (executorProvider );
0 commit comments