@@ -320,7 +320,7 @@ private ClusterState(String name) {
320320
321321 private void start () {
322322 shutdown = false ;
323- xdsClient .watchXdsResource (XdsClusterResource .getInstance (), name , this );
323+ xdsClient .watchXdsResource (XdsClusterResource .getInstance (), name , this , syncContext );
324324 }
325325
326326 void shutdown () {
@@ -341,102 +341,85 @@ public void onError(Status error) {
341341 String .format ("Unable to load CDS %s. xDS server returned: %s: %s" ,
342342 name , error .getCode (), error .getDescription ()))
343343 .withCause (error .getCause ());
344- syncContext .execute (new Runnable () {
345- @ Override
346- public void run () {
347- if (shutdown ) {
348- return ;
349- }
350- // All watchers should receive the same error, so we only propagate it once.
351- if (ClusterState .this == root ) {
352- handleClusterDiscoveryError (status );
353- }
354- }
355- });
344+ if (shutdown ) {
345+ return ;
346+ }
347+ // All watchers should receive the same error, so we only propagate it once.
348+ if (ClusterState .this == root ) {
349+ handleClusterDiscoveryError (status );
350+ }
356351 }
357352
358353 @ Override
359354 public void onResourceDoesNotExist (String resourceName ) {
360- syncContext .execute (new Runnable () {
361- @ Override
362- public void run () {
363- if (shutdown ) {
364- return ;
365- }
366- discovered = true ;
367- result = null ;
368- if (childClusterStates != null ) {
369- for (ClusterState state : childClusterStates .values ()) {
370- state .shutdown ();
371- }
372- childClusterStates = null ;
373- }
374- handleClusterDiscovered ();
355+ if (shutdown ) {
356+ return ;
357+ }
358+ discovered = true ;
359+ result = null ;
360+ if (childClusterStates != null ) {
361+ for (ClusterState state : childClusterStates .values ()) {
362+ state .shutdown ();
375363 }
376- });
364+ childClusterStates = null ;
365+ }
366+ handleClusterDiscovered ();
377367 }
378368
379369 @ Override
380370 public void onChanged (final CdsUpdate update ) {
381- class ClusterDiscovered implements Runnable {
382- @ Override
383- public void run () {
384- if (shutdown ) {
385- return ;
371+ if (shutdown ) {
372+ return ;
373+ }
374+ logger .log (XdsLogLevel .DEBUG , "Received cluster update {0}" , update );
375+ discovered = true ;
376+ result = update ;
377+ if (update .clusterType () == ClusterType .AGGREGATE ) {
378+ isLeaf = false ;
379+ logger .log (XdsLogLevel .INFO , "Aggregate cluster {0}, underlying clusters: {1}" ,
380+ update .clusterName (), update .prioritizedClusterNames ());
381+ Map <String , ClusterState > newChildStates = new LinkedHashMap <>();
382+ for (String cluster : update .prioritizedClusterNames ()) {
383+ if (newChildStates .containsKey (cluster )) {
384+ logger .log (XdsLogLevel .WARNING ,
385+ String .format ("duplicate cluster name %s in aggregate %s is being ignored" ,
386+ cluster , update .clusterName ()));
387+ continue ;
386388 }
387-
388- logger .log (XdsLogLevel .DEBUG , "Received cluster update {0}" , update );
389- discovered = true ;
390- result = update ;
391- if (update .clusterType () == ClusterType .AGGREGATE ) {
392- isLeaf = false ;
393- logger .log (XdsLogLevel .INFO , "Aggregate cluster {0}, underlying clusters: {1}" ,
394- update .clusterName (), update .prioritizedClusterNames ());
395- Map <String , ClusterState > newChildStates = new LinkedHashMap <>();
396- for (String cluster : update .prioritizedClusterNames ()) {
397- if (newChildStates .containsKey (cluster )) {
398- logger .log (XdsLogLevel .WARNING ,
399- String .format ("duplicate cluster name %s in aggregate %s is being ignored" ,
400- cluster , update .clusterName ()));
401- continue ;
402- }
403- if (childClusterStates == null || !childClusterStates .containsKey (cluster )) {
404- ClusterState childState ;
405- if (clusterStates .containsKey (cluster )) {
406- childState = clusterStates .get (cluster );
407- if (childState .shutdown ) {
408- childState .start ();
409- }
410- } else {
411- childState = new ClusterState (cluster );
412- clusterStates .put (cluster , childState );
413- childState .start ();
414- }
415- newChildStates .put (cluster , childState );
416- } else {
417- newChildStates .put (cluster , childClusterStates .remove (cluster ));
418- }
419- }
420- if (childClusterStates != null ) { // stop subscribing to revoked child clusters
421- for (ClusterState watcher : childClusterStates .values ()) {
422- watcher .shutdown ();
389+ if (childClusterStates == null || !childClusterStates .containsKey (cluster )) {
390+ ClusterState childState ;
391+ if (clusterStates .containsKey (cluster )) {
392+ childState = clusterStates .get (cluster );
393+ if (childState .shutdown ) {
394+ childState .start ();
423395 }
396+ } else {
397+ childState = new ClusterState (cluster );
398+ clusterStates .put (cluster , childState );
399+ childState .start ();
424400 }
425- childClusterStates = newChildStates ;
426- } else if ( update . clusterType () == ClusterType . EDS ) {
427- isLeaf = true ;
428- logger . log ( XdsLogLevel . INFO , "EDS cluster {0}, edsServiceName: {1}" ,
429- update . clusterName (), update . edsServiceName ());
430- } else { // logical DNS
431- isLeaf = true ;
432- logger . log ( XdsLogLevel . INFO , "Logical DNS cluster {0}" , update . clusterName () );
401+ newChildStates . put ( cluster , childState ) ;
402+ } else {
403+ newChildStates . put ( cluster , childClusterStates . remove ( cluster )) ;
404+ }
405+ }
406+ if ( childClusterStates != null ) { // stop subscribing to revoked child clusters
407+ for ( ClusterState watcher : childClusterStates . values ()) {
408+ watcher . shutdown ( );
433409 }
434- handleClusterDiscovered ();
435410 }
411+ childClusterStates = newChildStates ;
412+ } else if (update .clusterType () == ClusterType .EDS ) {
413+ isLeaf = true ;
414+ logger .log (XdsLogLevel .INFO , "EDS cluster {0}, edsServiceName: {1}" ,
415+ update .clusterName (), update .edsServiceName ());
416+ } else { // logical DNS
417+ isLeaf = true ;
418+ logger .log (XdsLogLevel .INFO , "Logical DNS cluster {0}" , update .clusterName ());
436419 }
437-
438- syncContext .execute (new ClusterDiscovered ());
420+ handleClusterDiscovered ();
439421 }
422+
440423 }
441424 }
442425}
0 commit comments