Skip to content

Commit b524c08

Browse files
authored
xdsclient: include xds node ID in errors from the WatchResource API (#8093)
1 parent 91eb6aa commit b524c08

File tree

3 files changed

+161
-10
lines changed

3 files changed

+161
-10
lines changed

xds/internal/xdsclient/authority.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -604,8 +604,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
604604
a.logger.Infof("New watch for type %q, resource name %q", rType.TypeName(), resourceName)
605605
}
606606

607-
xdsChannel := a.xdsChannelToUse()
608-
if xdsChannel == nil {
607+
xdsChannel, err := a.xdsChannelToUse()
608+
if err != nil {
609+
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
609610
return
610611
}
611612

@@ -739,22 +740,23 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
739740
// Otherwise, it creates a new channel using the first server configuration in
740741
// the list of configurations, and returns that.
741742
//
743+
// A non-nil error is returned if the channel creation fails.
744+
//
742745
// Only executed in the context of a serializer callback.
743-
func (a *authority) xdsChannelToUse() *xdsChannelWithConfig {
746+
func (a *authority) xdsChannelToUse() (*xdsChannelWithConfig, error) {
744747
if a.activeXDSChannel != nil {
745-
return a.activeXDSChannel
748+
return a.activeXDSChannel, nil
746749
}
747750

748751
sc := a.xdsChannelConfigs[0].serverConfig
749752
xc, cleanup, err := a.getChannelForADS(sc, a)
750753
if err != nil {
751-
a.logger.Warningf("Failed to create xDS channel: %v", err)
752-
return nil
754+
return nil, err
753755
}
754756
a.xdsChannelConfigs[0].channel = xc
755757
a.xdsChannelConfigs[0].cleanup = cleanup
756758
a.activeXDSChannel = a.xdsChannelConfigs[0]
757-
return a.activeXDSChannel
759+
return a.activeXDSChannel, nil
758760
}
759761

760762
// closeXDSChannels closes all the xDS channels associated with this authority,

xds/internal/xdsclient/clientimpl_watchers.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ import (
2626
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
2727
)
2828

29+
// wrappingWatcher is a wrapper around an xdsresource.ResourceWatcher that adds
30+
// the node ID to the error messages reported to the watcher.
31+
type wrappingWatcher struct {
32+
xdsresource.ResourceWatcher
33+
nodeID string
34+
}
35+
36+
func (w *wrappingWatcher) OnError(err error, done xdsresource.OnDoneFunc) {
37+
w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %v", w.nodeID, err), done)
38+
}
39+
2940
// WatchResource uses xDS to discover the resource associated with the provided
3041
// resource name. The resource type implementation determines how xDS responses
3142
// are are deserialized and validated, as received from the xDS management
@@ -43,6 +54,11 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
4354
return func() {}
4455
}
4556

57+
watcher = &wrappingWatcher{
58+
ResourceWatcher: watcher,
59+
nodeID: c.config.Node().GetId(),
60+
}
61+
4662
if err := c.resourceTypes.maybeRegister(rType); err != nil {
4763
logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName)
4864
c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
@@ -53,9 +69,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
5369
a := c.getAuthorityForResource(n)
5470
if a == nil {
5571
logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority)
56-
c.serializer.TrySchedule(func(context.Context) {
57-
watcher.OnError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {})
58-
})
72+
watcher.OnError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {})
5973
return func() {}
6074
}
6175
// The watchResource method on the authority is invoked with n.String()

xds/internal/xdsclient/tests/misc_watchers_test.go

