@@ -199,11 +199,17 @@ public Collection<String> getSubscribedResources(
199199 }
200200
201201 @ Override
202- public boolean hasSubscribers (XdsResourceType <? extends ResourceUpdate > type , String authority ) {
203- Map <String , ResourceSubscriber <? extends ResourceUpdate >> resources =
204- resourceSubscribers .getOrDefault (type , Collections .emptyMap ());
205- return resources .values ().stream ()
206- .anyMatch (res -> Objects .equals (res .authority , authority ));
202+ public boolean hasSubscribers (String authority ) {
203+ for (Map <String , ResourceSubscriber <? extends ResourceUpdate >> map
204+ : resourceSubscribers .values ()) {
205+ for (ResourceSubscriber <? extends ResourceUpdate > subscriber : map .values ()) {
206+ if (subscriber != null && Objects .equals (subscriber .authority , authority )) {
207+ return true ;
208+ }
209+ }
210+ }
211+
212+ return false ;
207213 }
208214
209215 // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
@@ -261,7 +267,6 @@ public void run() {
261267 subscriber .addWatcher (watcher , watcherExecutor );
262268 boolean hadErrorBeforeCpc = subscriber .errorDescription != null ;
263269
264- ControlPlaneClient originalCpc = getActiveCpc (subscriber .authority );
265270 ControlPlaneClient cpcToUse = subscriber .loadControlPlane ();
266271
267272 if (subscriber .errorDescription != null ) {
@@ -273,9 +278,10 @@ public void run() {
273278 return ;
274279 }
275280
276- doFallbackIfNecessary (originalCpc , subscriber .authority );
281+ doFallbackIfNecessary (cpcToUse , subscriber .authority );
277282
278283 if (cpcToUse != null ) {
284+ addCpcToAuthority (subscriber .authority , cpcToUse );
279285 cpcToUse .adjustResourceSubscription (type );
280286 subscriber .restartTimer ();
281287 } else {
@@ -291,16 +297,14 @@ public void run() {
291297 }
292298
293299 private void addCpcToAuthority (String authority , ControlPlaneClient cpcToUse ) {
294- List <ControlPlaneClient > controlPlaneClients = activeCpClients .get (authority );
295- if (controlPlaneClients == null ) {
296- controlPlaneClients = new ArrayList <>();
297- activeCpClients .put (authority , controlPlaneClients );
298- }
300+ List <ControlPlaneClient > controlPlaneClients =
301+ activeCpClients .computeIfAbsent (authority , k -> new ArrayList <>());
302+
299303 if (controlPlaneClients .contains (cpcToUse )) {
300304 return ;
301305 }
302306
303- // if there are any missing cpcs between the last one and cpcToUse, add them + cpcToUse
307+ // if there are any missing CPCs between the last one and cpcToUse, add them + add cpcToUse
304308 ImmutableList <ServerInfo > serverInfos = getServerInfos (authority );
305309 for (int i = controlPlaneClients .size (); i < serverInfos .size (); i ++) {
306310 ServerInfo serverInfo = serverInfos .get (i );
@@ -607,8 +611,8 @@ private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) {
607611 int index = cpcsForAuth .indexOf (activatedCpc );
608612 if (index > -1 && index < cpcsForAuth .size () - 1 ) {
609613 cpcsToShutdown .addAll (cpcsForAuth .subList (index + 1 , cpcsForAuth .size ()));
610- for ( int i = cpcsForAuth .size () - 1 ; i > index ; i -- ) {
611- cpcsForAuth .remove ( i );
614+ if ( cpcsForAuth .size () > index + 1 ) {
615+ cpcsForAuth .subList ( index + 1 , cpcsForAuth . size ()). clear ( );
612616 }
613617 }
614618 }
@@ -660,7 +664,7 @@ private boolean fallBackToCpc(ControlPlaneClient fallbackCpc, String authority,
660664 fallbackCpc .getServerInfo ().target ());
661665
662666 // Get authorities that aren't falling back
663- // If don't already have a cached LDS resource, cache current data value
667+ // If we don't already have a cached LDS resource, cache the current data value
664668
665669 addCpcToAuthority (authority , fallbackCpc );
666670 restartMatchingSubscriberTimers (authority );
@@ -673,17 +677,15 @@ private boolean fallBackToCpc(ControlPlaneClient fallbackCpc, String authority,
673677 return didFallback ;
674678 }
675679
676- private void doFallbackIfNecessary (ControlPlaneClient cpc , String authority ) {
677- if (cpc == null || ! BootstrapperImpl . isEnabledXdsFallback () ) {
680+ private void doFallbackIfNecessary (ControlPlaneClient cpcToUse , String authority ) {
681+ if (cpcToUse == null ) {
678682 return ;
679683 }
680684
681685 ControlPlaneClient activeCpClient = getActiveCpc (authority );
682- if (cpc == activeCpClient || activeCpClient = = null ) {
683- return ;
686+ if (cpcToUse != activeCpClient && activeCpClient ! = null ) {
687+ fallBackToCpc ( cpcToUse , authority , activeCpClient ) ;
684688 }
685-
686- fallBackToCpc (cpc , authority , activeCpClient );
687689 }
688690
689691 private void restartMatchingSubscriberTimers (String authority ) {
@@ -744,7 +746,7 @@ private ControlPlaneClient loadControlPlane() {
744746 return null ;
745747 }
746748
747- ControlPlaneClient controlPlaneClient = null ;
749+ ControlPlaneClient controlPlaneClient ;
748750 try {
749751 controlPlaneClient = getOrCreateControlPlaneClient (serverInfos );
750752 if (controlPlaneClient == null ) {
@@ -1014,8 +1016,7 @@ public void handleStreamClosed(Status status, boolean inRetry,
10141016 }
10151017
10161018 Status error = status .isOk () ? Status .UNAVAILABLE .withDescription (CLOSED_BY_SERVER ) : status ;
1017- boolean checkForFallback = !status .isOk () && BootstrapperImpl .isEnabledXdsFallback ()
1018- && !inRetry ;
1019+ boolean checkForFallback = !status .isOk () && !inRetry ;
10191020
10201021 List <String > authoritiesForClosedCpc = getActiveAuthorities (cpcThatClosed );
10211022 for (Map <String , ResourceSubscriber <? extends ResourceUpdate >> subscriberMap :
@@ -1026,13 +1027,14 @@ public void handleStreamClosed(Status status, boolean inRetry,
10261027 }
10271028
10281029 // try to fallback to lower priority control plane client
1029- if (checkForFallback && doFallbackForAuthority (
1030- cpcThatClosed , serverInfo , subscriber .serverInfos , subscriber .authority )) {
1030+ if (checkForFallback
1031+ && doFallbackForAuthority (
1032+ cpcThatClosed , serverInfo , subscriber .serverInfos , subscriber .authority )) {
10311033 authoritiesForClosedCpc .remove (subscriber .authority );
10321034 if (authoritiesForClosedCpc .isEmpty ()) {
10331035 return ; // optimization: no need to continue once all authorities have done fallback
10341036 }
1035- continue ;
1037+ continue ; // since we did fallback, don't consider it an error
10361038 }
10371039
10381040 subscriber .onError (error , null );
@@ -1053,7 +1055,7 @@ public void handleStreamRestarted(ServerInfo serverInfo) {
10531055 }
10541056
10551057 for (String authority : getAuthoritiesForServerInfo (serverInfo )) {
1056- if (controlPlaneClient . hasSubscribedResources (authority )) {
1058+ if (hasSubscribers (authority )) {
10571059 internalHandleStreamReady (serverInfo , controlPlaneClient , authority );
10581060 }
10591061 }
0 commit comments