Skip to content

Commit b1f62e5

Browse files
committed
sync context
1 parent 169dacc commit b1f62e5

4 files changed

Lines changed: 61 additions & 20 deletions

File tree

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import io.grpc.Context;
23+
import io.grpc.SynchronizationContext;
2324
import io.grpc.internal.ExponentialBackoffPolicy;
2425
import io.grpc.internal.GrpcUtil;
2526
import io.grpc.internal.ObjectPool;
@@ -32,6 +33,8 @@
3233
import java.util.Map;
3334
import java.util.concurrent.ScheduledExecutorService;
3435
import java.util.concurrent.atomic.AtomicReference;
36+
import java.util.logging.Level;
37+
import java.util.logging.Logger;
3538
import javax.annotation.Nullable;
3639
import javax.annotation.concurrent.GuardedBy;
3740
import javax.annotation.concurrent.ThreadSafe;
@@ -42,19 +45,37 @@
4245
*/
4346
@ThreadSafe
4447
final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
45-
48+
private static final Logger log = Logger.getLogger(SharedXdsClientPoolProvider.class.getName());
4649
private final Bootstrapper bootstrapper;
4750
private final Object lock = new Object();
4851
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
4952
private volatile ObjectPool<XdsClient> xdsClientPool;
53+
private final SynchronizationContext syncContext;
5054

5155
SharedXdsClientPoolProvider() {
5256
this(new BootstrapperImpl());
5357
}
5458

59+
SharedXdsClientPoolProvider(SynchronizationContext syncContext) {
60+
this(new BootstrapperImpl(), syncContext);
61+
}
62+
5563
@VisibleForTesting
5664
SharedXdsClientPoolProvider(Bootstrapper bootstrapper) {
65+
this(bootstrapper, new SynchronizationContext(
66+
new Thread.UncaughtExceptionHandler() {
67+
@Override
68+
public void uncaughtException(Thread t, Throwable e) {
69+
log.log(Level.WARNING,
70+
"Uncaught exception in XdsClient SynchronizationContext. Panic!", e);
71+
throw new AssertionError(e);
72+
}
73+
}));
74+
}
75+
76+
SharedXdsClientPoolProvider(Bootstrapper bootstrapper, SynchronizationContext syncContext) {
5777
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
78+
this.syncContext = checkNotNull(syncContext, "syncContext");
5879
}
5980

6081
static SharedXdsClientPoolProvider getDefaultProvider() {
@@ -89,7 +110,7 @@ public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
89110
if (bootstrapInfo.servers().isEmpty()) {
90111
throw new XdsInitializationException("No xDS server provided");
91112
}
92-
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
113+
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo, syncContext);
93114
}
94115
}
95116
}
@@ -112,10 +133,27 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
112133
private XdsClient xdsClient;
113134
@GuardedBy("lock")
114135
private int refCount;
136+
private final SynchronizationContext syncContext;
115137

116138
@VisibleForTesting
117139
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
140+
this(bootstrapInfo, new SynchronizationContext(
141+
new Thread.UncaughtExceptionHandler() {
142+
@Override
143+
public void uncaughtException(Thread t, Throwable e) {
144+
log.log(
145+
Level.WARNING,
146+
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
147+
e);
148+
// TODO(chengyuanzhang): better error handling.
149+
throw new AssertionError(e);
150+
}
151+
}));
152+
}
153+
154+
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, SynchronizationContext syncContext) {
118155
this.bootstrapInfo = checkNotNull(bootstrapInfo);
156+
this.syncContext = checkNotNull(syncContext);
119157
}
120158

121159
@Override
@@ -131,7 +169,8 @@ public XdsClient getObject() {
131169
new ExponentialBackoffPolicy.Provider(),
132170
GrpcUtil.STOPWATCH_SUPPLIER,
133171
TimeProvider.SYSTEM_TIME_PROVIDER,
134-
new TlsContextManagerImpl(bootstrapInfo));
172+
new TlsContextManagerImpl(bootstrapInfo),
173+
syncContext);
135174
}
136175
refCount++;
137176
return xdsClient;

