Skip to content

Commit 003348b

Browse files
committed
Restore experimental property in @after of test
Fix fallback logic for new watch triggering fallback to use the right CPC. Some non-functional clarity improvement Change hasSubscribers detection to be across all types and not trying to involve any logic in ControlPlaneClient.
1 parent 22b5a0c commit 003348b

5 files changed

Lines changed: 42 additions & 43 deletions

File tree

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -280,16 +280,6 @@ void sendDiscoveryRequests() {
280280
}
281281
}
282282

283-
@SuppressWarnings("rawtypes")
284-
boolean hasSubscribedResources(String authority) {
285-
for (XdsResourceType<?> type : resourceStore.getSubscribedResourceTypesWithTypeUrl().values()) {
286-
if (resourceStore.hasSubscribers(type, authority)) {
287-
return true;
288-
}
289-
}
290-
return false;
291-
}
292-
293283
public boolean isStartingUp() {
294284
return startingUp;
295285
}

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,6 @@ Collection<String> getSubscribedResources(
428428

429429
Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
430430

431-
boolean hasSubscribers(XdsResourceType<? extends ResourceUpdate> type, String authority);
432-
431+
boolean hasSubscribers(String authority);
433432
}
434433
}

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,17 @@ public Collection<String> getSubscribedResources(
199199
}
200200

201201
@Override
202-
public boolean hasSubscribers(XdsResourceType<? extends ResourceUpdate> type, String authority) {
203-
Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
204-
resourceSubscribers.getOrDefault(type, Collections.emptyMap());
205-
return resources.values().stream()
206-
.anyMatch(res -> Objects.equals(res.authority, authority));
202+
public boolean hasSubscribers(String authority) {
203+
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> map
204+
: resourceSubscribers.values()) {
205+
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : map.values()) {
206+
if (subscriber != null && Objects.equals(subscriber.authority, authority)) {
207+
return true;
208+
}
209+
}
210+
}
211+
212+
return false;
207213
}
208214

