Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 3d549ed

Browse files
committed
chore: add DirectPath fallback integration test
1 parent 0c30632 commit 3d549ed

2 files changed

Lines changed: 314 additions & 0 deletions

File tree

google-cloud-spanner/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,16 @@
142142
<groupId>io.grpc</groupId>
143143
<artifactId>grpc-protobuf</artifactId>
144144
</dependency>
145+
<dependency>
146+
<groupId>io.grpc</groupId>
147+
<artifactId>grpc-alts</artifactId>
148+
<scope>runtime</scope>
149+
</dependency>
150+
<!--
151+
grpc-stub is needed directly by our tests and transitively by grpc-alts at runtime.
152+
So it has to be declared as a direct dependency and to avoid overriding grpc-alts'
153+
runtime requirement it has to be promoted to the runtime scope.
154+
-->
145155
<dependency>
146156
<groupId>io.grpc</groupId>
147157
<artifactId>grpc-stub</artifactId>
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.it;
18+
19+
import static com.google.common.truth.Truth.assertWithMessage;
20+
import static com.google.common.truth.TruthJUnit.assume;
21+
22+
import com.google.api.core.ApiFunction;
23+
import com.google.api.gax.core.FixedCredentialsProvider;
24+
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
25+
import com.google.auth.oauth2.ComputeEngineCredentials;
26+
import com.google.cloud.spanner.Database;
27+
import com.google.cloud.spanner.DatabaseClient;
28+
import com.google.cloud.spanner.IntegrationTestEnv;
29+
import com.google.cloud.spanner.Key;
30+
import com.google.cloud.spanner.Mutation;
31+
import com.google.cloud.spanner.ParallelIntegrationTest;
32+
import com.google.cloud.spanner.SpannerOptions;
33+
import com.google.cloud.spanner.TimestampBound;
34+
import com.google.cloud.spanner.testing.RemoteSpannerHelper;
35+
import com.google.common.base.Stopwatch;
36+
import io.grpc.ManagedChannelBuilder;
37+
import io.grpc.alts.ComputeEngineChannelBuilder;
38+
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
39+
import io.grpc.netty.shaded.io.netty.channel.ChannelDuplexHandler;
40+
import io.grpc.netty.shaded.io.netty.channel.ChannelFactory;
41+
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
42+
import io.grpc.netty.shaded.io.netty.channel.ChannelPromise;
43+
import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
44+
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
45+
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel;
46+
import io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil;
47+
import java.io.IOException;
48+
import java.lang.reflect.Field;
49+
import java.net.InetAddress;
50+
import java.net.InetSocketAddress;
51+
import java.net.SocketAddress;
52+
import java.util.ArrayList;
53+
import java.util.Arrays;
54+
import java.util.List;
55+
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.TimeoutException;
57+
import java.util.concurrent.atomic.AtomicBoolean;
58+
import java.util.concurrent.atomic.AtomicInteger;
59+
import org.junit.After;
60+
import org.junit.Before;
61+
import org.junit.ClassRule;
62+
import org.junit.Test;
63+
import org.junit.experimental.categories.Category;
64+
import org.junit.runner.RunWith;
65+
import org.junit.runners.JUnit4;
66+
67+
/**
68+
* Test DirectPath fallback behavior by injecting a ChannelHandler into the netty stack that will
69+
* disrupt IPv6 communications.
70+
*
71+
* <p>WARNING: this test can only be run on a GCE VM and will explicitly ignore
72+
* GOOGLE_APPLICATION_CREDENTIALS and use the service account associated with the VM.
73+
*/
74+
@Category(ParallelIntegrationTest.class)
75+
@RunWith(JUnit4.class)
76+
public class ITDirectPathFallback {
77+
// A threshold of completed read calls to observe to ascertain IPv6 is working.
78+
// This was determined experimentally to account for both gRPC-LB RPCs and Bigtable api RPCs.
79+
private static final int MIN_COMPLETE_READ_CALLS = 40;
80+
private static final int NUM_RPCS_TO_SEND = 20;
81+
82+
// IP address prefixes allocated for DirectPath backends.
83+
private static final String DP_IPV6_PREFIX = "2001:4860:8040";
84+
private static final String DP_IPV4_PREFIX = "34.126";
85+
86+
@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
87+
88+
private AtomicBoolean blackholeDpAddr = new AtomicBoolean();
89+
private AtomicInteger numBlocked = new AtomicInteger();
90+
private AtomicInteger numDpAddrRead = new AtomicInteger();
91+
private boolean isDpAddr;
92+
93+
private ChannelFactory<NioSocketChannel> channelFactory;
94+
private EventLoopGroup eventLoopGroup;
95+
private RemoteSpannerHelper testHelper;
96+
97+
private static final String TABLE_NAME = "TestTable";
98+
private static final List<String> ALL_COLUMNS = Arrays.asList("Key", "StringValue");
99+
private static Database db;
100+
private static DatabaseClient client;
101+
102+
// TODO(mohanli): Remove this temporary endpoint once DirectPath goes to public beta.
103+
private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443";
104+
private static final String ATTEMPT_DIRECT_PATH = "spanner.attempt_directpath";
105+
106+
public ITDirectPathFallback() {
107+
// Create a transport channel provider that can intercept ipv6 packets.
108+
channelFactory = new MyChannelFactory();
109+
eventLoopGroup = new NioEventLoopGroup();
110+
}
111+
112+
@Before
113+
public void setup() throws IOException, Throwable {
114+
assume()
115+
.withMessage("DirectPath integration tests can only run against DirectPathEnv")
116+
.that(Boolean.getBoolean(ATTEMPT_DIRECT_PATH))
117+
.isTrue();
118+
// Get default spanner options for Ingetration test
119+
SpannerOptions.Builder builder = env.getTestHelper().getOptions().toBuilder();
120+
// Set instrumented transport provider
121+
builder.setChannelProvider(
122+
InstantiatingGrpcChannelProvider.newBuilder()
123+
.setAttemptDirectPath(true)
124+
.setEndpoint(DIRECT_PATH_ENDPOINT)
125+
.setPoolSize(1)
126+
.setChannelConfigurator(
127+
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
128+
@Override
129+
public ManagedChannelBuilder apply(ManagedChannelBuilder builder) {
130+
injectNettyChannelHandler(builder);
131+
// Fail fast when blackhole is active
132+
builder.keepAliveTime(1, TimeUnit.SECONDS);
133+
builder.keepAliveTimeout(1, TimeUnit.SECONDS);
134+
return builder;
135+
}
136+
})
137+
.build());
138+
// Forcefully ignore GOOGLE_APPLICATION_CREDENTIALS
139+
builder.setCredentials(
140+
FixedCredentialsProvider.create(ComputeEngineCredentials.create()).getCredentials());
141+
142+
// Create a new testHelper with the instrumented transport provider
143+
testHelper = RemoteSpannerHelper.create(builder.build(), env.getTestHelper().getInstanceId());
144+
145+
db =
146+
testHelper.createTestDatabase(
147+
"CREATE TABLE TestTable ("
148+
+ " Key STRING(MAX) NOT NULL,"
149+
+ " StringValue STRING(MAX),"
150+
+ ") PRIMARY KEY (Key)");
151+
client = testHelper.getDatabaseClient(db);
152+
List<Mutation> mutations = new ArrayList<>();
153+
for (int i = 0; i < 3; ++i) {
154+
mutations.add(
155+
Mutation.newInsertOrUpdateBuilder(TABLE_NAME)
156+
.set("Key")
157+
.to("k" + i)
158+
.set("StringValue")
159+
.to("v" + i)
160+
.build());
161+
}
162+
client.write(mutations);
163+
}
164+
165+
@After
166+
public void teardown() {
167+
if (testHelper != null) {
168+
testHelper.cleanUp();
169+
testHelper.getClient().close();
170+
}
171+
if (eventLoopGroup != null) {
172+
eventLoopGroup.shutdownGracefully();
173+
}
174+
}
175+
176+
@Test
177+
public void testFallback() throws InterruptedException, TimeoutException {
178+
// Precondition: wait for DirectPath to connect
179+
assertWithMessage("Failed to observe RPCs over DirectPath").that(exerciseDirectPath()).isTrue();
180+
181+
// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
182+
blackholeDpAddr.set(true);
183+
184+
// Send a request, which should be routed over IPv4 and CFE.
185+
client.singleUse(TimestampBound.strong()).readRow(TABLE_NAME, Key.of("k0"), ALL_COLUMNS);
186+
187+
// Verify that the above check was meaningful, by verifying that the blackhole actually dropped
188+
// packets.
189+
assertWithMessage("Failed to detect any IPv6 traffic in blackhole")
190+
.that(numBlocked.get())
191+
.isGreaterThan(0);
192+
193+
// Make sure that the client will start reading from IPv6 again by sending new requests and
194+
// checking the injected IPv6 counter has been updated.
195+
blackholeDpAddr.set(false);
196+
197+
assertWithMessage("Failed to upgrade back to DirectPath").that(exerciseDirectPath()).isTrue();
198+
}
199+
200+
private boolean exerciseDirectPath() throws InterruptedException, TimeoutException {
201+
Stopwatch stopwatch = Stopwatch.createStarted();
202+
numDpAddrRead.set(0);
203+
204+
boolean seenEnough = false;
205+
206+
while (!seenEnough && stopwatch.elapsed(TimeUnit.MINUTES) < 2) {
207+
for (int i = 0; i < NUM_RPCS_TO_SEND; i++) {
208+
client.singleUse(TimestampBound.strong()).readRow(TABLE_NAME, Key.of("k0"), ALL_COLUMNS);
209+
}
210+
Thread.sleep(100);
211+
seenEnough = numDpAddrRead.get() >= MIN_COMPLETE_READ_CALLS;
212+
}
213+
return seenEnough;
214+
}
215+
216+
/**
217+
* This is a giant hack to enable testing DirectPath CFE fallback.
218+
*
219+
* <p>It unwraps the {@link ComputeEngineChannelBuilder} to inject a NettyChannelHandler to signal
220+
* IPv6 packet loss.
221+
*/
222+
private void injectNettyChannelHandler(ManagedChannelBuilder<?> channelBuilder) {
223+
try {
224+
// Extract the delegate NettyChannelBuilder using reflection
225+
Field delegateField = ComputeEngineChannelBuilder.class.getDeclaredField("delegate");
226+
delegateField.setAccessible(true);
227+
228+
ComputeEngineChannelBuilder gceChannelBuilder =
229+
((ComputeEngineChannelBuilder) channelBuilder);
230+
Object delegateChannelBuilder = delegateField.get(gceChannelBuilder);
231+
232+
NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) delegateChannelBuilder;
233+
nettyChannelBuilder.channelFactory(channelFactory);
234+
nettyChannelBuilder.eventLoopGroup(eventLoopGroup);
235+
} catch (NoSuchFieldException | IllegalAccessException e) {
236+
throw new RuntimeException("Failed to inject the netty ChannelHandler", e);
237+
}
238+
}
239+
240+
/** @see com.google.cloud.bigtable.data.v2.it.DirectPathFallbackIT.MyChannelHandler */
241+
private class MyChannelFactory implements ChannelFactory<NioSocketChannel> {
242+
@Override
243+
public NioSocketChannel newChannel() {
244+
NioSocketChannel channel = new NioSocketChannel();
245+
channel.pipeline().addLast(new MyChannelHandler());
246+
247+
return channel;
248+
}
249+
}
250+
251+
/**
252+
* A netty {@link io.grpc.netty.shaded.io.netty.channel.ChannelHandler} that can be instructed to
253+
* make IPv6 packets disappear
254+
*/
255+
private class MyChannelHandler extends ChannelDuplexHandler {
256+
@Override
257+
public void connect(
258+
ChannelHandlerContext ctx,
259+
SocketAddress remoteAddress,
260+
SocketAddress localAddress,
261+
ChannelPromise promise)
262+
throws Exception {
263+
264+
if (remoteAddress instanceof InetSocketAddress) {
265+
InetAddress inetAddress = ((InetSocketAddress) remoteAddress).getAddress();
266+
String addr = inetAddress.getHostAddress();
267+
isDpAddr = addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX);
268+
}
269+
270+
if (!(isDpAddr && blackholeDpAddr.get())) {
271+
super.connect(ctx, remoteAddress, localAddress, promise);
272+
} else {
273+
// Fail the connection fast
274+
promise.setFailure(new IOException("fake error"));
275+
}
276+
}
277+
278+
@Override
279+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
280+
boolean dropCall = isDpAddr && blackholeDpAddr.get();
281+
if (dropCall) {
282+
// Don't notify the next handler and increment counter
283+
numBlocked.incrementAndGet();
284+
ReferenceCountUtil.release(msg);
285+
} else {
286+
super.channelRead(ctx, msg);
287+
}
288+
}
289+
290+
@Override
291+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
292+
boolean dropCall = isDpAddr && blackholeDpAddr.get();
293+
if (dropCall) {
294+
// Don't notify the next handler and increment counter
295+
numBlocked.incrementAndGet();
296+
} else {
297+
if (isDpAddr) {
298+
numDpAddrRead.incrementAndGet();
299+
}
300+
super.channelReadComplete(ctx);
301+
}
302+
}
303+
}
304+
}

0 commit comments

Comments
 (0)