Skip to content

Commit 7e291f6

Browse files
olavloitekolea2
authored andcommitted
---
yaml --- r: 28749 b: refs/heads/autosynth-dlp c: 52dc8d0 h: refs/heads/master i: 28747: e2d249f
1 parent a183f0c commit 7e291f6

5 files changed

Lines changed: 368 additions & 70 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ refs/tags/v0.60.0: 4cd518d0612329f8a8e53484eef4cd1651e32855
103103
refs/tags/v0.61.0: e4b526656bb1bf5eefd0ee578b7405147821225e
104104
refs/tags/v0.62.0: bbede7385d48ba08f487bdd29ec10668ace96396
105105
refs/heads/0.60.0-alpha: 10939381ffe0b8da32db4fe3087c86e3aa7f3e55
106-
refs/heads/autosynth-dlp: 9285b8df8b826d4d2fc9fc866e4a8415bce21393
106+
refs/heads/autosynth-dlp: 52dc8d08fde544080ad29a42dc682cf3eacdb4d4
107107
refs/heads/autosynth-logging: eca54b98c8cf82050bbdfc5c19139673dff9e5b8
108108
refs/heads/dupes: 3478c5d81fd242d0e985656645a679420a2060c2
109109
refs/tags/v0.63.0: 94f19b71d40f46b36120e7b9d78a1a3d41bfcbd6

branches/autosynth-dlp/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.core.ApiFunction;
1920
import com.google.api.gax.grpc.GrpcInterceptorProvider;
2021
import com.google.api.gax.rpc.TransportChannelProvider;
2122
import com.google.cloud.ServiceDefaults;
@@ -29,6 +30,7 @@
2930
import com.google.common.base.Preconditions;
3031
import com.google.common.collect.ImmutableMap;
3132
import com.google.common.collect.ImmutableSet;
33+
import io.grpc.ManagedChannelBuilder;
3234
import java.net.MalformedURLException;
3335
import java.net.URL;
3436
import java.util.Map;
@@ -46,6 +48,10 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
4648
"https://www.googleapis.com/auth/spanner.data");
4749
private static final int MAX_CHANNELS = 256;
4850
private final TransportChannelProvider channelProvider;
51+
52+
@SuppressWarnings("rawtypes")
53+
private final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
54+
4955
private final GrpcInterceptorProvider interceptorProvider;
5056
private final SessionPoolOptions sessionPoolOptions;
5157
private final int prefetchChunks;
@@ -82,6 +88,7 @@ private SpannerOptions(Builder builder) {
8288
numChannels);
8389

8490
channelProvider = builder.channelProvider;
91+
channelConfigurator = builder.channelConfigurator;
8592
interceptorProvider = builder.interceptorProvider;
8693
sessionPoolOptions =
8794
builder.sessionPoolOptions != null
@@ -96,6 +103,10 @@ public static class Builder
96103
extends ServiceOptions.Builder<Spanner, SpannerOptions, SpannerOptions.Builder> {
97104
private static final int DEFAULT_PREFETCH_CHUNKS = 4;
98105
private TransportChannelProvider channelProvider;
106+
107+
@SuppressWarnings("rawtypes")
108+
private ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
109+
99110
private GrpcInterceptorProvider interceptorProvider;
100111

101112
/** By default, we create 4 channels per {@link SpannerOptions} */
@@ -114,6 +125,7 @@ private Builder() {}
114125
this.prefetchChunks = options.prefetchChunks;
115126
this.sessionLabels = options.sessionLabels;
116127
this.channelProvider = options.channelProvider;
128+
this.channelConfigurator = options.channelConfigurator;
117129
this.interceptorProvider = options.interceptorProvider;
118130
}
119131

@@ -135,6 +147,17 @@ public Builder setChannelProvider(TransportChannelProvider channelProvider) {
135147
return this;
136148
}
137149