209215
// As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
@@ -261,7 +267,6 @@ public void run() {
261267
subscriber.addWatcher(watcher, watcherExecutor);
262268
boolean hadErrorBeforeCpc = subscriber.errorDescription != null;
263269

264-
ControlPlaneClient originalCpc = getActiveCpc(subscriber.authority);
265270
ControlPlaneClient cpcToUse = subscriber.loadControlPlane();
266271

267272
if (subscriber.errorDescription != null) {
@@ -273,9 +278,10 @@ public void run() {
273278
return;
274279
}
275280

276-
doFallbackIfNecessary(originalCpc, subscriber.authority);
281+
doFallbackIfNecessary(cpcToUse, subscriber.authority);
277282

278283
if (cpcToUse != null) {
284+
addCpcToAuthority(subscriber.authority, cpcToUse);
279285
cpcToUse.adjustResourceSubscription(type);
280286
subscriber.restartTimer();
281287
} else {
@@ -291,16 +297,14 @@ public void run() {
291297
}
292298

293299
private void addCpcToAuthority(String authority, ControlPlaneClient cpcToUse) {
294-
List<ControlPlaneClient> controlPlaneClients = activeCpClients.get(authority);
295-
if (controlPlaneClients == null) {
296-
controlPlaneClients = new ArrayList<>();
297-
activeCpClients.put(authority, controlPlaneClients);
298-
}
300+
List<ControlPlaneClient> controlPlaneClients =
301+
activeCpClients.computeIfAbsent(authority, k -> new ArrayList<>());
302+
299303
if (controlPlaneClients.contains(cpcToUse)) {
300304
return;
301305
}
302306

303-
// if there are any missing cpcs between the last one and cpcToUse, add them + cpcToUse
307+
// if there are any missing CPCs between the last one and cpcToUse, add them + add cpcToUse
304308
ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
305309
for (int i = controlPlaneClients.size(); i < serverInfos.size(); i++) {
306310
ServerInfo serverInfo = serverInfos.get(i);
@@ -607,8 +611,8 @@ private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) {
607611
int index = cpcsForAuth.indexOf(activatedCpc);
608612
if (index > -1 && index < cpcsForAuth.size() - 1) {
609613
cpcsToShutdown.addAll(cpcsForAuth.subList(index + 1, cpcsForAuth.size()));
610-
for (int i = cpcsForAuth.size() - 1; i > index; i--) {
611-
cpcsForAuth.remove(i);
614+
if (cpcsForAuth.size() > index + 1) {
615+
cpcsForAuth.subList(index + 1, cpcsForAuth.size()).clear();
612616
}
613617
}
614618
}
@@ -660,7 +664,7 @@ private boolean fallBackToCpc(ControlPlaneClient fallbackCpc, String authority,
660664
fallbackCpc.getServerInfo().target());
661665

662666
// Get authorities that aren't falling back
663-
// If don't already have a cached LDS resource, cache current data value
667+
// If we don't already have a cached LDS resource, cache the current data value
664668

665669
addCpcToAuthority(authority, fallbackCpc);
666670
restartMatchingSubscriberTimers(authority);
@@ -673,17 +677,15 @@ private boolean fallBackToCpc(ControlPlaneClient fallbackCpc, String authority,
673677
return didFallback;
674678
}
675679

676-
private void doFallbackIfNecessary(ControlPlaneClient cpc, String authority) {
677-
if (cpc == null || !BootstrapperImpl.isEnabledXdsFallback()) {
680+
private void doFallbackIfNecessary(ControlPlaneClient cpcToUse, String authority) {
681+
if (cpcToUse == null) {
678682
return;
679683
}
680684

681685
ControlPlaneClient activeCpClient = getActiveCpc(authority);
682-
if (cpc == activeCpClient || activeCpClient == null) {
683-
return;
686+
if (cpcToUse != activeCpClient && activeCpClient != null) {
687+
fallBackToCpc(cpcToUse, authority, activeCpClient);
684688
}
685-
686-
fallBackToCpc(cpc, authority, activeCpClient);
687689
}
688690

689691
private void restartMatchingSubscriberTimers(String authority) {
@@ -744,7 +746,7 @@ private ControlPlaneClient loadControlPlane() {
744746
return null;
745747
}
746748

747-
ControlPlaneClient controlPlaneClient = null;
749+
ControlPlaneClient controlPlaneClient;
748750
try {
749751
controlPlaneClient = getOrCreateControlPlaneClient(serverInfos);
750752
if (controlPlaneClient == null) {
@@ -1014,8 +1016,7 @@ public void handleStreamClosed(Status status, boolean inRetry,
10141016
}
10151017

10161018
Status error = status.isOk() ? Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER) : status;
1017-
boolean checkForFallback = !status.isOk() && BootstrapperImpl.isEnabledXdsFallback()
1018-
&& !inRetry;
1019+
boolean checkForFallback = !status.isOk() && !inRetry;
10191020

10201021
List<String> authoritiesForClosedCpc = getActiveAuthorities(cpcThatClosed);
10211022
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
@@ -1026,13 +1027,14 @@ public void handleStreamClosed(Status status, boolean inRetry,
10261027
}
10271028

10281029
// try to fallback to lower priority control plane client
1029-
if (checkForFallback && doFallbackForAuthority(
1030-
cpcThatClosed, serverInfo, subscriber.serverInfos, subscriber.authority)) {
1030+
if (checkForFallback
1031+
&& doFallbackForAuthority(
1032+
cpcThatClosed, serverInfo, subscriber.serverInfos, subscriber.authority)) {
10311033
authoritiesForClosedCpc.remove(subscriber.authority);
10321034
if (authoritiesForClosedCpc.isEmpty()) {
10331035
return; // optimization: no need to continue once all authorities have done fallback
10341036
}
1035-
continue;
1037+
continue; // since we did fallback, don't consider it an error
10361038
}
10371039

10381040
subscriber.onError(error, null);
@@ -1053,7 +1055,7 @@ public void handleStreamRestarted(ServerInfo serverInfo) {
10531055
}
10541056

10551057
for (String authority : getAuthoritiesForServerInfo(serverInfo)) {
1056-
if (controlPlaneClient.hasSubscribedResources(authority)) {
1058+
if (hasSubscribers(authority)) {
10571059
internalHandleStreamReady(serverInfo, controlPlaneClient, authority);
10581060
}
10591061
}

xds/src/test/java/io/grpc/xds/CsdsServiceTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -508,8 +508,7 @@ public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
508508
}
509509

510510
@Override
511-
public boolean hasSubscribers(XdsResourceType<? extends ResourceUpdate> type,
512-
String authority) {
511+
public boolean hasSubscribers(String authority) {
513512
return true;
514513
}
515514
}

xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class GrpcBootstrapperImplTest {
6161
private String originalBootstrapPathFromSysProp;
6262
private String originalBootstrapConfigFromEnvVar;
6363
private String originalBootstrapConfigFromSysProp;
64+
private String originalExperimentalXdsFallbackFlag;
6465

6566
@Before
6667
public void setUp() {
@@ -74,6 +75,8 @@ private void saveEnvironment() {
7475
originalBootstrapPathFromSysProp = bootstrapper.bootstrapPathFromSysProp;
7576
originalBootstrapConfigFromEnvVar = bootstrapper.bootstrapConfigFromEnvVar;
7677
originalBootstrapConfigFromSysProp = bootstrapper.bootstrapConfigFromSysProp;
78+
originalExperimentalXdsFallbackFlag =
79+
System.getProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK);
7780
}
7881

7982
@After
@@ -82,6 +85,12 @@ public void restoreEnvironment() {
8285
bootstrapper.bootstrapPathFromSysProp = originalBootstrapPathFromSysProp;
8386
bootstrapper.bootstrapConfigFromEnvVar = originalBootstrapConfigFromEnvVar;
8487
bootstrapper.bootstrapConfigFromSysProp = originalBootstrapConfigFromSysProp;
88+
if (originalExperimentalXdsFallbackFlag != null) {
89+
System.setProperty(
90+
BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK, originalExperimentalXdsFallbackFlag);
91+
} else {
92+
System.clearProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK);
93+
}
8594
}
8695

8796
@Test

0 commit comments

Comments
 (0)