Skip to content

Commit 7d21123

Browse files
authored
---
yaml --- r: 8099 b: refs/heads/tswast-patch-1 c: 3814cb6 h: refs/heads/master i: 8097: c8eff5e 8095: 00d01f4
1 parent a1f49c3 commit 7d21123

6 files changed

Lines changed: 32 additions & 90 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5757
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5858
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5959
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
60-
refs/heads/tswast-patch-1: 8c4c4bd754eb167f6b10a1b5af3b7d0ede87c831
60+
refs/heads/tswast-patch-1: 3814cb64747bd03c0c08c43e64b868fadfd52753
6161
refs/heads/pubsub-streaming-pull: 19262b752ee874eb2ca3b950eb2aef44d5a5267b

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,8 @@
3232
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
3333
import com.google.pubsub.v1.PullRequest;
3434
import com.google.pubsub.v1.PullResponse;
35-
import com.google.pubsub.v1.SubscriberGrpc;
3635
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
3736
import com.google.pubsub.v1.Subscription;
38-
import io.grpc.Channel;
3937
import java.util.List;
4038
import java.util.concurrent.ScheduledExecutorService;
4139
import java.util.concurrent.TimeUnit;
@@ -69,15 +67,15 @@ public PollingSubscriberConnection(
6967
Duration ackExpirationPadding,
7068
Duration maxAckExtensionPeriod,
7169
Distribution ackLatencyDistribution,
72-
Channel channel,
70+
SubscriberFutureStub stub,
7371
FlowController flowController,
7472
@Nullable Long maxDesiredPulledMessages,
7573
ScheduledExecutorService executor,
7674
ScheduledExecutorService systemExecutor,
7775
ApiClock clock) {
7876
this.subscription = subscription;
7977
this.pollingExecutor = systemExecutor;
80-
stub = SubscriberGrpc.newFutureStub(channel);
78+
this.stub = stub;
8179
messageDispatcher =
8280
new MessageDispatcher(
8381
receiver,

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ public Builder setChannelProvider(ChannelProvider channelProvider) {
618618
return this;
619619
}
620620

621+
/** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
621622
public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
622623
this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
623624
return this;

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 26 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,24 @@
2323
import com.google.api.gax.batching.FlowControlSettings;
2424
import com.google.api.gax.batching.FlowController;
2525
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
26+
import com.google.api.gax.core.CredentialsProvider;
2627
import com.google.api.gax.core.Distribution;
2728
import com.google.api.gax.grpc.ChannelProvider;
2829
import com.google.api.gax.grpc.ExecutorProvider;
2930
import com.google.api.gax.grpc.FixedExecutorProvider;
3031
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
32+
import com.google.auth.Credentials;
3133
import com.google.auth.oauth2.GoogleCredentials;
3234
import com.google.common.annotations.VisibleForTesting;
3335
import com.google.common.base.Optional;
3436
import com.google.common.base.Preconditions;
3537
import com.google.common.primitives.Ints;
38+
import com.google.pubsub.v1.SubscriberGrpc;
39+
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
3640
import com.google.pubsub.v1.SubscriptionName;
41+
import io.grpc.CallCredentials;
3742
import io.grpc.ManagedChannel;
38-
import io.grpc.Status;
39-
import io.grpc.StatusRuntimeException;
43+
import io.grpc.auth.MoreCallCredentials;
4044
import java.io.IOException;
4145
import java.util.ArrayList;
4246
import java.util.List;
@@ -103,6 +107,7 @@ public class Subscriber extends AbstractApiService {
103107
private final int numChannels;
104108
private final FlowController flowController;
105109
private final ChannelProvider channelProvider;
110+
private final CredentialsProvider credentialsProvider;
106111
private final List<ManagedChannel> channels;
107112
private final MessageReceiver receiver;
108113
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
@@ -156,6 +161,7 @@ public void close() throws IOException {
156161
}
157162

158163
channelProvider = builder.channelProvider;
164+
credentialsProvider = builder.credentialsProvider;
159165

160166
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
161167
channels = new ArrayList<ManagedChannel>(numChannels);
@@ -264,7 +270,7 @@ public void run() {
264270

265271
@Override
266272
protected void doStop() {
267-
stopAllStreamingConnections();
273+
// stopAllStreamingConnections();
268274
stopAllPollingConnections();
269275
try {
270276
for (AutoCloseable closeable : closeables) {
@@ -276,96 +282,24 @@ protected void doStop() {
276282
}
277283
}
278284

279-
private void startStreamingConnections() {
280-
synchronized (streamingSubscriberConnections) {
281-
for (int i = 0; i < numChannels; i++) {
282-
streamingSubscriberConnections.add(
283-
new StreamingSubscriberConnection(
284-
cachedSubscriptionNameString,
285-
receiver,
286-
ackExpirationPadding,
287-
maxAckExtensionPeriod,
288-
streamAckDeadlineSeconds,
289-
ackLatencyDistribution,
290-
channels.get(i),
291-
flowController,
292-
executor,
293-
alarmsExecutor,
294-
clock));
295-
}
296-
startConnections(
297-
streamingSubscriberConnections,
298-
new Listener() {
299-
@Override
300-
public void failed(State from, Throwable failure) {
301-
// If a connection failed is because of a fatal error, we should fail the
302-
// whole subscriber.
303-
stopAllStreamingConnections();
304-
if (failure instanceof StatusRuntimeException
305-
&& ((StatusRuntimeException) failure).getStatus().getCode()
306-
== Status.Code.UNIMPLEMENTED) {
307-
logger.info("Unable to open streaming connections, falling back to polling.");
308-
startPollingConnections();
309-
return;
310-
}
311-
notifyFailed(failure);
312-
}
313-
});
314-
}
315-
316-
ackDeadlineUpdater =
317-
executor.scheduleAtFixedRate(
318-
new Runnable() {
319-
@Override
320-
public void run() {
321-
// It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
322-
long ackLatency =
323-
ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
324-
if (ackLatency > 0) {
325-
long ackExpirationPaddingMillis = ackExpirationPadding.toMillis();
326-
int possibleStreamAckDeadlineSeconds =
327-
Math.max(
328-
MIN_ACK_DEADLINE_SECONDS,
329-
Ints.saturatedCast(
330-
Math.max(ackLatency,
331-
TimeUnit.MILLISECONDS.toSeconds(ackExpirationPaddingMillis))));
332-
if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
333-
streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
334-
logger.log(
335-
Level.FINER,
336-
"Updating stream deadline to {0} seconds.",
337-
streamAckDeadlineSeconds);
338-
for (StreamingSubscriberConnection subscriberConnection :
339-
streamingSubscriberConnections) {
340-
subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
341-
}
342-
}
343-
}
344-
}
345-
},
346-
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
347-
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
348-
TimeUnit.MILLISECONDS);
349-
}
350-
351-
private void stopAllStreamingConnections() {
352-
stopConnections(streamingSubscriberConnections);
353-
if (ackDeadlineUpdater != null) {
354-
ackDeadlineUpdater.cancel(true);
355-
}
356-
}
357-
358-
private void startPollingConnections() {
285+
private void startPollingConnections() throws IOException {
359286
synchronized (pollingSubscriberConnections) {
287+
Credentials credentials = credentialsProvider.getCredentials();
288+
CallCredentials callCredentials =
289+
credentials == null ? null : MoreCallCredentials.from(credentials);
360290
for (int i = 0; i < numChannels; i++) {
291+
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channels.get(i));
292+
if (callCredentials != null) {
293+
stub = stub.withCallCredentials(callCredentials);
294+
}
361295
pollingSubscriberConnections.add(
362296
new PollingSubscriberConnection(
363297
cachedSubscriptionNameString,
364298
receiver,
365299
ackExpirationPadding,
366300
maxAckExtensionPeriod,
367301
ackLatencyDistribution,
368-
channels.get(i),
302+
stub,
369303
flowController,
370304
flowControlSettings.getMaxOutstandingElementCount(),
371305
executor,
@@ -460,6 +394,8 @@ public static final class Builder {
460394
SubscriptionAdminSettings.defaultChannelProviderBuilder()
461395
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
462396
.build();
397+
CredentialsProvider credentialsProvider =
398+
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
463399
Optional<ApiClock> clock = Optional.absent();
464400

465401
Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
@@ -525,6 +461,12 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
525461
return this;
526462
}
527463

464+
/** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
465+
public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
466+
this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
467+
return this;
468+
}
469+
528470
/**
529471
* Gives the ability to set a custom executor for managing lease extensions. If none is provided
530472
* a shared one will be used by all {@link Subscriber} instances.

branches/tswast-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public ManagedChannel getChannel(Executor executor) {
8181

8282
// Gax declares a similar type, which can be used after gax is upgraded.
8383
@Deprecated
84-
private static final CredentialsProvider NO_CREDENTIALS_PROVIDER =
84+
static final CredentialsProvider NO_CREDENTIALS_PROVIDER =
8585
new CredentialsProvider() {
8686
@Override
8787
public Credentials getCredentials() {

branches/tswast-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
538538
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
539539
.setLeaseAlarmsExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
540540
.setChannelProvider(FixedChannelProvider.create(testChannel))
541+
.setCredentialsProvider(PublisherImplTest.NO_CREDENTIALS_PROVIDER)
541542
.setClock(fakeExecutor.getClock());
542543
}
543544

0 commit comments

Comments
 (0)