Skip to content

Commit c46faf2

Browse files
authored
use ChannelProvider instead of ChannelBuilder (#1563)
* use ChannelProvider instead of ChannelBuilder * document that ChannelProvider should normally create new channels
1 parent 16957b0 commit c46faf2

2 files changed

Lines changed: 57 additions & 74 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.api.gax.core.RetrySettings;
2121
import com.google.api.gax.grpc.BundlingSettings;
22+
import com.google.api.gax.grpc.ChannelProvider;
2223
import com.google.api.gax.grpc.ExecutorProvider;
2324
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
24-
import com.google.auth.Credentials;
2525
import com.google.auth.oauth2.GoogleCredentials;
2626
import com.google.common.annotations.VisibleForTesting;
27-
import com.google.common.base.Optional;
2827
import com.google.common.base.Preconditions;
2928
import com.google.common.collect.ImmutableList;
3029
import com.google.common.util.concurrent.FutureCallback;
@@ -36,14 +35,8 @@
3635
import com.google.pubsub.v1.PublisherGrpc;
3736
import com.google.pubsub.v1.PubsubMessage;
3837
import com.google.pubsub.v1.TopicName;
39-
import io.grpc.CallCredentials;
40-
import io.grpc.Channel;
41-
import io.grpc.ManagedChannelBuilder;
38+
import io.grpc.ManagedChannel;
4239
import io.grpc.Status;
43-
import io.grpc.auth.MoreCallCredentials;
44-
import io.grpc.netty.GrpcSslContexts;
45-
import io.grpc.netty.NegotiationType;
46-
import io.grpc.netty.NettyChannelBuilder;
4740
import java.io.IOException;
4841
import java.util.ArrayList;
4942
import java.util.Iterator;
@@ -139,9 +132,8 @@ public static long getApiMaxRequestBytes() {
139132
private final AtomicBoolean activeAlarm;
140133

141134
private final FlowController flowController;
142-
private final Channel[] channels;
135+
private final ManagedChannel[] channels;
143136
private final AtomicRoundRobin channelIndex;
144-
private final CallCredentials credentials;
145137

146138
private final ScheduledExecutorService executor;
147139
private final AtomicBoolean shutdown;
@@ -164,37 +156,35 @@ private Publisher(Builder builder) throws IOException {
164156
messagesBundle = new LinkedList<>();
165157
messagesBundleLock = new ReentrantLock();
166158
activeAlarm = new AtomicBoolean(false);
167-
int numCores = Math.max(1, Runtime.getRuntime().availableProcessors());
168159
executor = builder.executorProvider.getExecutor();
169160
if (builder.executorProvider.shouldAutoClose()) {
170161
closeables.add(
171162
new AutoCloseable() {
172163
@Override
173-
public void close() throws IOException {
164+
public void close() {
174165
executor.shutdown();
175166
}
176167
});
177168
}
178-
channels = new Channel[numCores];
179-
channelIndex = new AtomicRoundRobin(channels.length);
180-
for (int i = 0; i < numCores; i++) {
169+
channels = new ManagedChannel[Runtime.getRuntime().availableProcessors()];
170+
for (int i = 0; i < channels.length; i++) {
181171
channels[i] =
182-
builder.channelBuilder.isPresent()
183-
? builder.channelBuilder.get().build()
184-
: NettyChannelBuilder.forAddress(
185-
PublisherSettings.getDefaultServiceAddress(),
186-
PublisherSettings.getDefaultServicePort())
187-
.negotiationType(NegotiationType.TLS)
188-
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
189-
.executor(executor)
190-
.build();
172+
builder.channelProvider.needsExecutor()
173+
? builder.channelProvider.getChannel(executor)
174+
: builder.channelProvider.getChannel();
175+
}
176+
if (builder.channelProvider.shouldAutoClose()) {
177+
closeables.add(
178+
new AutoCloseable() {
179+
@Override
180+
public void close() {
181+
for (int i = 0; i < channels.length; i++) {
182+
channels[i].shutdown();
183+
}
184+
}
185+
});
191186
}
192-
credentials =
193-
MoreCallCredentials.from(
194-
builder.userCredentials.isPresent()
195-
? builder.userCredentials.get()
196-
: GoogleCredentials.getApplicationDefault()
197-
.createScoped(PublisherSettings.getDefaultServiceScopes()));
187+
channelIndex = new AtomicRoundRobin(channels.length);
198188
shutdown = new AtomicBoolean(false);
199189
messagesWaiter = new MessagesWaiter();
200190
}
@@ -350,7 +340,6 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)
350340

351341
Futures.addCallback(
352342
PublisherGrpc.newFutureStub(channels[currentChannel])
353-
.withCallCredentials(credentials)
354343
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
355344
.publish(publishRequest.build()),
356345
new FutureCallback<PublishResponse>() {
@@ -588,37 +577,23 @@ public long nextLong(long least, long bound) {
588577
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
589578
LongRandom longRandom = DEFAULT_LONG_RANDOM;
590579

591-
// Channels and credentials
592-
Optional<Credentials> userCredentials = Optional.absent();
593-
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
594-
Optional.absent();
595-
580+
ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build();
596581
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
597582

598583
private Builder(TopicName topic) {
599584
this.topicName = Preconditions.checkNotNull(topic);
600585
}
601586

602587
/**
603-
* Credentials to authenticate with.
604-
*
605-
* <p>Must be properly scoped for accessing Cloud Pub/Sub APIs.
606-
*/
607-
public Builder setCredentials(Credentials userCredentials) {
608-
this.userCredentials = Optional.of(Preconditions.checkNotNull(userCredentials));
609-
return this;
610-
}
611-
612-
/**
613-
* ManagedChannelBuilder to use to create Channels.
588+
* {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
589+
* endpoint.
614590
*
615-
* <p>Must point at Cloud Pub/Sub endpoint.
591+
* <p>For performance, this client benefits from having multiple channels open at once. Users
592+
* are encouraged to provide instances of {@code ChannelProvider} that creates new channels
593+
* instead of returning pre-initialized ones.
616594
*/
617-
public Builder setChannelBuilder(
618-
ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder) {
619-
this.channelBuilder =
620-
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(
621-
Preconditions.checkNotNull(channelBuilder));
595+
public Builder setChannelProvider(ChannelProvider channelProvider) {
596+
this.channelProvider = Preconditions.checkNotNull(channelProvider);
622597
return this;
623598
}
624599

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import com.google.api.gax.bundling.FlowController;
2727
import com.google.api.gax.grpc.BundlingSettings;
28+
import com.google.api.gax.grpc.ChannelProvider;
2829
import com.google.api.gax.grpc.ExecutorProvider;
2930
import com.google.api.gax.grpc.FixedExecutorProvider;
3031
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
@@ -36,13 +37,15 @@
3637
import com.google.pubsub.v1.PublishResponse;
3738
import com.google.pubsub.v1.PubsubMessage;
3839
import com.google.pubsub.v1.TopicName;
40+
import io.grpc.ManagedChannel;
3941
import io.grpc.Status;
4042
import io.grpc.StatusException;
4143
import io.grpc.inprocess.InProcessChannelBuilder;
4244
import io.grpc.inprocess.InProcessServerBuilder;
4345
import io.grpc.internal.ServerImpl;
4446
import io.grpc.stub.StreamObserver;
4547
import java.util.concurrent.ExecutionException;
48+
import java.util.concurrent.Executor;
4649
import org.joda.time.Duration;
4750
import org.junit.After;
4851
import org.junit.Before;
@@ -63,7 +66,28 @@ public class PublisherImplTest {
6366
private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
6467
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
6568

66-
private InProcessChannelBuilder testChannelBuilder;
69+
private static final ChannelProvider TEST_CHANNEL_PROVIDER =
70+
new ChannelProvider() {
71+
@Override
72+
public boolean shouldAutoClose() {
73+
return true;
74+
}
75+
76+
@Override
77+
public boolean needsExecutor() {
78+
return false;
79+
}
80+
81+
@Override
82+
public ManagedChannel getChannel() {
83+
return InProcessChannelBuilder.forName("test-server").build();
84+
}
85+
86+
@Override
87+
public ManagedChannel getChannel(Executor executor) {
88+
throw new IllegalArgumentException("testChannelprovider doesn't need an executor");
89+
}
90+
};
6791

6892
@Captor private ArgumentCaptor<PublishRequest> requestCaptor;
6993

@@ -82,8 +106,6 @@ public void setUp() throws Exception {
82106
testPublisherServiceImpl = Mockito.spy(new FakePublisherServiceImpl());
83107

84108
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
85-
testChannelBuilder = InProcessChannelBuilder.forName("test-server");
86-
InProcessChannelBuilder.forName("publisher_test");
87109
serverBuilder.addService(testPublisherServiceImpl);
88110
testServer = serverBuilder.build();
89111
testServer.start();
@@ -92,7 +114,6 @@ public void setUp() throws Exception {
92114
testPublisherServiceImpl.reset();
93115
Mockito.reset(testPublisherServiceImpl);
94116
fakeExecutor = new FakeScheduledExecutorService();
95-
testCredentials = new FakeCredentials();
96117
}
97118

98119
@After
@@ -350,11 +371,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
350371

351372
@Test
352373
public void testPublisherGetters() throws Exception {
353-
FakeCredentials credentials = new FakeCredentials();
354-
355374
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
356-
builder.setChannelBuilder(testChannelBuilder);
357-
builder.setCredentials(credentials);
375+
builder.setChannelProvider(TEST_CHANNEL_PROVIDER);
358376
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
359377
builder.setFailOnFlowControlLimits(true);
360378
builder.setBundlingSettings(
@@ -386,7 +404,6 @@ public void testPublisherGetters() throws Exception {
386404
public void testBuilderParametersAndDefaults() {
387405
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
388406
assertEquals(TEST_TOPIC, builder.topicName);
389-
assertEquals(Optional.absent(), builder.channelBuilder);
390407
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
391408
assertFalse(builder.failOnFlowControlLimits);
392409
assertEquals(
@@ -399,22 +416,14 @@ public void testBuilderParametersAndDefaults() {
399416
builder.bundlingSettings.getElementCountThreshold().longValue());
400417
assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings);
401418
assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
402-
assertEquals(Optional.absent(), builder.userCredentials);
403419
}
404420

405421
@Test
406422
public void testBuilderInvalidArguments() {
407423
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
408424

409425
try {
410-
builder.setChannelBuilder(null);
411-
fail("Should have thrown an IllegalArgumentException");
412-
} catch (NullPointerException expected) {
413-
// Expected
414-
}
415-
416-
try {
417-
builder.setCredentials(null);
426+
builder.setChannelProvider(null);
418427
fail("Should have thrown an IllegalArgumentException");
419428
} catch (NullPointerException expected) {
420429
// Expected
@@ -602,9 +611,8 @@ public void testBuilderInvalidArguments() {
602611

603612
private Builder getTestPublisherBuilder() {
604613
return Publisher.newBuilder(TEST_TOPIC)
605-
.setCredentials(testCredentials)
606614
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
607-
.setChannelBuilder(testChannelBuilder)
615+
.setChannelProvider(TEST_CHANNEL_PROVIDER)
608616
.setLongRandom(
609617
new Publisher.LongRandom() {
610618
@Override

0 commit comments

Comments
 (0)