+135
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,19 @@ import (
2222
"context"
2323
"encoding/json"
2424
"fmt"
25+
"strings"
2526
"testing"
2627

2728
"github.com/google/uuid"
29+
"google.golang.org/grpc"
2830
"google.golang.org/grpc/internal/testutils"
2931
"google.golang.org/grpc/internal/testutils/xds/e2e"
3032
"google.golang.org/grpc/internal/testutils/xds/fakeserver"
3133
"google.golang.org/grpc/internal/xds/bootstrap"
3234
"google.golang.org/grpc/xds/internal"
3335
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
3436
"google.golang.org/grpc/xds/internal/xdsclient"
37+
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
3538
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
3639
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
3740
"google.golang.org/protobuf/types/known/anypb"
@@ -371,3 +374,135 @@ func readDiscoveryResponseAndCheckForNonEmptyNodeProto(ctx context.Context, reqC
371374
}
372375
return nil
373376
}
377+
378+
type testRouteConfigResourceType struct{}
379+
380+
func (testRouteConfigResourceType) TypeURL() string { return version.V3RouteConfigURL }
381+
func (testRouteConfigResourceType) TypeName() string { return "RouteConfigResource" }
382+
func (testRouteConfigResourceType) AllResourcesRequiredInSotW() bool { return false }
383+
func (testRouteConfigResourceType) Decode(*xdsresource.DecodeOptions, *anypb.Any) (*xdsresource.DecodeResult, error) {
384+
return nil, nil
385+
}
386+
387+
// Tests that the errors returned by the xDS client when watching a resource
388+
// contain the node ID that was used to create the client. This test covers two
389+
// scenarios:
390+
//
391+
// 1. When a watch is registered for an already registered resource type, but
392+
// this time with a different implementation,
393+
// 2. When a watch is registered for a resource name whose authority is not
394+
// found in the bootstrap configuration.
395+
func (s) TestWatchErrorsContainNodeID(t *testing.T) {
396+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
397+
398+
// Create bootstrap configuration pointing to the above management server.
399+
nodeID := uuid.New().String()
400+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
401+
402+
// Create an xDS client with the above bootstrap contents.
403+
config, err := bootstrap.NewConfigFromContents(bc)
404+
if err != nil {
405+
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
406+
}
407+
pool := xdsclient.NewPool(config)
408+
client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
409+
Name: t.Name(),
410+
})
411+
if err != nil {
412+
t.Fatalf("Failed to create xDS client: %v", err)
413+
}
414+
defer close()
415+
416+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
417+
defer cancel()
418+
419+
t.Run("Multiple_ResourceType_Implementations", func(t *testing.T) {
420+
const routeConfigName = "route-config-name"
421+
watcher := xdstestutils.NewTestResourceWatcher()
422+
client.WatchResource(routeConfigResourceType, routeConfigName, watcher)
423+
424+
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
425+
defer sCancel()
426+
select {
427+
case <-sCtx.Done():
428+
case <-watcher.UpdateCh:
429+
t.Fatal("Unexpected resource update")
430+
case <-watcher.ErrorCh:
431+
t.Fatal("Unexpected resource error")
432+
case <-watcher.ResourceDoesNotExistCh:
433+
t.Fatal("Unexpected resource does not exist")
434+
}
435+
436+
client.WatchResource(testRouteConfigResourceType{}, routeConfigName, watcher)
437+
select {
438+
case <-ctx.Done():
439+
t.Fatal("Timeout when waiting for error callback to be invoked")
440+
case err := <-watcher.ErrorCh:
441+
if err == nil || !strings.Contains(err.Error(), nodeID) {
442+
t.Fatalf("Unexpected error: %v, want error with node ID: %q", err, nodeID)
443+
}
444+
}
445+
})
446+
447+
t.Run("Missing_Authority", func(t *testing.T) {
448+
const routeConfigName = "xdstp://nonexistant-authority/envoy.config.route.v3.RouteConfiguration/route-config-name"
449+
watcher := xdstestutils.NewTestResourceWatcher()
450+
client.WatchResource(routeConfigResourceType, routeConfigName, watcher)
451+
452+
select {
453+
case <-ctx.Done():
454+
t.Fatal("Timeout when waiting for error callback to be invoked")
455+
case err := <-watcher.ErrorCh:
456+
if err == nil || !strings.Contains(err.Error(), nodeID) {
457+
t.Fatalf("Unexpected error: %v, want error with node ID: %q", err, nodeID)
458+
}
459+
}
460+
})
461+
}
462+
463+
// Tests that the errors returned by the xDS client when watching a resource
464+
// contain the node ID when channel creation to the management server fails.
465+
func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
466+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
467+
468+
// Create bootstrap configuration pointing to the above management server.
469+
nodeID := uuid.New().String()
470+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
471+
472+
// Create an xDS client with the above bootstrap contents.
473+
config, err := bootstrap.NewConfigFromContents(bc)
474+
if err != nil {
475+
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
476+
}
477+
pool := xdsclient.NewPool(config)
478+
client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
479+
Name: t.Name(),
480+
})
481+
if err != nil {
482+
t.Fatalf("Failed to create xDS client: %v", err)
483+
}
484+
defer close()
485+
486+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
487+
defer cancel()
488+
489+
// Override the xDS channel dialer with one that always fails.
490+
origDialer := xdsclientinternal.GRPCNewClient
491+
xdsclientinternal.GRPCNewClient = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
492+
return nil, fmt.Errorf("failed to create channel")
493+
}
494+
defer func() { xdsclientinternal.GRPCNewClient = origDialer }()
495+
496+
const routeConfigName = "route-config-name"
497+
watcher := xdstestutils.NewTestResourceWatcher()
498+
client.WatchResource(routeConfigResourceType, routeConfigName, watcher)
499+
500+
select {
501+
case <-ctx.Done():
502+
t.Fatal("Timeout when waiting for error callback to be invoked")
503+
case err := <-watcher.ErrorCh:
504+
if err == nil || !strings.Contains(err.Error(), nodeID) {
505+
t.Fatalf("Unexpected error: %v, want error with node ID: %q", err, nodeID)
506+
}
507+
}
508+
}

0 commit comments

Comments
 (0)