2020
2121import com .google .common .annotations .VisibleForTesting ;
2222import io .grpc .Context ;
23+ import io .grpc .SynchronizationContext ;
2324import io .grpc .internal .ExponentialBackoffPolicy ;
2425import io .grpc .internal .GrpcUtil ;
2526import io .grpc .internal .ObjectPool ;
3233import java .util .Map ;
3334import java .util .concurrent .ScheduledExecutorService ;
3435import java .util .concurrent .atomic .AtomicReference ;
36+ import java .util .logging .Level ;
37+ import java .util .logging .Logger ;
3538import javax .annotation .Nullable ;
3639import javax .annotation .concurrent .GuardedBy ;
3740import javax .annotation .concurrent .ThreadSafe ;
4245 */
4346@ ThreadSafe
4447final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
45-
48+ private static final Logger log = Logger . getLogger ( SharedXdsClientPoolProvider . class . getName ());
4649 private final Bootstrapper bootstrapper ;
4750 private final Object lock = new Object ();
4851 private final AtomicReference <Map <String , ?>> bootstrapOverride = new AtomicReference <>();
4952 private volatile ObjectPool <XdsClient > xdsClientPool ;
53+ private final SynchronizationContext syncContext ;
5054
5155 SharedXdsClientPoolProvider () {
5256 this (new BootstrapperImpl ());
5357 }
5458
59+ SharedXdsClientPoolProvider (SynchronizationContext syncContext ) {
60+ this (new BootstrapperImpl (), syncContext );
61+ }
62+
5563 @ VisibleForTesting
5664 SharedXdsClientPoolProvider (Bootstrapper bootstrapper ) {
65+ this (bootstrapper , new SynchronizationContext (
66+ new Thread .UncaughtExceptionHandler () {
67+ @ Override
68+ public void uncaughtException (Thread t , Throwable e ) {
69+ log .log (Level .WARNING ,
70+ "Uncaught exception in XdsClient SynchronizationContext. Panic!" , e );
71+ throw new AssertionError (e );
72+ }
73+ }));
74+ }
75+
76+ SharedXdsClientPoolProvider (Bootstrapper bootstrapper , SynchronizationContext syncContext ) {
5777 this .bootstrapper = checkNotNull (bootstrapper , "bootstrapper" );
78+ this .syncContext = checkNotNull (syncContext , "syncContext" );
5879 }
5980
6081 static SharedXdsClientPoolProvider getDefaultProvider () {
@@ -89,7 +110,7 @@ public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
89110 if (bootstrapInfo .servers ().isEmpty ()) {
90111 throw new XdsInitializationException ("No xDS server provided" );
91112 }
92- ref = xdsClientPool = new RefCountedXdsClientObjectPool (bootstrapInfo );
113+ ref = xdsClientPool = new RefCountedXdsClientObjectPool (bootstrapInfo , syncContext );
93114 }
94115 }
95116 }
@@ -112,10 +133,27 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
112133 private XdsClient xdsClient ;
113134 @ GuardedBy ("lock" )
114135 private int refCount ;
136+ private final SynchronizationContext syncContext ;
115137
116138 @ VisibleForTesting
117139 RefCountedXdsClientObjectPool (BootstrapInfo bootstrapInfo ) {
140+ this (bootstrapInfo , new SynchronizationContext (
141+ new Thread .UncaughtExceptionHandler () {
142+ @ Override
143+ public void uncaughtException (Thread t , Throwable e ) {
144+ log .log (
145+ Level .WARNING ,
146+ "Uncaught exception in XdsClient SynchronizationContext. Panic!" ,
147+ e );
148+ // TODO(chengyuanzhang): better error handling.
149+ throw new AssertionError (e );
150+ }
151+ }));
152+ }
153+
154+ RefCountedXdsClientObjectPool (BootstrapInfo bootstrapInfo , SynchronizationContext syncContext ) {
118155 this .bootstrapInfo = checkNotNull (bootstrapInfo );
156+ this .syncContext = checkNotNull (syncContext );
119157 }
120158
121159 @ Override
@@ -131,7 +169,8 @@ public XdsClient getObject() {
131169 new ExponentialBackoffPolicy .Provider (),
132170 GrpcUtil .STOPWATCH_SUPPLIER ,
133171 TimeProvider .SYSTEM_TIME_PROVIDER ,
134- new TlsContextManagerImpl (bootstrapInfo ));
172+ new TlsContextManagerImpl (bootstrapInfo ),
173+ syncContext );
135174 }
136175 refCount ++;
137176 return xdsClient ;
0 commit comments