xds/src/main/java/io/grpc/xds/XdsClientImpl.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,7 @@ final class XdsClientImpl extends XdsClient
7878
// Longest time to wait, since the subscription to some resource, for concluding its absence.
7979
@VisibleForTesting
8080
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
81-
private final SynchronizationContext syncContext = new SynchronizationContext(
82-
new Thread.UncaughtExceptionHandler() {
83-
@Override
84-
public void uncaughtException(Thread t, Throwable e) {
85-
logger.log(
86-
XdsLogLevel.ERROR,
87-
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
88-
e);
89-
// TODO(chengyuanzhang): better error handling.
90-
throw new AssertionError(e);
91-
}
92-
});
81+
private final SynchronizationContext syncContext;
9382
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
9483
private final LoadBalancerRegistry loadBalancerRegistry
9584
= LoadBalancerRegistry.getDefaultRegistry();
@@ -120,7 +109,8 @@ public void uncaughtException(Thread t, Throwable e) {
120109
BackoffPolicy.Provider backoffPolicyProvider,
121110
Supplier<Stopwatch> stopwatchSupplier,
122111
TimeProvider timeProvider,
123-
TlsContextManager tlsContextManager) {
112+
TlsContextManager tlsContextManager,
113+
SynchronizationContext syncContext) {
124114
this.xdsChannelFactory = xdsChannelFactory;
125115
this.bootstrapInfo = bootstrapInfo;
126116
this.context = context;
@@ -135,6 +125,7 @@ public void uncaughtException(Thread t, Throwable e) {
135125
if (LOG_XDS_NODE_ID) {
136126
classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
137127
}
128+
this.syncContext = checkNotNull(syncContext, "syncContext");
138129
}
139130

140131
private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {

xds/src/main/java/io/grpc/xds/XdsNameResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ final class XdsNameResolver extends NameResolver {
138138
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
139139
@Nullable Map<String, ?> bootstrapOverride) {
140140
this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
141-
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
141+
new SharedXdsClientPoolProvider(syncContext), ThreadSafeRandomImpl.instance,
142142
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
143143
}
144144

@@ -161,7 +161,7 @@ final class XdsNameResolver extends NameResolver {
161161
this.syncContext = checkNotNull(syncContext, "syncContext");
162162
this.scheduler = checkNotNull(scheduler, "scheduler");
163163
this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
164-
"xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
164+
"xdsClientPoolFactory") : new SharedXdsClientPoolProvider(syncContext);
165165
this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
166166
this.random = checkNotNull(random, "random");
167167
this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");

xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import io.grpc.Server;
5959
import io.grpc.Status;
6060
import io.grpc.Status.Code;
61+
import io.grpc.SynchronizationContext;
6162
import io.grpc.inprocess.InProcessChannelBuilder;
6263
import io.grpc.inprocess.InProcessServerBuilder;
6364
import io.grpc.internal.BackoffPolicy;
@@ -292,6 +293,13 @@ public long currentTimeNanos() {
292293
private final String serverName = InProcessServerBuilder.generateName();
293294
private BindableService adsService = createAdsService();
294295
private BindableService lrsService = createLrsService();
296+
private final SynchronizationContext syncContext = new SynchronizationContext(
297+
new Thread.UncaughtExceptionHandler() {
298+
@Override
299+
public void uncaughtException(Thread t, Throwable e) {
300+
throw new AssertionError(e);
301+
}
302+
});
295303

296304
@Before
297305
public void setUp() throws IOException {
@@ -358,6 +366,7 @@ ManagedChannel create(ServerInfo serverInfo) {
358366
.certProviders(ImmutableMap.of("cert-instance-name",
359367
CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
360368
.build();
369+
361370
xdsClient =
362371
new XdsClientImpl(
363372
xdsChannelFactory,
@@ -367,7 +376,8 @@ ManagedChannel create(ServerInfo serverInfo) {
367376
backoffPolicyProvider,
368377
fakeClock.getStopwatchSupplier(),
369378
timeProvider,
370-
tlsContextManager);
379+
tlsContextManager,
380+
syncContext);
371381

372382
assertThat(resourceDiscoveryCalls).isEmpty();
373383
assertThat(loadReportCalls).isEmpty();
@@ -3613,7 +3623,8 @@ private XdsClientImpl createXdsClient(String serverUri) {
36133623
backoffPolicyProvider,
36143624
fakeClock.getStopwatchSupplier(),
36153625
timeProvider,
3616-
tlsContextManager);
3626+
tlsContextManager,
3627+
syncContext);
36173628
}
36183629

36193630
private BootstrapInfo buildBootStrap(String serverUri) {

0 commit comments

Comments
 (0)