Skip to content

Commit 846e008

Browse files
authored
xds: Revert xds flow control change. (grpc#10784)
* Revert "xds: fix flow control test failure (grpc#10773)" This reverts commit f67ec2e. * Revert "xDS: implement ADS stream flow control mechanism (grpc#10674)" This reverts commit 0a704a5.
1 parent f67ec2e commit 846e008

14 files changed

Lines changed: 342 additions & 520 deletions

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 81 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ private ClusterState(String name) {
320320

321321
private void start() {
322322
shutdown = false;
323-
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
323+
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
324324
}
325325

326326
void shutdown() {
@@ -341,85 +341,102 @@ public void onError(Status error) {
341341
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
342342
name, error.getCode(), error.getDescription()))
343343
.withCause(error.getCause());
344-
if (shutdown) {
345-
return;
346-
}
347-
// All watchers should receive the same error, so we only propagate it once.
348-
if (ClusterState.this == root) {
349-
handleClusterDiscoveryError(status);
350-
}
344+
syncContext.execute(new Runnable() {
345+
@Override
346+
public void run() {
347+
if (shutdown) {
348+
return;
349+
}
350+
// All watchers should receive the same error, so we only propagate it once.
351+
if (ClusterState.this == root) {
352+
handleClusterDiscoveryError(status);
353+
}
354+
}
355+
});
351356
}
352357

353358
@Override
354359
public void onResourceDoesNotExist(String resourceName) {
355-
if (shutdown) {
356-
return;
357-
}
358-
discovered = true;
359-
result = null;
360-
if (childClusterStates != null) {
361-
for (ClusterState state : childClusterStates.values()) {
362-
state.shutdown();
360+
syncContext.execute(new Runnable() {
361+
@Override
362+
public void run() {
363+
if (shutdown) {
364+
return;
365+
}
366+
discovered = true;
367+
result = null;
368+
if (childClusterStates != null) {
369+
for (ClusterState state : childClusterStates.values()) {
370+
state.shutdown();
371+
}
372+
childClusterStates = null;
373+
}
374+
handleClusterDiscovered();
363375
}
364-
childClusterStates = null;
365-
}
366-
handleClusterDiscovered();
376+
});
367377
}
368378

369379
@Override
370380
public void onChanged(final CdsUpdate update) {
371-
if (shutdown) {
372-
return;
373-
}
374-
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
375-
discovered = true;
376-
result = update;
377-
if (update.clusterType() == ClusterType.AGGREGATE) {
378-
isLeaf = false;
379-
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
380-
update.clusterName(), update.prioritizedClusterNames());
381-
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
382-
for (String cluster : update.prioritizedClusterNames()) {
383-
if (newChildStates.containsKey(cluster)) {
384-
logger.log(XdsLogLevel.WARNING,
385-
String.format("duplicate cluster name %s in aggregate %s is being ignored",
386-
cluster, update.clusterName()));
387-
continue;
381+
class ClusterDiscovered implements Runnable {
382+
@Override
383+
public void run() {
384+
if (shutdown) {
385+
return;
388386
}
389-
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
390-
ClusterState childState;
391-
if (clusterStates.containsKey(cluster)) {
392-
childState = clusterStates.get(cluster);
393-
if (childState.shutdown) {
394-
childState.start();
387+
388+
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
389+
discovered = true;
390+
result = update;
391+
if (update.clusterType() == ClusterType.AGGREGATE) {
392+
isLeaf = false;
393+
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
394+
update.clusterName(), update.prioritizedClusterNames());
395+
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
396+
for (String cluster : update.prioritizedClusterNames()) {
397+
if (newChildStates.containsKey(cluster)) {
398+
logger.log(XdsLogLevel.WARNING,
399+
String.format("duplicate cluster name %s in aggregate %s is being ignored",
400+
cluster, update.clusterName()));
401+
continue;
402+
}
403+
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
404+
ClusterState childState;
405+
if (clusterStates.containsKey(cluster)) {
406+
childState = clusterStates.get(cluster);
407+
if (childState.shutdown) {
408+
childState.start();
409+
}
410+
} else {
411+
childState = new ClusterState(cluster);
412+
clusterStates.put(cluster, childState);
413+
childState.start();
414+
}
415+
newChildStates.put(cluster, childState);
416+
} else {
417+
newChildStates.put(cluster, childClusterStates.remove(cluster));
395418
}
396-
} else {
397-
childState = new ClusterState(cluster);
398-
clusterStates.put(cluster, childState);
399-
childState.start();
400419
}
401-
newChildStates.put(cluster, childState);
402-
} else {
403-
newChildStates.put(cluster, childClusterStates.remove(cluster));
404-
}
405-
}
406-
if (childClusterStates != null) { // stop subscribing to revoked child clusters
407-
for (ClusterState watcher : childClusterStates.values()) {
408-
watcher.shutdown();
420+
if (childClusterStates != null) { // stop subscribing to revoked child clusters
421+
for (ClusterState watcher : childClusterStates.values()) {
422+
watcher.shutdown();
423+
}
424+
}
425+
childClusterStates = newChildStates;
426+
} else if (update.clusterType() == ClusterType.EDS) {
427+
isLeaf = true;
428+
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
429+
update.clusterName(), update.edsServiceName());
430+
} else { // logical DNS
431+
isLeaf = true;
432+
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
409433
}
434+
handleClusterDiscovered();
410435
}
411-
childClusterStates = newChildStates;
412-
} else if (update.clusterType() == ClusterType.EDS) {
413-
isLeaf = true;
414-
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
415-
update.clusterName(), update.edsServiceName());
416-
} else { // logical DNS
417-
isLeaf = true;
418-
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
419436
}
420-
handleClusterDiscovered();
421-
}
422437

438+
syncContext.execute(new ClusterDiscovered());
439+
}
423440
}
424441
}
425442
}

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,7 @@ private EdsClusterState(String name, @Nullable String edsServiceName,
366366
void start() {
367367
String resourceName = edsServiceName != null ? edsServiceName : name;
368368
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
369-
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
370-
resourceName, this, syncContext);
369+
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
371370
}
372371

