1717package com .google .cloud .pubsub .v1 ;
1818
1919import com .google .api .core .ApiFuture ;
20+ import com .google .api .core .ApiFutureCallback ;
21+ import com .google .api .core .ApiFutures ;
2022import com .google .api .core .BetaApi ;
21- import com .google .api .core .InternalApi ;
2223import com .google .api .core .SettableApiFuture ;
2324import com .google .api .gax .batching .BatchingSettings ;
2425import com .google .api .gax .core .CredentialsProvider ;
2526import com .google .api .gax .core .ExecutorAsBackgroundResource ;
2627import com .google .api .gax .core .ExecutorProvider ;
28+ import com .google .api .gax .core .FixedExecutorProvider ;
2729import com .google .api .gax .core .InstantiatingExecutorProvider ;
2830import com .google .api .gax .grpc .GrpcStatusCode ;
29- import com .google .api .gax .grpc .GrpcTransportChannel ;
3031import com .google .api .gax .retrying .RetrySettings ;
3132import com .google .api .gax .rpc .ApiException ;
3233import com .google .api .gax .rpc .ApiExceptionFactory ;
3334import com .google .api .gax .rpc .HeaderProvider ;
3435import com .google .api .gax .rpc .NoHeaderProvider ;
36+ import com .google .api .gax .rpc .StatusCode ;
3537import com .google .api .gax .rpc .TransportChannelProvider ;
36- import com .google .auth .Credentials ;
3738import com .google .auth .oauth2 .GoogleCredentials ;
39+ import com .google .cloud .pubsub .v1 .stub .GrpcPublisherStub ;
40+ import com .google .cloud .pubsub .v1 .stub .PublisherStub ;
41+ import com .google .cloud .pubsub .v1 .stub .PublisherStubSettings ;
3842import com .google .common .base .Preconditions ;
3943import com .google .common .collect .ImmutableList ;
40- import com .google .common .collect .ImmutableMap ;
41- import com .google .common .util .concurrent .FutureCallback ;
42- import com .google .common .util .concurrent .Futures ;
4344import com .google .pubsub .v1 .PublishRequest ;
4445import com .google .pubsub .v1 .PublishResponse ;
45- import com .google .pubsub .v1 .PublisherGrpc ;
46- import com .google .pubsub .v1 .PublisherGrpc .PublisherFutureStub ;
4746import com .google .pubsub .v1 .PubsubMessage ;
4847import com .google .pubsub .v1 .TopicName ;
4948import com .google .pubsub .v1 .TopicNames ;
50- import io .grpc .CallCredentials ;
51- import io .grpc .Channel ;
5249import io .grpc .Status ;
53- import io .grpc .auth .MoreCallCredentials ;
5450import java .io .IOException ;
55- import java .util .ArrayList ;
51+ import java .util .Collections ;
5652import java .util .Iterator ;
5753import java .util .LinkedList ;
5854import java .util .List ;
59- import java .util .Map ;
6055import java .util .concurrent .ScheduledExecutorService ;
6156import java .util .concurrent .ScheduledFuture ;
62- import java .util .concurrent .ThreadLocalRandom ;
6357import java .util .concurrent .TimeUnit ;
6458import java .util .concurrent .atomic .AtomicBoolean ;
6559import java .util .concurrent .locks .Lock ;
6660import java .util .concurrent .locks .ReentrantLock ;
6761import java .util .logging .Level ;
6862import java .util .logging .Logger ;
69- import javax .annotation .Nullable ;
7063import org .threeten .bp .Duration ;
7164
7265/**
@@ -93,21 +86,18 @@ public class Publisher {
9386 private final String topicName ;
9487
9588 private final BatchingSettings batchingSettings ;
96- private final RetrySettings retrySettings ;
97- private final LongRandom longRandom ;
9889
9990 private final Lock messagesBatchLock ;
10091 private List <OutstandingPublish > messagesBatch ;
10192 private int batchedBytes ;
10293
10394 private final AtomicBoolean activeAlarm ;
10495
105- private final Channel channel ;
106- @ Nullable private final CallCredentials callCredentials ;
96+ private final PublisherStub publisherStub ;
10797
10898 private final ScheduledExecutorService executor ;
10999 private final AtomicBoolean shutdown ;
110- private final List <AutoCloseable > closeables = new ArrayList <>() ;
100+ private final List <AutoCloseable > closeables ;
111101 private final MessageWaiter messagesWaiter ;
112102 private ScheduledFuture <?> currentAlarmFuture ;
113103
@@ -125,40 +115,43 @@ private Publisher(Builder builder) throws IOException {
125115 topicName = builder .topicName ;
126116
127117 this .batchingSettings = builder .batchingSettings ;
128- this .retrySettings = builder .retrySettings ;
129- this .longRandom = builder .longRandom ;
130118
131119 messagesBatch = new LinkedList <>();
132120 messagesBatchLock = new ReentrantLock ();
133121 activeAlarm = new AtomicBoolean (false );
134122 executor = builder .executorProvider .getExecutor ();
135123 if (builder .executorProvider .shouldAutoClose ()) {
136- closeables .add (new ExecutorAsBackgroundResource (executor ));
124+ closeables =
125+ Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
126+ } else {
127+ closeables = Collections .emptyList ();
137128 }
138- TransportChannelProvider channelProvider = builder .channelProvider ;
139- if (channelProvider .needsExecutor ()) {
140- channelProvider = channelProvider .withExecutor (executor );
141- }
142- if (channelProvider .needsHeaders ()) {
143- Map <String , String > headers =
144- ImmutableMap .<String , String >builder ()
145- .putAll (builder .headerProvider .getHeaders ())
146- .putAll (builder .internalHeaderProvider .getHeaders ())
147- .build ();
148- channelProvider = channelProvider .withHeaders (headers );
149- }
150- if (channelProvider .needsEndpoint ()) {
151- channelProvider = channelProvider .withEndpoint (TopicAdminSettings .getDefaultEndpoint ());
152- }
153- GrpcTransportChannel transportChannel =
154- (GrpcTransportChannel ) channelProvider .getTransportChannel ();
155- channel = transportChannel .getChannel ();
156- if (channelProvider .shouldAutoClose ()) {
157- closeables .add (transportChannel );
129+
130+ // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
131+ // We post-process this here to keep backward-compatibility.
132+ RetrySettings retrySettings = builder .retrySettings ;
133+ if (retrySettings .getMaxAttempts () == 0 ) {
134+ retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
158135 }
159136
160- Credentials credentials = builder .credentialsProvider .getCredentials ();
161- callCredentials = credentials == null ? null : MoreCallCredentials .from (credentials );
137+ PublisherStubSettings .Builder stubSettings =
138+ PublisherStubSettings .newBuilder ()
139+ .setCredentialsProvider (builder .credentialsProvider )
140+ .setExecutorProvider (FixedExecutorProvider .create (executor ))
141+ .setTransportChannelProvider (builder .channelProvider );
142+ stubSettings
143+ .publishSettings ()
144+ .setRetryableCodes (
145+ StatusCode .Code .ABORTED ,
146+ StatusCode .Code .CANCELLED ,
147+ StatusCode .Code .DEADLINE_EXCEEDED ,
148+ StatusCode .Code .INTERNAL ,
149+ StatusCode .Code .RESOURCE_EXHAUSTED ,
150+ StatusCode .Code .UNKNOWN ,
151+ StatusCode .Code .UNAVAILABLE )
152+ .setRetrySettings (retrySettings )
153+ .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
154+ this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
162155
163156 shutdown = new AtomicBoolean (false );
164157 messagesWaiter = new MessageWaiter ();
@@ -320,21 +313,9 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
320313 publishRequest .addMessages (outstandingPublish .message );
321314 }
322315
323- long rpcTimeoutMs =
324- Math .round (
325- retrySettings .getInitialRpcTimeout ().toMillis ()
326- * Math .pow (retrySettings .getRpcTimeoutMultiplier (), outstandingBatch .attempt - 1 ));
327- rpcTimeoutMs = Math .min (rpcTimeoutMs , retrySettings .getMaxRpcTimeout ().toMillis ());
328-
329- PublisherFutureStub stub =
330- PublisherGrpc .newFutureStub (channel ).withDeadlineAfter (rpcTimeoutMs , TimeUnit .MILLISECONDS );
331- if (callCredentials != null ) {
332- stub = stub .withCallCredentials (callCredentials );
333- }
334-
335- Futures .addCallback (
336- stub .publish (publishRequest .build ()),
337- new FutureCallback <PublishResponse >() {
316+ ApiFutures .addCallback (
317+ publisherStub .publishCallable ().futureCall (publishRequest .build ()),
318+ new ApiFutureCallback <PublishResponse >() {
338319 @ Override
339320 public void onSuccess (PublishResponse result ) {
340321 try {
@@ -364,37 +345,13 @@ public void onSuccess(PublishResponse result) {
364345
365346 @ Override
366347 public void onFailure (Throwable t ) {
367- long nextBackoffDelay =
368- computeNextBackoffDelayMs (outstandingBatch , retrySettings , longRandom );
369-
370- if (!isRetryable (t )
371- || retrySettings .getMaxAttempts () > 0
372- && outstandingBatch .getAttempt () > retrySettings .getMaxAttempts ()
373- || System .currentTimeMillis () + nextBackoffDelay
374- > outstandingBatch .creationTime + retrySettings .getTotalTimeout ().toMillis ()) {
375- try {
376- ApiException gaxException =
377- ApiExceptionFactory .createException (
378- t , GrpcStatusCode .of (Status .fromThrowable (t ).getCode ()), false );
379- for (OutstandingPublish outstandingPublish :
380- outstandingBatch .outstandingPublishes ) {
381- outstandingPublish .publishResult .setException (gaxException );
382- }
383- } finally {
384- messagesWaiter .incrementPendingMessages (-outstandingBatch .size ());
348+ try {
349+ for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
350+ outstandingPublish .publishResult .setException (t );
385351 }
386- return ;
352+ } finally {
353+ messagesWaiter .incrementPendingMessages (-outstandingBatch .size ());
387354 }
388-
389- executor .schedule (
390- new Runnable () {
391- @ Override
392- public void run () {
393- publishOutstandingBatch (outstandingBatch );
394- }
395- },
396- nextBackoffDelay ,
397- TimeUnit .MILLISECONDS );
398355 }
399356 });
400357 }
@@ -459,43 +416,13 @@ public void shutdown() throws Exception {
459416 for (AutoCloseable closeable : closeables ) {
460417 closeable .close ();
461418 }
419+ publisherStub .shutdown ();
462420 }
463421
464422 private boolean hasBatchingBytes () {
465423 return getMaxBatchBytes () > 0 ;
466424 }
467425
468- private static long computeNextBackoffDelayMs (
469- OutstandingBatch outstandingBatch , RetrySettings retrySettings , LongRandom longRandom ) {
470- long delayMillis =
471- Math .round (
472- retrySettings .getInitialRetryDelay ().toMillis ()
473- * Math .pow (retrySettings .getRetryDelayMultiplier (), outstandingBatch .attempt - 1 ));
474- delayMillis = Math .min (retrySettings .getMaxRetryDelay ().toMillis (), delayMillis );
475- outstandingBatch .attempt ++;
476- return longRandom .nextLong (0 , delayMillis );
477- }
478-
479- private boolean isRetryable (Throwable t ) {
480- Status status = Status .fromThrowable (t );
481- switch (status .getCode ()) {
482- case ABORTED :
483- case CANCELLED :
484- case DEADLINE_EXCEEDED :
485- case INTERNAL :
486- case RESOURCE_EXHAUSTED :
487- case UNKNOWN :
488- case UNAVAILABLE :
489- return true ;
490- default :
491- return false ;
492- }
493- }
494-
495- interface LongRandom {
496- long nextLong (long least , long bound );
497- }
498-
499426 /**
500427 * Constructs a new {@link Builder} using the given topic.
501428 *
@@ -565,13 +492,6 @@ public static final class Builder {
565492 .setRpcTimeoutMultiplier (2 )
566493 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
567494 .build ();
568- static final LongRandom DEFAULT_LONG_RANDOM =
569- new LongRandom () {
570- @ Override
571- public long nextLong (long least , long bound ) {
572- return ThreadLocalRandom .current ().nextLong (least , bound );
573- }
574- };
575495
576496 private static final int THREADS_PER_CPU = 5 ;
577497 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
@@ -585,7 +505,6 @@ public long nextLong(long least, long bound) {
585505 BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS ;
586506
587507 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
588- LongRandom longRandom = DEFAULT_LONG_RANDOM ;
589508
590509 TransportChannelProvider channelProvider =
591510 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
@@ -673,12 +592,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
673592 return this ;
674593 }
675594
676- @ InternalApi
677- Builder setLongRandom (LongRandom longRandom ) {
678- this .longRandom = Preconditions .checkNotNull (longRandom );
679- return this ;
680- }
681-
682595 /** Gives the ability to set a custom executor to be used by the library. */
683596 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
684597 this .executorProvider = Preconditions .checkNotNull (executorProvider );
0 commit comments