Skip to content

Commit 61b25f6

Browse files
committed
Update options and spi classes to catch GAX changes
1 parent e498a6a commit 61b25f6

4 files changed

Lines changed: 88 additions & 73 deletions

File tree

google-cloud-core/src/main/java/com/google/cloud/GrpcServiceOptions.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import static com.google.common.base.MoreObjects.firstNonNull;
2020

21-
import com.google.api.gax.core.ConnectionSettings;
21+
import com.google.api.gax.core.FixedCredentialsProvider;
2222
import com.google.api.gax.core.RetrySettings;
23+
import com.google.api.gax.grpc.ChannelProvider;
24+
import com.google.api.gax.grpc.InstantiatingChannelProvider;
2325
import com.google.api.gax.grpc.UnaryCallSettings;
2426
import com.google.auth.Credentials;
2527
import com.google.cloud.spi.ServiceRpcFactory;
@@ -304,26 +306,19 @@ protected UnaryCallSettings.Builder getApiCallSettings() {
304306
}
305307

306308
/**
307-
* Returns a builder for connection-related settings.
309+
* Returns a channel provider.
308310
*/
309-
@Deprecated
310-
protected ConnectionSettings.Builder connectionSettings() {
311-
return getConnectionSettings();
312-
}
313-
314-
/**
315-
* Returns a builder for connection-related settings.
316-
*/
317-
protected ConnectionSettings.Builder getConnectionSettings() {
311+
protected ChannelProvider getChannelProvider() {
318312
HostAndPort hostAndPort = HostAndPort.fromString(getHost());
319-
ConnectionSettings.Builder builder = ConnectionSettings.newBuilder()
313+
InstantiatingChannelProvider.Builder builder = InstantiatingChannelProvider.newBuilder()
320314
.setServiceAddress(hostAndPort.getHostText())
321-
.setPort(hostAndPort.getPort());
315+
.setPort(hostAndPort.getPort())
316+
.setClientLibHeader(getLibraryName(), firstNonNull(getLibraryVersion(), ""));
322317
Credentials scopedCredentials = getScopedCredentials();
323318
if (scopedCredentials != null && scopedCredentials != NoCredentials.getInstance()) {
324-
builder.provideCredentialsWith(scopedCredentials);
319+
builder.setCredentialsProvider(FixedCredentialsProvider.create(scopedCredentials));
325320
}
326-
return builder;
321+
return builder.build();
327322
}
328323

329324
/**

google-cloud-logging/src/main/java/com/google/cloud/logging/spi/DefaultLoggingRpc.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616

1717
package com.google.cloud.logging.spi;
1818

19-
import static com.google.common.base.MoreObjects.firstNonNull;
20-
21-
import com.google.api.gax.core.ConnectionSettings;
2219
import com.google.api.gax.grpc.ApiException;
20+
import com.google.api.gax.grpc.ChannelProvider;
21+
import com.google.api.gax.grpc.ExecutorProvider;
22+
import com.google.api.gax.grpc.FixedChannelProvider;
23+
import com.google.api.gax.grpc.FixedExecutorProvider;
24+
import com.google.api.gax.grpc.ProviderManager;
2325
import com.google.api.gax.grpc.UnaryCallSettings;
2426
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
27+
import com.google.cloud.NoCredentials;
2528
import com.google.cloud.logging.LoggingException;
2629
import com.google.cloud.logging.LoggingOptions;
2730
import com.google.cloud.logging.spi.v2.ConfigServiceV2Api;
@@ -73,6 +76,7 @@ public class DefaultLoggingRpc implements LoggingRpc {
7376
private final LoggingServiceV2Api loggingApi;
7477
private final MetricsServiceV2Api metricsApi;
7578
private final ScheduledExecutorService executor;
79+
private final ProviderManager providerManager;
7680
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
7781

7882
private boolean closed;
@@ -96,45 +100,46 @@ protected UnaryCallSettings.Builder getApiCallSettings() {
96100
}
97101

98102
@Override
99-
protected ConnectionSettings.Builder getConnectionSettings() {
100-
return super.getConnectionSettings();
103+
protected ChannelProvider getChannelProvider() {
104+
return super.getChannelProvider();
101105
}
102106
}
103107

104108
public DefaultLoggingRpc(LoggingOptions options) throws IOException {
105109
InternalLoggingOptions internalOptions = new InternalLoggingOptions(options);
106110
executorFactory = internalOptions.getExecutorFactory();
107111
executor = executorFactory.get();
108-
String libraryName = options.getLibraryName();
109-
String libraryVersion = firstNonNull(options.getLibraryVersion(), "");
110112
try {
111-
ConfigServiceV2Settings.Builder confBuilder = ConfigServiceV2Settings.defaultBuilder()
112-
.provideExecutorWith(executor, false)
113-
.setClientLibHeader(libraryName, libraryVersion);
114-
LoggingServiceV2Settings.Builder logBuilder = LoggingServiceV2Settings.defaultBuilder()
115-
.provideExecutorWith(executor, false)
116-
.setClientLibHeader(libraryName, libraryVersion);
117-
MetricsServiceV2Settings.Builder metricsBuilder = MetricsServiceV2Settings.defaultBuilder()
118-
.provideExecutorWith(executor, false)
119-
.setClientLibHeader(libraryName, libraryVersion);
120-
// todo(mziccard): PublisherSettings should support null/absent credentials for testing
121-
if (options.getHost().contains("localhost") || options.getCredentials() == null) {
122-
ManagedChannel channel = NettyChannelBuilder.forTarget(options.getHost())
113+
ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
114+
ChannelProvider channelProvider;
115+
// todo(mziccard): ChannelProvider should support null/absent credentials for testing
116+
if (options.getHost().contains("localhost")
117+
|| options.getCredentials().equals(NoCredentials.getInstance())) {
118+
ManagedChannel managedChannel = NettyChannelBuilder.forTarget(options.getHost())
123119
.negotiationType(NegotiationType.PLAINTEXT)
120+
.executor(executor)
124121
.build();
125-
confBuilder.provideChannelWith(channel, true);
126-
logBuilder.provideChannelWith(channel, true);
127-
metricsBuilder.provideChannelWith(channel, true);
122+
channelProvider = FixedChannelProvider.create(managedChannel);
128123
} else {
129-
ConnectionSettings connectionSettings = internalOptions.getConnectionSettings().build();
130-
confBuilder.provideChannelWith(connectionSettings);
131-
logBuilder.provideChannelWith(connectionSettings);
132-
metricsBuilder.provideChannelWith(connectionSettings);
124+
channelProvider = internalOptions.getChannelProvider();
133125
}
126+
providerManager = ProviderManager.newBuilder()
127+
.setChannelProvider(channelProvider)
128+
.setExecutorProvider(executorProvider)
129+
.build();
134130
UnaryCallSettings.Builder callSettingsBuilder = internalOptions.getApiCallSettings();
135-
confBuilder.applyToAllApiMethods(callSettingsBuilder);
136-
logBuilder.applyToAllApiMethods(callSettingsBuilder);
137-
metricsBuilder.applyToAllApiMethods(callSettingsBuilder);
131+
ConfigServiceV2Settings.Builder confBuilder = ConfigServiceV2Settings.defaultBuilder()
132+
.setExecutorProvider(providerManager)
133+
.setChannelProvider(providerManager)
134+
.applyToAllApiMethods(callSettingsBuilder);
135+
LoggingServiceV2Settings.Builder logBuilder = LoggingServiceV2Settings.defaultBuilder()
136+
.setExecutorProvider(providerManager)
137+
.setChannelProvider(providerManager)
138+
.applyToAllApiMethods(callSettingsBuilder);
139+
MetricsServiceV2Settings.Builder metricsBuilder = MetricsServiceV2Settings.defaultBuilder()
140+
.setExecutorProvider(providerManager)
141+
.setChannelProvider(providerManager)
142+
.applyToAllApiMethods(callSettingsBuilder);
138143
configApi = ConfigServiceV2Api.create(confBuilder.build());
139144
loggingApi = LoggingServiceV2Api.create(logBuilder.build());
140145
metricsApi = MetricsServiceV2Api.create(metricsBuilder.build());
@@ -245,6 +250,7 @@ public void close() throws Exception {
245250
configApi.close();
246251
loggingApi.close();
247252
metricsApi.close();
253+
providerManager.getChannel().shutdown();
248254
executorFactory.release(executor);
249255
}
250256
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616

1717
package com.google.cloud.pubsub.spi;
1818

19-
import static com.google.common.base.MoreObjects.firstNonNull;
20-
21-
import com.google.api.gax.core.ConnectionSettings;
2219
import com.google.api.gax.grpc.ApiException;
20+
import com.google.api.gax.grpc.ChannelProvider;
21+
import com.google.api.gax.grpc.ExecutorProvider;
22+
import com.google.api.gax.grpc.FixedChannelProvider;
23+
import com.google.api.gax.grpc.FixedExecutorProvider;
24+
import com.google.api.gax.grpc.ProviderManager;
2325
import com.google.api.gax.grpc.UnaryCallSettings;
2426
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
27+
import com.google.cloud.NoCredentials;
2528
import com.google.cloud.pubsub.PubSubException;
2629
import com.google.cloud.pubsub.PubSubOptions;
2730
import com.google.cloud.pubsub.spi.v1.PublisherApi;
@@ -75,6 +78,7 @@ public class DefaultPubSubRpc implements PubSubRpc {
7578
private final PublisherApi publisherApi;
7679
private final SubscriberApi subscriberApi;
7780
private final ScheduledExecutorService executor;
81+
private final ProviderManager providerManager;
7882
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
7983

8084
private boolean closed;
@@ -98,8 +102,8 @@ protected UnaryCallSettings.Builder getApiCallSettings() {
98102
}
99103

100104
@Override
101-
protected ConnectionSettings.Builder getConnectionSettings() {
102-
return super.getConnectionSettings();
105+
protected ChannelProvider getChannelProvider() {
106+
return super.getChannelProvider();
103107
}
104108
}
105109

@@ -131,30 +135,33 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
131135
InternalPubSubOptions internalOptions = new InternalPubSubOptions(options);
132136
executorFactory = internalOptions.getExecutorFactory();
133137
executor = executorFactory.get();
134-
String libraryName = options.getLibraryName();
135-
String libraryVersion = firstNonNull(options.getLibraryVersion(), "");
136138
try {
137-
PublisherSettings.Builder pubBuilder = PublisherSettings.defaultBuilder()
138-
.provideExecutorWith(executor, false)
139-
.setClientLibHeader(libraryName, libraryVersion);
140-
SubscriberSettings.Builder subBuilder = SubscriberSettings.defaultBuilder()
141-
.provideExecutorWith(executor, false)
142-
.setClientLibHeader(libraryName, libraryVersion);
143-
// todo(mziccard): PublisherSettings should support null/absent credentials for testing
144-
if (options.getHost().contains("localhost") || options.getCredentials() == null) {
145-
ManagedChannel channel = NettyChannelBuilder.forTarget(options.getHost())
139+
ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
140+
ChannelProvider channelProvider;
141+
// todo(mziccard): ChannelProvider should support null/absent credentials for testing
142+
if (options.getHost().contains("localhost")
143+
|| options.getCredentials().equals(NoCredentials.getInstance())) {
144+
ManagedChannel managedChannel = NettyChannelBuilder.forTarget(options.getHost())
146145
.negotiationType(NegotiationType.PLAINTEXT)
146+
.executor(executor)
147147
.build();
148-
pubBuilder.provideChannelWith(channel, true);
149-
subBuilder.provideChannelWith(channel, true);
148+
channelProvider = FixedChannelProvider.create(managedChannel);
150149
} else {
151-
ConnectionSettings connectionSettings = internalOptions.getConnectionSettings().build();
152-
pubBuilder.provideChannelWith(connectionSettings);
153-
subBuilder.provideChannelWith(connectionSettings);
150+
channelProvider = internalOptions.getChannelProvider();
154151
}
152+
providerManager = ProviderManager.newBuilder()
153+
.setChannelProvider(channelProvider)
154+
.setExecutorProvider(executorProvider)
155+
.build();
155156
UnaryCallSettings.Builder callSettingsBuilder = internalOptions.getApiCallSettings();
156-
pubBuilder.applyToAllApiMethods(callSettingsBuilder);
157-
subBuilder.applyToAllApiMethods(callSettingsBuilder);
157+
PublisherSettings.Builder pubBuilder = PublisherSettings.defaultBuilder()
158+
.setExecutorProvider(providerManager)
159+
.setChannelProvider(providerManager)
160+
.applyToAllApiMethods(callSettingsBuilder);
161+
SubscriberSettings.Builder subBuilder = SubscriberSettings.defaultBuilder()
162+
.setExecutorProvider(providerManager)
163+
.setChannelProvider(providerManager)
164+
.applyToAllApiMethods(callSettingsBuilder);
158165
publisherApi = PublisherApi.create(pubBuilder.build());
159166
subscriberApi = SubscriberApi.create(subBuilder.build());
160167
} catch (Exception ex) {
@@ -284,6 +291,7 @@ public void close() throws Exception {
284291
closed = true;
285292
subscriberApi.close();
286293
publisherApi.close();
294+
providerManager.getChannel().shutdown();
287295
executorFactory.release(executor);
288296
}
289297
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherApiTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
package com.google.cloud.pubsub.spi.v1;
1616

1717
import com.google.api.gax.grpc.BundlingSettings;
18+
import com.google.api.gax.grpc.FixedChannelProvider;
1819
import com.google.cloud.pubsub.testing.LocalPubSubHelper;
1920
import com.google.protobuf.ByteString;
2021
import com.google.pubsub.v1.PubsubMessage;
2122
import com.google.pubsub.v1.PullResponse;
2223
import com.google.pubsub.v1.PushConfig;
2324
import com.google.pubsub.v1.Topic;
2425

26+
import io.grpc.ManagedChannel;
27+
2528
import org.joda.time.Duration;
2629
import org.junit.After;
2730
import org.junit.AfterClass;
@@ -30,8 +33,6 @@
3033
import org.junit.BeforeClass;
3134
import org.junit.Test;
3235

33-
import io.grpc.ManagedChannel;
34-
3536
import java.io.IOException;
3637
import java.util.ArrayList;
3738
import java.util.Collections;
@@ -42,6 +43,7 @@ public class PublisherApiTest {
4243
private PublisherApi publisherApi;
4344
private PublisherApi bundledPublisherApi;
4445
private SubscriberApi subscriberApi;
46+
private ManagedChannel channel;
4547

4648
@BeforeClass
4749
public static void startServer() throws IOException, InterruptedException {
@@ -56,11 +58,12 @@ public static void stopServer() throws IOException, InterruptedException {
5658

5759
@Before
5860
public void setUp() throws Exception {
59-
ManagedChannel channel = pubsubHelper.createChannel();
61+
channel = pubsubHelper.createChannel();
62+
FixedChannelProvider fixedChannelProvider = FixedChannelProvider.create(channel);
6063

6164
PublisherSettings publisherSettings =
6265
PublisherSettings.defaultBuilder()
63-
.provideChannelWith(channel, true)
66+
.setChannelProvider(fixedChannelProvider)
6467
.build();
6568
publisherApi = PublisherApi.create(publisherSettings);
6669

@@ -71,15 +74,15 @@ public void setUp() throws Exception {
7174

7275
PublisherSettings.Builder bundledPublisherSettingsBuilder = PublisherSettings.defaultBuilder();
7376
bundledPublisherSettingsBuilder
74-
.provideChannelWith(channel, true)
77+
.setChannelProvider(fixedChannelProvider)
7578
.publishSettings()
7679
.setBundlingSettingsBuilder(bundlingSettings);
7780

7881
PublisherSettings bundledPublisherSettings = bundledPublisherSettingsBuilder.build();
7982
bundledPublisherApi = PublisherApi.create(bundledPublisherSettings);
8083

8184
SubscriberSettings subscriberSettings = SubscriberSettings.defaultBuilder()
82-
.provideChannelWith(channel, true)
85+
.setChannelProvider(fixedChannelProvider)
8386
.build();
8487
subscriberApi = SubscriberApi.create(subscriberSettings);
8588
}
@@ -95,6 +98,9 @@ public void tearDown() throws Exception {
9598
if (bundledPublisherApi != null) {
9699
bundledPublisherApi.close();
97100
}
101+
if (channel != null) {
102+
channel.shutdown();
103+
}
98104
pubsubHelper.reset();
99105
}
100106

0 commit comments

Comments
 (0)