150+
/**
151+
* Sets an {@link ApiFunction} that will be used to configure the transport channel. This will
152+
* only be used if no custom {@link TransportChannelProvider} has been set.
153+
*/
154+
public Builder setChannelConfigurator(
155+
@SuppressWarnings("rawtypes")
156+
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator) {
157+
this.channelConfigurator = channelConfigurator;
158+
return this;
159+
}
160+
138161
/**
139162
* Sets the {@code GrpcInterceptorProvider}. {@link GapicSpannerRpc} would create a default one
140163
* if none is provided.
@@ -214,6 +237,11 @@ public TransportChannelProvider getChannelProvider() {
214237
return channelProvider;
215238
}
216239

240+
@SuppressWarnings("rawtypes")
241+
public ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> getChannelConfigurator() {
242+
return channelConfigurator;
243+
}
244+
217245
public GrpcInterceptorProvider getInterceptorProvider() {
218246
return interceptorProvider;
219247
}

branches/autosynth-dlp/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.core.ApiFunction;
2222
import com.google.api.core.NanoClock;
2323
import com.google.api.gax.core.CredentialsProvider;
24+
import com.google.api.gax.core.ExecutorProvider;
2425
import com.google.api.gax.core.GaxProperties;
2526
import com.google.api.gax.grpc.GaxGrpcProperties;
2627
import com.google.api.gax.grpc.GrpcCallContext;
@@ -102,18 +103,57 @@
102103
import com.google.spanner.v1.Session;
103104
import com.google.spanner.v1.Transaction;
104105
import io.grpc.Context;
106+
import java.util.LinkedList;
105107
import java.util.List;
106108
import java.util.Map;
107109
import java.util.concurrent.CancellationException;
108110
import java.util.concurrent.ExecutionException;
109111
import java.util.concurrent.Executors;
110112
import java.util.concurrent.Future;
111113
import java.util.concurrent.ScheduledExecutorService;
114+
import java.util.concurrent.ScheduledThreadPoolExecutor;
115+
import java.util.concurrent.ThreadFactory;
112116
import javax.annotation.Nullable;
113117
import org.threeten.bp.Duration;
114118

115119
/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
116120
public class GapicSpannerRpc implements SpannerRpc {
121+
/**
122+
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
123+
* down when the {@link SpannerRpc} is closed.
124+
*/
125+
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
126+
private static final int DEFAULT_THREAD_COUNT = 4;
127+
private final List<ScheduledExecutorService> executors = new LinkedList<>();
128+
private final ThreadFactory threadFactory;
129+
130+
private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
131+
this.threadFactory = threadFactory;
132+
}
133+
134+
@Override
135+
public boolean shouldAutoClose() {
136+
return false;
137+
}
138+
139+
@Override
140+
public ScheduledExecutorService getExecutor() {
141+
ScheduledExecutorService executor =
142+
new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, threadFactory);
143+
synchronized (this) {
144+
executors.add(executor);
145+
}
146+
return executor;
147+
}
148+
149+
/** Shuts down all executors that have been created by this {@link ExecutorProvider}. */
150+
private synchronized void shutdown() {
151+
for (ScheduledExecutorService executor : executors) {
152+
executor.shutdown();
153+
}
154+
}
155+
}
156+
117157
private static final PathTemplate PROJECT_NAME_TEMPLATE =
118158
PathTemplate.create("projects/{project}");
119159
private static final PathTemplate OPERATION_NAME_TEMPLATE =
@@ -127,6 +167,7 @@ public class GapicSpannerRpc implements SpannerRpc {
127167
private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60;
128168
private static final int DEFAULT_PERIOD_SECONDS = 10;
129169

170+
private final ManagedInstantiatingExecutorProvider executorProvider;
130171
private final SpannerStub spannerStub;
131172
private final InstanceAdminStub instanceAdminStub;
132173
private final DatabaseAdminStub databaseAdminStub;
@@ -170,16 +211,25 @@ public GapicSpannerRpc(SpannerOptions options) {
170211
mergedHeaderProvider.getHeaders(),
171212
internalHeaderProviderBuilder.getResourceHeaderKey());
172213

214+
// Create a managed executor provider.
215+
this.executorProvider =
216+
new ManagedInstantiatingExecutorProvider(
217+
new ThreadFactoryBuilder()
218+
.setDaemon(true)
219+
.setNameFormat("Cloud-Spanner-TransportChannel-%d")
220+
.build());
173221
// First check if SpannerOptions provides a TransportChannerProvider. Create one
174222
// with information gathered from SpannerOptions if none is provided
175223
TransportChannelProvider channelProvider =
176224
MoreObjects.firstNonNull(
177225
options.getChannelProvider(),
178226
InstantiatingGrpcChannelProvider.newBuilder()
227+
.setChannelConfigurator(options.getChannelConfigurator())
179228
.setEndpoint(options.getEndpoint())
180229
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
181230
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
182231
.setPoolSize(options.getNumChannels())
232+
.setExecutorProvider(executorProvider)
183233

184234
// Then check if SpannerOptions provides an InterceptorProvider. Create a default
185235
// SpannerInterceptorProvider if none is provided
@@ -585,6 +635,7 @@ public void shutdown() {
585635
this.instanceAdminStub.close();
586636
this.databaseAdminStub.close();
587637
this.spannerWatchdog.shutdown();
638+
this.executorProvider.shutdown();
588639
}
589640

590641
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Copyright 2019 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.it;
18+
19+
import static org.hamcrest.CoreMatchers.equalTo;
20+
import static org.hamcrest.CoreMatchers.is;
21+
import static org.junit.Assert.assertThat;
22+
23+
import com.google.cloud.spanner.Database;
24+
import com.google.cloud.spanner.DatabaseAdminClient;
25+
import com.google.cloud.spanner.DatabaseClient;
26+
import com.google.cloud.spanner.InstanceAdminClient;
27+
import com.google.cloud.spanner.IntegrationTest;
28+
import com.google.cloud.spanner.IntegrationTestEnv;
29+
import com.google.cloud.spanner.ResultSet;
30+
import com.google.cloud.spanner.Spanner;
31+
import com.google.cloud.spanner.SpannerOptions;
32+
import com.google.cloud.spanner.Statement;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.regex.Pattern;
36+
import org.junit.After;
37+
import org.junit.Before;
38+
import org.junit.ClassRule;
39+
import org.junit.Rule;
40+
import org.junit.Test;
41+
import org.junit.experimental.categories.Category;
42+
import org.junit.rules.ExpectedException;
43+
import org.junit.runner.RunWith;
44+
import org.junit.runners.JUnit4;
45+
46+
@Category(IntegrationTest.class)
47+
@RunWith(JUnit4.class)
48+
public class ITSpannerOptionsTest {
49+
@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
50+
@Rule public ExpectedException expectedException = ExpectedException.none();
51+
private static Database db;
52+
53+
@Before
54+
public void setUp() throws Exception {
55+
db = env.getTestHelper().createTestDatabase();
56+
}
57+
58+
@After
59+
public void tearDown() throws Exception {
60+
db.drop();
61+
}
62+
63+
private static final int NUMBER_OF_TEST_RUNS = 2;
64+
private static final int DEFAULT_NUM_CHANNELS = 4;
65+
private static final int NUM_THREADS_PER_CHANNEL = 4;
66+
private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel";
67+
private static final String THREAD_PATTERN = "%s-[0-9]+";
68+
69+
@Test
70+
public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException {
71+
// The IT environment has already started some worker threads.
72+
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
73+
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
74+
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME), is(equalTo(baseThreadCount)));
75+
// Create Spanner instance.
76+
// We make a copy of the options instance, as SpannerOptions caches any service object
77+
// that has been handed out.
78+
SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
79+
Spanner spanner = options.getService();
80+
// Get a database client and do a query. This should initiate threads for the Spanner service.
81+
DatabaseClient client = spanner.getDatabaseClient(db.getId());
82+
List<ResultSet> resultSets = new ArrayList<>();
83+
// SpannerStub affiliates a channel with a session, so we need to use multiple sessions
84+
// to ensure we also hit multiple channels.
85+
for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) {
86+
ResultSet rs =
87+
client
88+
.singleUse()
89+
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
90+
// Execute ResultSet#next() to send the query to Spanner.
91+
rs.next();
92+
// Delay closing the result set in order to force the use of multiple sessions.
93+
// As each session is linked to one transport channel, using multiple different
94+
// sessions should initialize multiple transport channels.
95+
resultSets.add(rs);
96+
// Check whether the number of expected threads has been reached.
97+
if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME)
98+
== DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount) {
99+
break;
100+
}
101+
}
102+
for (ResultSet rs : resultSets) {
103+
rs.close();
104+
}
105+
// Check the number of threads after the query. Doing a request should initialize a thread
106+
// pool for the underlying SpannerClient.
107+
assertThat(
108+
getNumberOfThreadsWithName(SPANNER_THREAD_NAME),
109+
is(equalTo(DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount)));
110+
111+
// Then do a request to the InstanceAdmin service and check the number of threads.
112+
// Doing a request should initialize a thread pool for the underlying InstanceAdminClient.
113+
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
114+
InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient();
115+
instanceAdminClient.listInstances();
116+
}
117+
assertThat(
118+
getNumberOfThreadsWithName(SPANNER_THREAD_NAME),
119+
is(equalTo(2 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount)));
120+
121+
// Then do a request to the DatabaseAdmin service and check the number of threads.
122+
// Doing a request should initialize a thread pool for the underlying DatabaseAdminClient.
123+
for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS * 2; i2++) {
124+
DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient();
125+
databaseAdminClient.listDatabases(db.getId().getInstanceId().getInstance());
126+
}
127+
assertThat(
128+
getNumberOfThreadsWithName(SPANNER_THREAD_NAME),
129+
is(equalTo(3 * DEFAULT_NUM_CHANNELS * NUM_THREADS_PER_CHANNEL + baseThreadCount)));
130+
131+
// Now close the Spanner instance and check whether the threads are shutdown or not.
132+
spanner.close();
133+
// Wait a little to allow the threads to actually shutdown.
134+
Thread.sleep(200L);
135+
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME), is(equalTo(baseThreadCount)));
136+
}
137+
}
138+
139+
@Test
140+
public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException {
141+
int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME);
142+
SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
143+
try (Spanner spanner1 = options.getService()) {
144+
// Having both in the try-with-resources block is not possible, as it is the same instance.
145+
// One will be closed before the other, and the closing of the second instance would fail.
146+
Spanner spanner2 = options.getService();
147+
assertThat(spanner1 == spanner2, is(true));
148+
DatabaseClient client1 = spanner1.getDatabaseClient(db.getId());
149+
DatabaseClient client2 = spanner2.getDatabaseClient(db.getId());
150+
assertThat(client1 == client2, is(true));
151+
try (ResultSet rs1 =
152+
client1
153+
.singleUse()
154+
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"));
155+
ResultSet rs2 =
156+
client2
157+
.singleUse()
158+
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2")); ) {
159+
while (rs1.next() && rs2.next()) {
160+
// Do nothing, just consume the result sets.
161+
}
162+
}
163+
}
164+
Thread.sleep(200L);
165+
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME), is(equalTo(baseThreadCount)));
166+
}
167+
168+
private int getNumberOfThreadsWithName(String serviceName) {
169+
Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName));
170+
ThreadGroup group = Thread.currentThread().getThreadGroup();
171+
while (group.getParent() != null) {
172+
group = group.getParent();
173+
}
174+
Thread[] threads = new Thread[100 * NUMBER_OF_TEST_RUNS];
175+
int numberOfThreads = group.enumerate(threads);
176+
int res = 0;
177+
for (int i = 0; i < numberOfThreads; i++) {
178+
if (pattern.matcher(threads[i].getName()).matches()) {
179+
res++;
180+
}
181+
}
182+
return res;
183+
}
184+
}

0 commit comments

Comments
 (0)