2222import com .google .api .gax .core .CurrentMillisClock ;
2323import com .google .api .gax .core .FlowControlSettings ;
2424import com .google .api .gax .core .FlowController ;
25+ import com .google .api .gax .grpc .ChannelProvider ;
2526import com .google .api .gax .grpc .ExecutorProvider ;
2627import com .google .api .gax .grpc .InstantiatingExecutorProvider ;
2728import com .google .api .stats .Distribution ;
3233import com .google .common .base .Preconditions ;
3334import com .google .common .primitives .Ints ;
3435import com .google .pubsub .v1 .SubscriptionName ;
35- import io .grpc .ManagedChannelBuilder ;
36+ import io .grpc .ManagedChannel ;
3637import io .grpc .Status ;
3738import io .grpc .StatusRuntimeException ;
38- import io .grpc .netty .GrpcSslContexts ;
39- import io .grpc .netty .NegotiationType ;
40- import io .grpc .netty .NettyChannelBuilder ;
4139import java .io .IOException ;
4240import java .util .ArrayList ;
4341import java .util .List ;
@@ -98,8 +96,8 @@ public class Subscriber extends AbstractApiService {
9896 new Distribution (MAX_ACK_DEADLINE_SECONDS + 1 );
9997 private final int numChannels ;
10098 private final FlowController flowController ;
101- private final ManagedChannelBuilder <? extends ManagedChannelBuilder <?>> channelBuilder ;
102- private final Credentials credentials ;
99+ private final ChannelProvider channelProvider ;
100+ private final List < ManagedChannel > channels ;
103101 private final MessageReceiver receiver ;
104102 private final List <StreamingSubscriberConnection > streamingSubscriberConnections ;
105103 private final List <PollingSubscriberConnection > pollingSubscriberConnections ;
@@ -134,29 +132,10 @@ public void close() throws IOException {
134132 });
135133 }
136134
137- // TODO(pongad): remove this when we move to ManagedChannelBuilder
138- String defaultEndpoint = SubscriptionAdminSettings .getDefaultEndpoint ();
139- int colonPos = defaultEndpoint .indexOf (':' );
140-
141- channelBuilder =
142- builder .channelBuilder .isPresent ()
143- ? builder .channelBuilder .get ()
144- : NettyChannelBuilder .forAddress (
145- defaultEndpoint .substring (0 , colonPos ),
146- Integer .parseInt (defaultEndpoint .substring (colonPos +1 )))
147- .maxMessageSize (MAX_INBOUND_MESSAGE_SIZE )
148- .flowControlWindow (5000000 ) // 2.5 MB
149- .negotiationType (NegotiationType .TLS )
150- .sslContext (GrpcSslContexts .forClient ().ciphers (null ).build ())
151- .executor (executor );
152-
153- credentials =
154- builder .credentials .isPresent ()
155- ? builder .credentials .get ()
156- : GoogleCredentials .getApplicationDefault ()
157- .createScoped (SubscriptionAdminSettings .getDefaultServiceScopes ());
135+ channelProvider = builder .channelProvider ;
158136
159137 numChannels = Math .max (1 , Runtime .getRuntime ().availableProcessors ()) * CHANNELS_PER_CORE ;
138+ channels = new ArrayList <ManagedChannel >(numChannels );
160139 streamingSubscriberConnections = new ArrayList <StreamingSubscriberConnection >(numChannels );
161140 pollingSubscriberConnections = new ArrayList <PollingSubscriberConnection >(numChannels );
162141 }
@@ -218,6 +197,29 @@ public ApiService startAsync() {
218197 @ Override
219198 protected void doStart () {
220199 logger .log (Level .FINE , "Starting subscriber group." );
200+
201+ try {
202+ for (int i = 0 ; i < numChannels ; i ++) {
203+ final ManagedChannel channel =
204+ channelProvider .needsExecutor ()
205+ ? channelProvider .getChannel (executor )
206+ : channelProvider .getChannel ();
207+ channels .add (channel );
208+ if (channelProvider .shouldAutoClose ()) {
209+ closeables .add (
210+ new AutoCloseable () {
211+ @ Override
212+ public void close () {
213+ channel .shutdown ();
214+ }
215+ });
216+ }
217+ }
218+ } catch (IOException e ) {
219+ // doesn't matter what we throw, the Service will just catch it and fail to start.
220+ throw new IllegalStateException (e );
221+ }
222+
221223 // Streaming pull is not enabled on the service yet.
222224 // startStreamingConnections();
223225 startPollingConnections ();
@@ -244,13 +246,12 @@ private void startStreamingConnections() {
244246 streamingSubscriberConnections .add (
245247 new StreamingSubscriberConnection (
246248 cachedSubscriptionNameString ,
247- credentials ,
248249 receiver ,
249250 ackExpirationPadding ,
250251 maxAckExtensionPeriod ,
251252 streamAckDeadlineSeconds ,
252253 ackLatencyDistribution ,
253- channelBuilder . build ( ),
254+ channels . get ( i ),
254255 flowController ,
255256 executor ,
256257 clock ));
@@ -321,12 +322,11 @@ private void startPollingConnections() {
321322 pollingSubscriberConnections .add (
322323 new PollingSubscriberConnection (
323324 cachedSubscriptionNameString ,
324- credentials ,
325325 receiver ,
326326 ackExpirationPadding ,
327327 maxAckExtensionPeriod ,
328328 ackLatencyDistribution ,
329- channelBuilder . build ( ),
329+ channels . get ( i ),
330330 flowController ,
331331 executor ,
332332 clock ));
@@ -433,8 +433,10 @@ public static final class Builder {
433433 FlowControlSettings flowControlSettings = FlowControlSettings .getDefaultInstance ();
434434
435435 ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER ;
436- Optional <ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>> channelBuilder =
437- Optional .absent ();
436+ ChannelProvider channelProvider =
437+ SubscriptionAdminSettings .defaultChannelProviderBuilder ()
438+ .setMaxInboundMessageSize (MAX_INBOUND_MESSAGE_SIZE )
439+ .build ();
438440 Optional <ApiClock > clock = Optional .absent ();
439441
440442 Builder (SubscriptionName subscriptionName , MessageReceiver receiver ) {
@@ -453,15 +455,15 @@ public Builder setCredentials(Credentials credentials) {
453455 }
454456
455457 /**
456- * ManagedChannelBuilder to use to create Channels.
458+ * {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
459+ * endpoint.
457460 *
458- * <p>Must point at Cloud Pub/Sub endpoint.
461+ * <p>For performance, this client benefits from having multiple channels open at once. Users
462+ * are encouraged to provide instances of {@code ChannelProvider} that creates new channels
463+ * instead of returning pre-initialized ones.
459464 */
460- public Builder setChannelBuilder (
461- ManagedChannelBuilder <? extends ManagedChannelBuilder <?>> channelBuilder ) {
462- this .channelBuilder =
463- Optional .<ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>>of (
464- Preconditions .checkNotNull (channelBuilder ));
465+ public Builder setChannelProvider (ChannelProvider channelProvider ) {
466+ this .channelProvider = Preconditions .checkNotNull (channelProvider );
465467 return this ;
466468 }
467469
@@ -521,4 +523,3 @@ public Subscriber build() throws IOException {
521523 }
522524 }
523525}
524-
0 commit comments