Skip to content

Commit c8eff5e

Browse files
authored
---
yaml --- r: 8097 b: refs/heads/tswast-patch-1 c: a08d3fd h: refs/heads/master i: 8095: 00d01f4
1 parent 6b21782 commit c8eff5e

3 files changed

Lines changed: 45 additions & 9 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5757
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5858
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5959
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
60-
refs/heads/tswast-patch-1: 31f044b5914230a207149232ea83aa5ac16cc7f7
60+
refs/heads/tswast-patch-1: a08d3fdf2b28eb49d4003bbfe77b4b730be2812a
6161
refs/heads/pubsub-streaming-pull: 19262b752ee874eb2ca3b950eb2aef44d5a5267b

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import com.google.api.gax.batching.BatchingSettings;
2323
import com.google.api.gax.batching.FlowControlSettings;
2424
import com.google.api.gax.batching.FlowController;
25+
import com.google.api.gax.core.CredentialsProvider;
2526
import com.google.api.gax.grpc.ChannelProvider;
2627
import com.google.api.gax.grpc.ExecutorProvider;
2728
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2829
import com.google.api.gax.retrying.RetrySettings;
30+
import com.google.auth.Credentials;
2931
import com.google.auth.oauth2.GoogleCredentials;
3032
import com.google.common.annotations.VisibleForTesting;
3133
import com.google.common.base.Preconditions;
@@ -35,10 +37,13 @@
3537
import com.google.pubsub.v1.PublishRequest;
3638
import com.google.pubsub.v1.PublishResponse;
3739
import com.google.pubsub.v1.PublisherGrpc;
40+
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
3841
import com.google.pubsub.v1.PubsubMessage;
3942
import com.google.pubsub.v1.TopicName;
43+
import io.grpc.CallCredentials;
4044
import io.grpc.ManagedChannel;
4145
import io.grpc.Status;
46+
import io.grpc.auth.MoreCallCredentials;
4247
import java.io.IOException;
4348
import java.util.ArrayList;
4449
import java.util.Iterator;
@@ -53,6 +58,7 @@
5358
import java.util.concurrent.locks.ReentrantLock;
5459
import java.util.logging.Level;
5560
import java.util.logging.Logger;
61+
import javax.annotation.Nullable;
5662
import org.threeten.bp.Duration;
5763

5864
/**
@@ -95,6 +101,7 @@ public class Publisher {
95101
private final FlowController flowController;
96102
private final ManagedChannel[] channels;
97103
private final AtomicRoundRobin channelIndex;
104+
@Nullable private final CallCredentials callCredentials;
98105

99106
private final ScheduledExecutorService executor;
100107
private final AtomicBoolean shutdown;
@@ -155,6 +162,10 @@ public void close() {
155162
});
156163
}
157164
channelIndex = new AtomicRoundRobin(channels.length);
165+
166+
Credentials credentials = builder.credentialsProvider.getCredentials();
167+
callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials);
168+
158169
shutdown = new AtomicBoolean(false);
159170
messagesWaiter = new MessageWaiter();
160171
}
@@ -328,10 +339,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
328339
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1));
329340
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis());
330341

331-
Futures.addCallback(
342+
PublisherFutureStub stub =
332343
PublisherGrpc.newFutureStub(channels[currentChannel])
333-
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
334-
.publish(publishRequest.build()),
344+
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS);
345+
if (callCredentials != null) {
346+
stub = stub.withCallCredentials(callCredentials);
347+
}
348+
349+
Futures.addCallback(
350+
stub.publish(publishRequest.build()),
335351
new FutureCallback<PublishResponse>() {
336352
@Override
337353
public void onSuccess(PublishResponse result) {
@@ -582,6 +598,8 @@ public long nextLong(long least, long bound) {
582598

583599
ChannelProvider channelProvider = TopicAdminSettings.defaultChannelProviderBuilder().build();
584600
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
601+
CredentialsProvider credentialsProvider =
602+
TopicAdminSettings.defaultCredentialsProviderBuilder().build();
585603

586604
private Builder(TopicName topic) {
587605
this.topicName = Preconditions.checkNotNull(topic);
@@ -600,6 +618,11 @@ public Builder setChannelProvider(ChannelProvider channelProvider) {
600618
return this;
601619
}
602620

621+
public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
622+
this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
623+
return this;
624+
}
625+
603626
// Batching options
604627
public Builder setBatchingSettings(BatchingSettings batchingSettings) {
605628
Preconditions.checkNotNull(batchingSettings);

branches/tswast-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19+
import static org.junit.Assert.*;
20+
1921
import com.google.api.core.ApiFuture;
2022
import com.google.api.gax.batching.BatchingSettings;
2123
import com.google.api.gax.batching.FlowControlSettings;
2224
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
25+
import com.google.api.gax.core.CredentialsProvider;
2326
import com.google.api.gax.grpc.ChannelProvider;
2427
import com.google.api.gax.grpc.ExecutorProvider;
2528
import com.google.api.gax.grpc.FixedExecutorProvider;
2629
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
30+
import com.google.auth.Credentials;
2731
import com.google.cloud.pubsub.spi.v1.Publisher.Builder;
2832
import com.google.protobuf.ByteString;
2933
import com.google.pubsub.v1.PublishResponse;
@@ -35,18 +39,15 @@
3539
import io.grpc.inprocess.InProcessChannelBuilder;
3640
import io.grpc.inprocess.InProcessServerBuilder;
3741
import io.grpc.internal.ServerImpl;
42+
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.Executor;
3844
import org.junit.After;
3945
import org.junit.Before;
4046
import org.junit.Test;
4147
import org.junit.runner.RunWith;
4248
import org.junit.runners.JUnit4;
4349
import org.threeten.bp.Duration;
4450

45-
import java.util.concurrent.ExecutionException;
46-
import java.util.concurrent.Executor;
47-
48-
import static org.junit.Assert.*;
49-
5051
@RunWith(JUnit4.class)
5152
public class PublisherImplTest {
5253

@@ -78,6 +79,16 @@ public ManagedChannel getChannel(Executor executor) {
7879
}
7980
};
8081

82+
// Gax declares a similar type, which can be used after gax is upgraded.
83+
@Deprecated
84+
private static final CredentialsProvider NO_CREDENTIALS_PROVIDER =
85+
new CredentialsProvider() {
86+
@Override
87+
public Credentials getCredentials() {
88+
return null;
89+
}
90+
};
91+
8192
private FakeScheduledExecutorService fakeExecutor;
8293

8394
private FakePublisherServiceImpl testPublisherServiceImpl;
@@ -427,6 +438,7 @@ public void testPublisherGetters() throws Exception {
427438
.setDelayThreshold(Duration.ofMillis(11))
428439
.setElementCountThreshold(12L)
429440
.build());
441+
builder.setCredentialsProvider(NO_CREDENTIALS_PROVIDER);
430442
builder.setFlowControlSettings(
431443
FlowControlSettings.newBuilder()
432444
.setMaxOutstandingRequestBytes(13)
@@ -661,6 +673,7 @@ private Builder getTestPublisherBuilder() {
661673
return Publisher.defaultBuilder(TEST_TOPIC)
662674
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
663675
.setChannelProvider(TEST_CHANNEL_PROVIDER)
676+
.setCredentialsProvider(NO_CREDENTIALS_PROVIDER)
664677
.setLongRandom(
665678
new Publisher.LongRandom() {
666679
@Override

0 commit comments

Comments
 (0)