Skip to content

Commit 7f23df0

Browse files
authored
xdsclient: switch xdsclient watch deadlock test to e2e style (#5697)
1 parent 32f969e commit 7f23df0

3 files changed

Lines changed: 146 additions & 46 deletions

File tree

internal/testutils/xds/e2e/server.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ type ManagementServerOptions struct {
6060
// will be created and used.
6161
Listener net.Listener
6262

63+
// AllowResourceSubSet allows the management server to respond to requests
64+
// before all configured resources are explicitly named in the request. The
65+
// default behavior that we want is for the management server to wait for
66+
// all configured resources to be requested before responding to any of
67+
// them, since this is how we have run our tests historically, and should be
68+
// set to true only for tests which explicitly require the other behavior.
69+
AllowResourceSubset bool
70+
6371
// The callbacks defined below correspond to the state of the world (sotw)
6472
// version of the xDS API on the management server.
6573

@@ -97,8 +105,11 @@ type ManagementServerOptions struct {
97105
// logic. When the test is done, it should call the Stop() method to cleanup
98106
// resources allocated by the management server.
99107
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
100-
// Create a snapshot cache.
101-
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
108+
// Create a snapshot cache. The first parameter to NewSnapshotCache()
109+
// controls whether the server should wait for all resources to be
110+
// explicitly named in the request before responding to any of them.
111+
wait := opts == nil || !opts.AllowResourceSubset
112+
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{})
102113
logger.Infof("Created new snapshot cache...")
103114

104115
var lis net.Listener

xds/internal/xdsclient/client_test.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"google.golang.org/grpc/xds/internal/xdsclient/load"
3232
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
3333
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
34-
"google.golang.org/protobuf/types/known/anypb"
3534

3635
"google.golang.org/grpc"
3736
"google.golang.org/grpc/credentials/insecure"
@@ -148,49 +147,6 @@ func (c *testController) Close() {
148147
c.done.Fire()
149148
}
150149

151-
// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
152-
// callback. It makes sure it doesn't cause a deadlock.
153-
func (s) TestWatchCallAnotherWatch(t *testing.T) {
154-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
155-
defer cancel()
156-
// Start a watch for some resource, so that the controller and update
157-
// handlers are built for this authority. The test needs these to make an
158-
// inline watch in a callback.
159-
client, ctrlCh := testClientSetup(t, false)
160-
newWatch(t, client, xdsresource.ClusterResource, "doesnot-matter")
161-
controller, updateHandler := getControllerAndPubsub(ctx, t, client, ctrlCh, xdsresource.ClusterResource, "doesnot-matter")
162-
163-
clusterUpdateCh := testutils.NewChannel()
164-
firstTime := true
165-
client.WatchCluster(testCDSName, func(update xdsresource.ClusterUpdate, err error) {
166-
clusterUpdateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update, Err: err})
167-
// Calls another watch inline, to ensure there's deadlock.
168-
client.WatchCluster("another-random-name", func(xdsresource.ClusterUpdate, error) {})
169-
170-
if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); firstTime && err != nil {
171-
t.Fatalf("want new watch to start, got error %v", err)
172-
}
173-
firstTime = false
174-
})
175-
if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); err != nil {
176-
t.Fatalf("want new watch to start, got error %v", err)
177-
}
178-
179-
wantUpdate := xdsresource.ClusterUpdate{ClusterName: testEDSName}
180-
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate}}, xdsresource.UpdateMetadata{})
181-
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil {
182-
t.Fatal(err)
183-
}
184-
185-
// The second update needs to be different in the underlying resource proto
186-
// for the watch callback to be invoked.
187-
wantUpdate2 := xdsresource.ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}}
188-
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate2}}, xdsresource.UpdateMetadata{})
189-
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil {
190-
t.Fatal(err)
191-
}
192-
}
193-
194150
func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdate, wantErr error) error {
195151
u, err := updateCh.Receive(ctx)
196152
if err != nil {
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
*
3+
* Copyright 2022 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package e2e_test
20+
21+
import (
22+
"context"
23+
"testing"
24+
25+
"google.golang.org/grpc/internal/testutils"
26+
"google.golang.org/grpc/internal/testutils/xds/e2e"
27+
"google.golang.org/grpc/xds/internal/xdsclient"
28+
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
29+
30+
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
31+
)
32+
33+
// TestWatchCallAnotherWatch tests the scenario where a watch is registered for
34+
// a resource, and more watches are registered from the first watch's callback.
35+
// The test verifies that this scenario does not lead to a deadlock.
36+
func (s) TestWatchCallAnotherWatch(t *testing.T) {
37+
overrideFedEnvVar(t)
38+
39+
// Start an xDS management server and set the option to allow it to respond
40+
// to requests which only specify a subset of the configured resources.
41+
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{AllowResourceSubset: true})
42+
defer cleanup()
43+
44+
// Create an xDS client with the above bootstrap contents.
45+
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
46+
if err != nil {
47+
t.Fatalf("Failed to create xDS client: %v", err)
48+
}
49+
defer client.Close()
50+
51+
// Configure the management server to respond with route config resources.
52+
resources := e2e.UpdateOptions{
53+
NodeID: nodeID,
54+
Routes: []*v3routepb.RouteConfiguration{
55+
e2e.DefaultRouteConfig(rdsName, ldsName, cdsName),
56+
e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsName),
57+
},
58+
SkipValidation: true,
59+
}
60+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
61+
defer cancel()
62+
if err := mgmtServer.Update(ctx, resources); err != nil {
63+
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
64+
}
65+
66+
// Start a watch for one route configuration resource. From the watch
67+
// callback of the first resource, register two more watches (one for the
68+
// same resource name, which would be satisfied from the cache, and another
69+
// for a different resource name, which would be satisfied from the server).
70+
updateCh1 := testutils.NewChannel()
71+
updateCh2 := testutils.NewChannel()
72+
updateCh3 := testutils.NewChannel()
73+
var rdsCancel2, rdsCancel3 func()
74+
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
75+
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
76+
// Watch for the same resource name.
77+
rdsCancel2 = client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
78+
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
79+
})
80+
t.Cleanup(rdsCancel2)
81+
// Watch for a different resource name.
82+
rdsCancel3 = client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) {
83+
updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
84+
rdsCancel3()
85+
})
86+
t.Cleanup(rdsCancel3)
87+
})
88+
// defer rdsCancel1()
89+
t.Cleanup(rdsCancel1)
90+
91+
// Verify the contents of the received update for the all watchers.
92+
wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{
93+
Update: xdsresource.RouteConfigUpdate{
94+
VirtualHosts: []*xdsresource.VirtualHost{
95+
{
96+
Domains: []string{ldsName},
97+
Routes: []*xdsresource.Route{
98+
{
99+
Prefix: newStringP("/"),
100+
ActionType: xdsresource.RouteActionRoute,
101+
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
102+
},
103+
},
104+
},
105+
},
106+
},
107+
}
108+
wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{
109+
Update: xdsresource.RouteConfigUpdate{
110+
VirtualHosts: []*xdsresource.VirtualHost{
111+
{
112+
Domains: []string{ldsNameNewStyle},
113+
Routes: []*xdsresource.Route{
114+
{
115+
Prefix: newStringP("/"),
116+
ActionType: xdsresource.RouteActionRoute,
117+
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
118+
},
119+
},
120+
},
121+
},
122+
},
123+
}
124+
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil {
125+
t.Fatal(err)
126+
}
127+
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil {
128+
t.Fatal(err)
129+
}
130+
if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil {
131+
t.Fatal(err)
132+
}
133+
}

0 commit comments

Comments
 (0)