Skip to content

Commit 8c9636b

Browse files
pongadgarrettjonesgoogle
authored andcommitted
make Subscriber use ChannelProvider (#1911)
1 parent 05d5811 commit 8c9636b

6 files changed

Lines changed: 60 additions & 132 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.api.gax.core.ApiClock;
2121
import com.google.api.gax.core.FlowController;
2222
import com.google.api.stats.Distribution;
23-
import com.google.auth.Credentials;
2423
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
2524
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
2625
import com.google.common.collect.Lists;
@@ -36,7 +35,6 @@
3635
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
3736
import com.google.pubsub.v1.Subscription;
3837
import io.grpc.Channel;
39-
import io.grpc.auth.MoreCallCredentials;
4038
import java.util.List;
4139
import java.util.concurrent.ScheduledExecutorService;
4240
import java.util.concurrent.TimeUnit;
@@ -64,7 +62,6 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac
6462

6563
public PollingSubscriberConnection(
6664
String subscription,
67-
Credentials credentials,
6865
MessageReceiver receiver,
6966
Duration ackExpirationPadding,
7067
Duration maxAckExtensionPeriod,
@@ -75,9 +72,7 @@ public PollingSubscriberConnection(
7572
ApiClock clock) {
7673
this.subscription = subscription;
7774
this.executor = executor;
78-
stub =
79-
SubscriberGrpc.newFutureStub(channel)
80-
.withCallCredentials(MoreCallCredentials.from(credentials));
75+
stub = SubscriberGrpc.newFutureStub(channel);
8176
messageDispatcher =
8277
new MessageDispatcher(
8378
receiver,

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.api.gax.core.ApiClock;
2121
import com.google.api.gax.core.FlowController;
2222
import com.google.api.stats.Distribution;
23-
import com.google.auth.Credentials;
2423
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
2524
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
2625
import com.google.common.annotations.VisibleForTesting;
@@ -34,7 +33,6 @@
3433
import io.grpc.CallOptions;
3534
import io.grpc.Channel;
3635
import io.grpc.Status;
37-
import io.grpc.auth.MoreCallCredentials;
3836
import io.grpc.stub.ClientCallStreamObserver;
3937
import io.grpc.stub.ClientCalls;
4038
import io.grpc.stub.ClientResponseObserver;
@@ -58,7 +56,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
5856
private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
5957

6058
private final Channel channel;
61-
private final Credentials credentials;
6259

6360
private final String subscription;
6461
private final ScheduledExecutorService executor;
@@ -67,7 +64,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
6764

6865
public StreamingSubscriberConnection(
6966
String subscription,
70-
Credentials credentials,
7167
MessageReceiver receiver,
7268
Duration ackExpirationPadding,
7369
Duration maxAckExtensionPeriod,
@@ -79,7 +75,6 @@ public StreamingSubscriberConnection(
7975
ApiClock clock) {
8076
this.subscription = subscription;
8177
this.executor = executor;
82-
this.credentials = credentials;
8378
this.channel = channel;
8479
this.messageDispatcher =
8580
new MessageDispatcher(
@@ -152,9 +147,7 @@ private void initialize() {
152147
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
153148
(ClientCallStreamObserver<StreamingPullRequest>)
154149
(ClientCalls.asyncBidiStreamingCall(
155-
channel.newCall(
156-
SubscriberGrpc.METHOD_STREAMING_PULL,
157-
CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))),
150+
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT),
158151
responseObserver));
159152
logger.log(
160153
Level.FINER,

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.gax.core.CurrentMillisClock;
2323
import com.google.api.gax.core.FlowControlSettings;
2424
import com.google.api.gax.core.FlowController;
25+
import com.google.api.gax.grpc.ChannelProvider;
2526
import com.google.api.gax.grpc.ExecutorProvider;
2627
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2728
import com.google.api.stats.Distribution;
@@ -32,12 +33,9 @@
3233
import com.google.common.base.Preconditions;
3334
import com.google.common.primitives.Ints;
3435
import com.google.pubsub.v1.SubscriptionName;
35-
import io.grpc.ManagedChannelBuilder;
36+
import io.grpc.ManagedChannel;
3637
import io.grpc.Status;
3738
import io.grpc.StatusRuntimeException;
38-
import io.grpc.netty.GrpcSslContexts;
39-
import io.grpc.netty.NegotiationType;
40-
import io.grpc.netty.NettyChannelBuilder;
4139
import java.io.IOException;
4240
import java.util.ArrayList;
4341
import java.util.List;
@@ -98,8 +96,8 @@ public class Subscriber extends AbstractApiService {
9896
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
9997
private final int numChannels;
10098
private final FlowController flowController;
101-
private final ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder;
102-
private final Credentials credentials;
99+
private final ChannelProvider channelProvider;
100+
private final List<ManagedChannel> channels;
103101
private final MessageReceiver receiver;
104102
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
105103
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
@@ -134,29 +132,10 @@ public void close() throws IOException {
134132
});
135133
}
136134

137-
// TODO(pongad): remove this when we move to ManagedChannelBuilder
138-
String defaultEndpoint = SubscriptionAdminSettings.getDefaultEndpoint();
139-
int colonPos = defaultEndpoint.indexOf(':');
140-
141-
channelBuilder =
142-
builder.channelBuilder.isPresent()
143-
? builder.channelBuilder.get()
144-
: NettyChannelBuilder.forAddress(
145-
defaultEndpoint.substring(0, colonPos),
146-
Integer.parseInt(defaultEndpoint.substring(colonPos+1)))
147-
.maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
148-
.flowControlWindow(5000000) // 2.5 MB
149-
.negotiationType(NegotiationType.TLS)
150-
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
151-
.executor(executor);
152-
153-
credentials =
154-
builder.credentials.isPresent()
155-
? builder.credentials.get()
156-
: GoogleCredentials.getApplicationDefault()
157-
.createScoped(SubscriptionAdminSettings.getDefaultServiceScopes());
135+
channelProvider = builder.channelProvider;
158136

159137
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
138+
channels = new ArrayList<ManagedChannel>(numChannels);
160139
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
161140
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
162141
}
@@ -218,6 +197,29 @@ public ApiService startAsync() {
218197
@Override
219198
protected void doStart() {
220199
logger.log(Level.FINE, "Starting subscriber group.");
200+
201+
try {
202+
for (int i = 0; i < numChannels; i++) {
203+
final ManagedChannel channel =
204+
channelProvider.needsExecutor()
205+
? channelProvider.getChannel(executor)
206+
: channelProvider.getChannel();
207+
channels.add(channel);
208+
if (channelProvider.shouldAutoClose()) {
209+
closeables.add(
210+
new AutoCloseable() {
211+
@Override
212+
public void close() {
213+
channel.shutdown();
214+
}
215+
});
216+
}
217+
}
218+
} catch (IOException e) {
219+
// doesn't matter what we throw, the Service will just catch it and fail to start.
220+
throw new IllegalStateException(e);
221+
}
222+
221223
// Streaming pull is not enabled on the service yet.
222224
// startStreamingConnections();
223225
startPollingConnections();
@@ -244,13 +246,12 @@ private void startStreamingConnections() {
244246
streamingSubscriberConnections.add(
245247
new StreamingSubscriberConnection(
246248
cachedSubscriptionNameString,
247-
credentials,
248249
receiver,
249250
ackExpirationPadding,
250251
maxAckExtensionPeriod,
251252
streamAckDeadlineSeconds,
252253
ackLatencyDistribution,
253-
channelBuilder.build(),
254+
channels.get(i),
254255
flowController,
255256
executor,
256257
clock));
@@ -321,12 +322,11 @@ private void startPollingConnections() {
321322
pollingSubscriberConnections.add(
322323
new PollingSubscriberConnection(
323324
cachedSubscriptionNameString,
324-
credentials,
325325
receiver,
326326
ackExpirationPadding,
327327
maxAckExtensionPeriod,
328328
ackLatencyDistribution,
329-
channelBuilder.build(),
329+
channels.get(i),
330330
flowController,
331331
executor,
332332
clock));
@@ -433,8 +433,10 @@ public static final class Builder {
433433
FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();
434434

435435
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
436-
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
437-
Optional.absent();
436+
ChannelProvider channelProvider =
437+
SubscriptionAdminSettings.defaultChannelProviderBuilder()
438+
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
439+
.build();
438440
Optional<ApiClock> clock = Optional.absent();
439441

440442
Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
@@ -453,15 +455,15 @@ public Builder setCredentials(Credentials credentials) {
453455
}
454456

455457
/**
456-
* ManagedChannelBuilder to use to create Channels.
458+
* {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
459+
* endpoint.
457460
*
458-
* <p>Must point at Cloud Pub/Sub endpoint.
461+
* <p>For performance, this client benefits from having multiple channels open at once. Users
462+
* are encouraged to provide instances of {@code ChannelProvider} that creates new channels
463+
* instead of returning pre-initialized ones.
459464
*/
460-
public Builder setChannelBuilder(
461-
ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder) {
462-
this.channelBuilder =
463-
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(
464-
Preconditions.checkNotNull(channelBuilder));
465+
public Builder setChannelProvider(ChannelProvider channelProvider) {
466+
this.channelProvider = Preconditions.checkNotNull(channelProvider);
465467
return this;
466468
}
467469

@@ -521,4 +523,3 @@ public Subscriber build() throws IOException {
521523
}
522524
}
523525
}
524-

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeCredentials.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ public ManagedChannel getChannel(Executor executor) {
8282

8383
private FakeScheduledExecutorService fakeExecutor;
8484

85-
private FakeCredentials testCredentials;
86-
8785
private FakePublisherServiceImpl testPublisherServiceImpl;
8886

8987
private ServerImpl testServer;

0 commit comments

Comments
 (0)