373372
@Override
@@ -453,7 +452,7 @@ public void run() {
453452
}
454453
}
455454

456-
new EndpointsUpdated().run();
455+
syncContext.execute(new EndpointsUpdated());
457456
}
458457

459458
private List<String> generatePriorityNames(String name,
@@ -492,28 +491,38 @@ private List<String> generatePriorityNames(String name,
492491

493492
@Override
494493
public void onResourceDoesNotExist(final String resourceName) {
495-
if (shutdown) {
496-
return;
497-
}
498-
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
499-
status = Status.OK;
500-
resolved = true;
501-
result = null; // resource revoked
502-
handleEndpointResourceUpdate();
494+
syncContext.execute(new Runnable() {
495+
@Override
496+
public void run() {
497+
if (shutdown) {
498+
return;
499+
}
500+
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
501+
status = Status.OK;
502+
resolved = true;
503+
result = null; // resource revoked
504+
handleEndpointResourceUpdate();
505+
}
506+
});
503507
}
504508

505509
@Override
506510
public void onError(final Status error) {
507-
if (shutdown) {
508-
return;
509-
}
510-
String resourceName = edsServiceName != null ? edsServiceName : name;
511-
status = Status.UNAVAILABLE
512-
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
513-
resourceName, error.getCode(), error.getDescription()))
514-
.withCause(error.getCause());
515-
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
516-
handleEndpointResolutionError();
511+
syncContext.execute(new Runnable() {
512+
@Override
513+
public void run() {
514+
if (shutdown) {
515+
return;
516+
}
517+
String resourceName = edsServiceName != null ? edsServiceName : name;
518+
status = Status.UNAVAILABLE
519+
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
520+
resourceName, error.getCode(), error.getDescription()))
521+
.withCause(error.getCause());
522+
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
523+
handleEndpointResolutionError();
524+
}
525+
});
517526
}
518527
}
519528

