2222import com .google .api .gax .batching .BatchingSettings ;
2323import com .google .api .gax .batching .FlowControlSettings ;
2424import com .google .api .gax .batching .FlowController ;
25+ import com .google .api .gax .core .CredentialsProvider ;
2526import com .google .api .gax .grpc .ChannelProvider ;
2627import com .google .api .gax .grpc .ExecutorProvider ;
2728import com .google .api .gax .grpc .InstantiatingExecutorProvider ;
2829import com .google .api .gax .retrying .RetrySettings ;
30+ import com .google .auth .Credentials ;
2931import com .google .auth .oauth2 .GoogleCredentials ;
3032import com .google .common .annotations .VisibleForTesting ;
3133import com .google .common .base .Preconditions ;
3537import com .google .pubsub .v1 .PublishRequest ;
3638import com .google .pubsub .v1 .PublishResponse ;
3739import com .google .pubsub .v1 .PublisherGrpc ;
40+ import com .google .pubsub .v1 .PublisherGrpc .PublisherFutureStub ;
3841import com .google .pubsub .v1 .PubsubMessage ;
3942import com .google .pubsub .v1 .TopicName ;
43+ import io .grpc .CallCredentials ;
4044import io .grpc .ManagedChannel ;
4145import io .grpc .Status ;
46+ import io .grpc .auth .MoreCallCredentials ;
4247import java .io .IOException ;
4348import java .util .ArrayList ;
4449import java .util .Iterator ;
5358import java .util .concurrent .locks .ReentrantLock ;
5459import java .util .logging .Level ;
5560import java .util .logging .Logger ;
61+ import javax .annotation .Nullable ;
5662import org .threeten .bp .Duration ;
5763
5864/**
@@ -95,6 +101,7 @@ public class Publisher {
95101 private final FlowController flowController ;
96102 private final ManagedChannel [] channels ;
97103 private final AtomicRoundRobin channelIndex ;
104+ @ Nullable private final CallCredentials callCredentials ;
98105
99106 private final ScheduledExecutorService executor ;
100107 private final AtomicBoolean shutdown ;
@@ -155,6 +162,10 @@ public void close() {
155162 });
156163 }
157164 channelIndex = new AtomicRoundRobin (channels .length );
165+
166+ Credentials credentials = builder .credentialsProvider .getCredentials ();
167+ callCredentials = credentials == null ? null : MoreCallCredentials .from (credentials );
168+
158169 shutdown = new AtomicBoolean (false );
159170 messagesWaiter = new MessageWaiter ();
160171 }
@@ -328,10 +339,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
328339 * Math .pow (retrySettings .getRpcTimeoutMultiplier (), outstandingBatch .attempt - 1 ));
329340 rpcTimeoutMs = Math .min (rpcTimeoutMs , retrySettings .getMaxRpcTimeout ().toMillis ());
330341
331- Futures . addCallback (
342+ PublisherFutureStub stub =
332343 PublisherGrpc .newFutureStub (channels [currentChannel ])
333- .withDeadlineAfter (rpcTimeoutMs , TimeUnit .MILLISECONDS )
334- .publish (publishRequest .build ()),
344+ .withDeadlineAfter (rpcTimeoutMs , TimeUnit .MILLISECONDS );
345+ if (callCredentials != null ) {
346+ stub = stub .withCallCredentials (callCredentials );
347+ }
348+
349+ Futures .addCallback (
350+ stub .publish (publishRequest .build ()),
335351 new FutureCallback <PublishResponse >() {
336352 @ Override
337353 public void onSuccess (PublishResponse result ) {
@@ -582,6 +598,8 @@ public long nextLong(long least, long bound) {
582598
583599 ChannelProvider channelProvider = TopicAdminSettings .defaultChannelProviderBuilder ().build ();
584600 ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER ;
601+ CredentialsProvider credentialsProvider =
602+ TopicAdminSettings .defaultCredentialsProviderBuilder ().build ();
585603
586604 private Builder (TopicName topic ) {
587605 this .topicName = Preconditions .checkNotNull (topic );
@@ -600,6 +618,11 @@ public Builder setChannelProvider(ChannelProvider channelProvider) {
600618 return this ;
601619 }
602620
621+ public Builder setCredentialsProvider (CredentialsProvider credentialsProvider ) {
622+ this .credentialsProvider = Preconditions .checkNotNull (credentialsProvider );
623+ return this ;
624+ }
625+
603626 // Batching options
604627 public Builder setBatchingSettings (BatchingSettings batchingSettings ) {
605628 Preconditions .checkNotNull (batchingSettings );
0 commit comments