1919import com .google .api .gax .bundling .FlowController ;
2020import com .google .api .gax .core .RetrySettings ;
2121import com .google .api .gax .grpc .BundlingSettings ;
22+ import com .google .api .gax .grpc .ChannelProvider ;
2223import com .google .api .gax .grpc .ExecutorProvider ;
2324import com .google .api .gax .grpc .InstantiatingExecutorProvider ;
24- import com .google .auth .Credentials ;
2525import com .google .auth .oauth2 .GoogleCredentials ;
2626import com .google .common .annotations .VisibleForTesting ;
27- import com .google .common .base .Optional ;
2827import com .google .common .base .Preconditions ;
2928import com .google .common .collect .ImmutableList ;
3029import com .google .common .util .concurrent .FutureCallback ;
3635import com .google .pubsub .v1 .PublisherGrpc ;
3736import com .google .pubsub .v1 .PubsubMessage ;
3837import com .google .pubsub .v1 .TopicName ;
39- import io .grpc .CallCredentials ;
40- import io .grpc .Channel ;
41- import io .grpc .ManagedChannelBuilder ;
38+ import io .grpc .ManagedChannel ;
4239import io .grpc .Status ;
43- import io .grpc .auth .MoreCallCredentials ;
44- import io .grpc .netty .GrpcSslContexts ;
45- import io .grpc .netty .NegotiationType ;
46- import io .grpc .netty .NettyChannelBuilder ;
4740import java .io .IOException ;
4841import java .util .ArrayList ;
4942import java .util .Iterator ;
@@ -139,9 +132,8 @@ public static long getApiMaxRequestBytes() {
139132 private final AtomicBoolean activeAlarm ;
140133
141134 private final FlowController flowController ;
142- private final Channel [] channels ;
135+ private final ManagedChannel [] channels ;
143136 private final AtomicRoundRobin channelIndex ;
144- private final CallCredentials credentials ;
145137
146138 private final ScheduledExecutorService executor ;
147139 private final AtomicBoolean shutdown ;
@@ -164,37 +156,35 @@ private Publisher(Builder builder) throws IOException {
164156 messagesBundle = new LinkedList <>();
165157 messagesBundleLock = new ReentrantLock ();
166158 activeAlarm = new AtomicBoolean (false );
167- int numCores = Math .max (1 , Runtime .getRuntime ().availableProcessors ());
168159 executor = builder .executorProvider .getExecutor ();
169160 if (builder .executorProvider .shouldAutoClose ()) {
170161 closeables .add (
171162 new AutoCloseable () {
172163 @ Override
173- public void close () throws IOException {
164+ public void close () {
174165 executor .shutdown ();
175166 }
176167 });
177168 }
178- channels = new Channel [numCores ];
179- channelIndex = new AtomicRoundRobin (channels .length );
180- for (int i = 0 ; i < numCores ; i ++) {
169+ channels = new ManagedChannel [Runtime .getRuntime ().availableProcessors ()];
170+ for (int i = 0 ; i < channels .length ; i ++) {
181171 channels [i ] =
182- builder .channelBuilder .isPresent ()
183- ? builder .channelBuilder .get ().build ()
184- : NettyChannelBuilder .forAddress (
185- PublisherSettings .getDefaultServiceAddress (),
186- PublisherSettings .getDefaultServicePort ())
187- .negotiationType (NegotiationType .TLS )
188- .sslContext (GrpcSslContexts .forClient ().ciphers (null ).build ())
189- .executor (executor )
190- .build ();
172+ builder .channelProvider .needsExecutor ()
173+ ? builder .channelProvider .getChannel (executor )
174+ : builder .channelProvider .getChannel ();
175+ }
176+ if (builder .channelProvider .shouldAutoClose ()) {
177+ closeables .add (
178+ new AutoCloseable () {
179+ @ Override
180+ public void close () {
181+ for (int i = 0 ; i < channels .length ; i ++) {
182+ channels [i ].shutdown ();
183+ }
184+ }
185+ });
191186 }
192- credentials =
193- MoreCallCredentials .from (
194- builder .userCredentials .isPresent ()
195- ? builder .userCredentials .get ()
196- : GoogleCredentials .getApplicationDefault ()
197- .createScoped (PublisherSettings .getDefaultServiceScopes ()));
187+ channelIndex = new AtomicRoundRobin (channels .length );
198188 shutdown = new AtomicBoolean (false );
199189 messagesWaiter = new MessagesWaiter ();
200190 }
@@ -350,7 +340,6 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)
350340
351341 Futures .addCallback (
352342 PublisherGrpc .newFutureStub (channels [currentChannel ])
353- .withCallCredentials (credentials )
354343 .withDeadlineAfter (rpcTimeoutMs , TimeUnit .MILLISECONDS )
355344 .publish (publishRequest .build ()),
356345 new FutureCallback <PublishResponse >() {
@@ -588,37 +577,23 @@ public long nextLong(long least, long bound) {
588577 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
589578 LongRandom longRandom = DEFAULT_LONG_RANDOM ;
590579
591- // Channels and credentials
592- Optional <Credentials > userCredentials = Optional .absent ();
593- Optional <ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>> channelBuilder =
594- Optional .absent ();
595-
580+ ChannelProvider channelProvider = PublisherSettings .defaultChannelProviderBuilder ().build ();
596581 ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER ;
597582
598583 private Builder (TopicName topic ) {
599584 this .topicName = Preconditions .checkNotNull (topic );
600585 }
601586
602587 /**
603- * Credentials to authenticate with.
604- *
605- * <p>Must be properly scoped for accessing Cloud Pub/Sub APIs.
606- */
607- public Builder setCredentials (Credentials userCredentials ) {
608- this .userCredentials = Optional .of (Preconditions .checkNotNull (userCredentials ));
609- return this ;
610- }
611-
612- /**
613- * ManagedChannelBuilder to use to create Channels.
588+ * {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
589+ * endpoint.
614590 *
615- * <p>Must point at Cloud Pub/Sub endpoint.
591+ * <p>For performance, this client benefits from having multiple channels open at once. Users
592+ * are encouraged to provide instances of {@code ChannelProvider} that creates new channels
593+ * instead of returning pre-initialized ones.
616594 */
617- public Builder setChannelBuilder (
618- ManagedChannelBuilder <? extends ManagedChannelBuilder <?>> channelBuilder ) {
619- this .channelBuilder =
620- Optional .<ManagedChannelBuilder <? extends ManagedChannelBuilder <?>>>of (
621- Preconditions .checkNotNull (channelBuilder ));
595+ public Builder setChannelProvider (ChannelProvider channelProvider ) {
596+ this .channelProvider = Preconditions .checkNotNull (channelProvider );
622597 return this ;
623598 }
624599
0 commit comments