xds/src/main/java/io/grpc/xds/ControlPlaneClient.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@
3838
import io.grpc.internal.BackoffPolicy;
3939
import io.grpc.stub.ClientCallStreamObserver;
4040
import io.grpc.stub.ClientResponseObserver;
41+
import io.grpc.stub.StreamObserver;
4142
import io.grpc.xds.Bootstrapper.ServerInfo;
4243
import io.grpc.xds.EnvoyProtoData.Node;
43-
import io.grpc.xds.XdsClient.ProcessingTracker;
4444
import io.grpc.xds.XdsClient.ResourceStore;
4545
import io.grpc.xds.XdsClient.XdsResponseHandler;
4646
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
@@ -288,8 +288,6 @@ private abstract class AbstractAdsStream {
288288

289289
abstract boolean isReady();
290290

291-
abstract void request(int count);
292-
293291
/**
294292
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
295293
* {@code errorDetail}. Used for reacting to a specific discovery response. For
@@ -316,10 +314,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
316314
}
317315
responseReceived = true;
318316
respNonces.put(type, nonce);
319-
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
320-
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
321-
processingTracker);
322-
processingTracker.onComplete();
317+
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
323318
}
324319

325320
final void handleRpcError(Throwable t) {
@@ -377,15 +372,14 @@ private void cleanUp() {
377372
}
378373

379374
private final class AdsStreamV3 extends AbstractAdsStream {
380-
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
375+
private StreamObserver<DiscoveryRequest> requestWriter;
381376

382377
@Override
383378
public boolean isReady() {
384379
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
385380
}
386381

387382
@Override
388-
@SuppressWarnings("unchecked")
389383
void start() {
390384
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
391385
AggregatedDiscoveryServiceGrpc.newStub(channel);
@@ -395,7 +389,6 @@ final class AdsClientResponseObserver
395389

396390
@Override
397391
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
398-
requestStream.disableAutoRequestWithInitial(1);
399392
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
400393
}
401394

@@ -444,8 +437,7 @@ public void run() {
444437
}
445438
}
446439

447-
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
448-
new AdsClientResponseObserver());
440+
requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
449441
}
450442

451443
@Override
@@ -475,11 +467,6 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
475467
}
476468
}
477469

478-
@Override
479-
void request(int count) {
480-
requestWriter.request(count);
481-
}
482-
483470
@Override
484471
void sendError(Exception error) {
485472
requestWriter.onError(error);

xds/src/main/java/io/grpc/xds/XdsClient.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.common.base.Splitter;
2525
import com.google.common.net.UrlEscapers;
2626
import com.google.common.util.concurrent.ListenableFuture;
27-
import com.google.common.util.concurrent.MoreExecutors;
2827
import com.google.protobuf.Any;
2928
import io.grpc.Status;
3029
import io.grpc.xds.Bootstrapper.ServerInfo;
@@ -37,8 +36,6 @@
3736
import java.util.Collections;
3837
import java.util.List;
3938
import java.util.Map;
40-
import java.util.concurrent.Executor;
41-
import java.util.concurrent.atomic.AtomicInteger;
4239
import javax.annotation.Nullable;
4340

4441
/**
@@ -307,15 +304,9 @@ TlsContextManager getTlsContextManager() {
307304
/**
308305
* Registers a data watcher for the given Xds resource.
309306
*/
310-
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
311-
ResourceWatcher<T> watcher,
312-
Executor executor) {
313-
throw new UnsupportedOperationException();
314-
}
315-
316307
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
317308
ResourceWatcher<T> watcher) {
318-
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
309+
throw new UnsupportedOperationException();
319310
}
320311

321312
/**
@@ -362,32 +353,11 @@ Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
362353
throw new UnsupportedOperationException();
363354
}
364355

365-
static final class ProcessingTracker {
366-
private final AtomicInteger pendingTask = new AtomicInteger(1);
367-
private final Executor executor;
368-
private final Runnable completionListener;
369-
370-
ProcessingTracker(Runnable completionListener, Executor executor) {
371-
this.executor = executor;
372-
this.completionListener = completionListener;
373-
}
374-
375-
void startTask() {
376-
pendingTask.incrementAndGet();
377-
}
378-
379-
void onComplete() {
380-
if (pendingTask.decrementAndGet() == 0) {
381-
executor.execute(completionListener);
382-
}
383-
}
384-
}
385-
386356
interface XdsResponseHandler {
387357
/** Called when a xds response is received. */
388358
void handleResourceResponse(
389359
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
390-
List<Any> resources, String nonce, ProcessingTracker processingTracker);
360+
List<Any> resources, String nonce);
391361

392362
/** Called when the ADS stream is closed passively. */
393363
// Must be synchronized.

0 commit comments

Comments
 (0)