Skip to content

Commit d8730dc

Browse files
committed
libnetwork/networkdb: add convergence test
Add a property-based test which asserts that a cluster of NetworkDB nodes always eventually converges to a consistent state. As this test takes a long time to run it is build-tagged to be excluded from CI. Signed-off-by: Cory Snider <[email protected]>
1 parent d86a303 commit d8730dc

27 files changed

Lines changed: 5555 additions & 4 deletions

daemon/libnetwork/networkdb/debug.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package networkdb
33
import (
44
"context"
55
"encoding/hex"
6+
"fmt"
67
"os"
8+
"strings"
79

810
"github.com/containerd/log"
911
)
@@ -43,3 +45,15 @@ func logEncKeys(ctx context.Context, keys ...[]byte) {
4345
}
4446
}
4547
}
48+
49+
func (nDB *NetworkDB) DebugDumpTable(tname string) string {
50+
nDB.RLock()
51+
root := nDB.indexes[byTable].Root()
52+
nDB.RUnlock()
53+
var sb strings.Builder
54+
root.WalkPrefix([]byte("/"+tname), func(path []byte, v *entry) bool {
55+
fmt.Fprintf(&sb, " %q: %+v\n", path, v)
56+
return false
57+
})
58+
return sb.String()
59+
}
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
//go:build slowtests
2+
3+
package networkdb
4+
5+
import (
6+
"fmt"
7+
"maps"
8+
"os"
9+
"slices"
10+
"strconv"
11+
"strings"
12+
"testing"
13+
"time"
14+
15+
"github.com/google/go-cmp/cmp"
16+
"gotest.tools/v3/poll"
17+
"pgregory.net/rapid"
18+
)
19+
20+
func TestNetworkDBAlwaysConverges(t *testing.T) {
21+
rapid.Check(t, testConvergence)
22+
}
23+
24+
func testConvergence(t *rapid.T) {
25+
numNodes := rapid.IntRange(2, 25).Draw(t, "numNodes")
26+
numNetworks := rapid.IntRange(1, 5).Draw(t, "numNetworks")
27+
28+
fsm := &networkDBFSM{
29+
nDB: createNetworkDBInstances(t, numNodes, "node", DefaultConfig()),
30+
state: make([]map[string]map[string]string, numNodes),
31+
keysUsed: make(map[string]map[string]bool),
32+
}
33+
defer closeNetworkDBInstances(t, fsm.nDB)
34+
for i := range fsm.state {
35+
fsm.state[i] = make(map[string]map[string]string)
36+
}
37+
for i := range numNetworks {
38+
nw := "nw" + strconv.Itoa(i)
39+
fsm.networks = append(fsm.networks, nw)
40+
fsm.keysUsed[nw] = make(map[string]bool)
41+
}
42+
// Drive the NetworkDB instances with a sequence of actions in random order.
43+
// We do not check for convergence until afterwards as NetworkDB is an
44+
// eventually consistent system.
45+
t.Repeat(rapid.StateMachineActions(fsm))
46+
47+
// Take the union of all entries in all networks owned by all nodes.
48+
converged := make(map[string]map[string]string)
49+
for _, state := range fsm.state {
50+
for network, entries := range state {
51+
if converged[network] == nil {
52+
converged[network] = make(map[string]string)
53+
}
54+
maps.Copy(converged[network], entries)
55+
}
56+
}
57+
expected := make(map[string]map[string]map[string]string, numNodes)
58+
for i, st := range fsm.state {
59+
exp := make(map[string]map[string]string)
60+
for k := range st {
61+
exp[k] = converged[k]
62+
}
63+
expected[fsm.nDB[i].config.NodeID] = exp
64+
}
65+
66+
t.Logf("Waiting for NetworkDB state to converge to %#v", converged)
67+
for i, st := range fsm.state {
68+
t.Logf("Node #%d (%s): %v", i, fsm.nDB[i].config.NodeID, slices.Collect(maps.Keys(st)))
69+
}
70+
t.Log("Mutations:")
71+
for _, m := range fsm.mutations {
72+
t.Log(m)
73+
}
74+
t.Log("---------------------------")
75+
76+
poll.WaitOn(t, func(t poll.LogT) poll.Result {
77+
actualState := make(map[string]map[string]map[string]string, numNodes)
78+
for _, nDB := range fsm.nDB {
79+
actual := make(map[string]map[string]string)
80+
for k, nw := range nDB.thisNodeNetworks {
81+
if !nw.leaving {
82+
actual[k] = make(map[string]string)
83+
}
84+
}
85+
actualState[nDB.config.NodeID] = actual
86+
}
87+
tableContent := make([]string, len(fsm.nDB))
88+
for i, nDB := range fsm.nDB {
89+
tableContent[i] = fmt.Sprintf("Node #%d (%s):\n%v", i, nDB.config.NodeID, nDB.DebugDumpTable("some_table"))
90+
nDB.WalkTable("some_table", func(network, key string, value []byte, deleting bool) bool {
91+
if deleting {
92+
return false
93+
}
94+
if actualState[nDB.config.NodeID][network] == nil {
95+
actualState[nDB.config.NodeID][network] = make(map[string]string)
96+
}
97+
actualState[nDB.config.NodeID][network][key] = string(value)
98+
return false
99+
})
100+
}
101+
diff := cmp.Diff(expected, actualState)
102+
if diff != "" {
103+
return poll.Continue("NetworkDB state has not converged:\n%v\n%v", diff, strings.Join(tableContent, "\n\n"))
104+
}
105+
return poll.Success()
106+
}, poll.WithTimeout(5*time.Minute), poll.WithDelay(200*time.Millisecond))
107+
108+
convergenceTime := time.Since(fsm.lastMutation)
109+
t.Logf("NetworkDB state converged in %v", convergenceTime)
110+
111+
// Log the convergence time to disk for later statistical analysis.
112+
113+
if err := os.Mkdir("testdata", 0755); err != nil && !os.IsExist(err) {
114+
t.Logf("Could not log convergence time to disk: failed to create testdata directory: %v", err)
115+
return
116+
}
117+
f, err := os.OpenFile("testdata/convergence_time.csv", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
118+
if err != nil {
119+
t.Logf("Could not log convergence time to disk: failed to open file: %v", err)
120+
return
121+
}
122+
defer func() {
123+
if err := f.Close(); err != nil {
124+
t.Logf("Could not log convergence time to disk: error closing file: %v", err)
125+
}
126+
}()
127+
if st, err := f.Stat(); err != nil {
128+
t.Logf("Could not log convergence time to disk: failed to stat file: %v", err)
129+
return
130+
} else if st.Size() == 0 {
131+
f.WriteString("Nodes,Networks,#Mutations,Convergence(ns)\n")
132+
}
133+
if _, err := fmt.Fprintf(f, "%v,%v,%v,%d\n", numNodes, numNetworks, len(fsm.mutations), convergenceTime); err != nil {
134+
t.Logf("Could not log convergence time to disk: failed to write to file: %v", err)
135+
return
136+
}
137+
}
138+
139+
// networkDBFSM is a [rapid.StateMachine] providing the set of actions available
140+
// for rapid to drive NetworkDB with in tests. See also
141+
// [rapid.StateMachineActions] and [rapid.Repeat].
142+
type networkDBFSM struct {
143+
nDB []*NetworkDB
144+
networks []string // list of networks which can be joined
145+
// node -> joined-network -> key -> value
146+
state []map[string]map[string]string
147+
148+
// Remember entry keys that have been used before to avoid trying to
149+
// create colliding keys. Due to how quickly the FSM runs, it is
150+
// possible for a node to not have learned that the previous generation
151+
// of the key was deleted before we try to create it again.
152+
// network -> key -> true
153+
keysUsed map[string]map[string]bool
154+
155+
// Timestamp of the most recent state-machine action which perturbed the
156+
// system state.
157+
lastMutation time.Time
158+
mutations []string
159+
}
160+
161+
func (u *networkDBFSM) mutated(nodeidx int, action, network, key, value string) {
162+
u.lastMutation = time.Now()
163+
desc := fmt.Sprintf(" [%v] #%d(%v):%v(%s", u.lastMutation, nodeidx, u.nDB[nodeidx].config.NodeID, action, network)
164+
if key != "" {
165+
desc += fmt.Sprintf(", %s=%s", key, value)
166+
}
167+
desc += ")"
168+
u.mutations = append(u.mutations, desc)
169+
}
170+
171+
func (u *networkDBFSM) Check(t *rapid.T) {
172+
// This method is required to implement the [rapid.StateMachine]
173+
// interface. But there is nothing much to check stepwise as we are
174+
// testing an eventually consistent system. The checks happen after
175+
// rapid is done randomly driving the FSM.
176+
}
177+
178+
func (u *networkDBFSM) JoinNetwork(t *rapid.T) {
179+
// Pick a node that has not joined all networks...
180+
var nodes []int
181+
for i, s := range u.state {
182+
if len(s) < len(u.networks) {
183+
nodes = append(nodes, i)
184+
}
185+
}
186+
if len(nodes) == 0 {
187+
t.Skip("All nodes are already joined to all networks")
188+
}
189+
nodeidx := rapid.SampledFrom(nodes).Draw(t, "node")
190+
191+
// ... and a network to join.
192+
networks := slices.DeleteFunc(slices.Clone(u.networks), func(n string) bool {
193+
_, ok := u.state[nodeidx][n]
194+
return ok
195+
})
196+
nw := rapid.SampledFrom(networks).Draw(t, "network")
197+
198+
if err := u.nDB[nodeidx].JoinNetwork(nw); err != nil {
199+
t.Errorf("Node %v failed to join network %s: %v", nodeidx, nw, err)
200+
} else {
201+
u.state[nodeidx][nw] = make(map[string]string)
202+
u.mutated(nodeidx, "JoinNetwork", nw, "", "")
203+
}
204+
}
205+
206+
// drawJoinedNode returns a random node that has joined at least one network.
207+
func (u *networkDBFSM) drawJoinedNodeAndNetwork(t *rapid.T) (nodeidx int, nw string) {
208+
var nodes []int
209+
for i, s := range u.state {
210+
if len(s) > 0 {
211+
nodes = append(nodes, i)
212+
}
213+
}
214+
if len(nodes) == 0 {
215+
t.Skip("No node is joined to any network")
216+
}
217+
nodeidx = rapid.SampledFrom(nodes).Draw(t, "node")
218+
219+
nw = rapid.SampledFrom(slices.Collect(maps.Keys(u.state[nodeidx]))).Draw(t, "network")
220+
return nodeidx, nw
221+
}
222+
223+
func (u *networkDBFSM) LeaveNetwork(t *rapid.T) {
224+
nodeidx, nw := u.drawJoinedNodeAndNetwork(t)
225+
if err := u.nDB[nodeidx].LeaveNetwork(nw); err != nil {
226+
t.Errorf("Node %v failed to leave network %s: %v", nodeidx, nw, err)
227+
} else {
228+
delete(u.state[nodeidx], nw)
229+
u.mutated(nodeidx, "LeaveNetwork", nw, "", "")
230+
}
231+
}
232+
233+
func (u *networkDBFSM) CreateEntry(t *rapid.T) {
234+
nodeidx, nw := u.drawJoinedNodeAndNetwork(t)
235+
key := rapid.StringMatching(`[a-z]{3,25}`).
236+
Filter(func(s string) bool { return !u.keysUsed[nw][s] }).
237+
Draw(t, "key")
238+
value := rapid.StringMatching(`[a-z]{5,20}`).Draw(t, "value")
239+
240+
if err := u.nDB[nodeidx].CreateEntry("some_table", nw, key, []byte(value)); err != nil {
241+
t.Errorf("Node %v failed to create entry %s=%s in network %s: %v", nodeidx, key, value, nw, err)
242+
} else {
243+
u.state[nodeidx][nw][key] = value
244+
u.keysUsed[nw][key] = true
245+
u.mutated(nodeidx, "CreateEntry", nw, key, value)
246+
}
247+
}
248+
249+
// drawOwnedDBKey returns a random key in nw owned by the node at nodeidx.
250+
func (u *networkDBFSM) drawOwnedDBKey(t *rapid.T, nodeidx int, nw string) string {
251+
keys := slices.Collect(maps.Keys(u.state[nodeidx][nw]))
252+
if len(keys) == 0 {
253+
t.Skipf("Node %v owns no entries in network %s", nodeidx, nw)
254+
panic("unreachable")
255+
}
256+
return rapid.SampledFrom(keys).Draw(t, "key")
257+
}
258+
259+
func (u *networkDBFSM) UpdateEntry(t *rapid.T) {
260+
nodeidx, nw := u.drawJoinedNodeAndNetwork(t)
261+
key := u.drawOwnedDBKey(t, nodeidx, nw)
262+
value := rapid.StringMatching(`[a-z]{5,20}`).Draw(t, "value")
263+
264+
if err := u.nDB[nodeidx].UpdateEntry("some_table", nw, key, []byte(value)); err != nil {
265+
t.Errorf("Node %v failed to update entry %s=%s in network %s: %v", nodeidx, key, value, nw, err)
266+
} else {
267+
u.state[nodeidx][nw][key] = value
268+
u.mutated(nodeidx, "UpdateEntry", nw, key, value)
269+
}
270+
}
271+
272+
func (u *networkDBFSM) DeleteEntry(t *rapid.T) {
273+
nodeidx, nw := u.drawJoinedNodeAndNetwork(t)
274+
key := u.drawOwnedDBKey(t, nodeidx, nw)
275+
276+
if err := u.nDB[nodeidx].DeleteEntry("some_table", nw, key); err != nil {
277+
t.Errorf("Node %v failed to delete entry %s in network %s: %v", nodeidx, key, nw, err)
278+
} else {
279+
delete(u.state[nodeidx][nw], key)
280+
u.mutated(nodeidx, "DeleteEntry", nw, key, "")
281+
}
282+
}
283+
284+
func (u *networkDBFSM) Sleep(t *rapid.T) {
285+
duration := time.Duration(rapid.IntRange(10, 500).Draw(t, "duration")) * time.Millisecond
286+
time.Sleep(duration)
287+
}

daemon/libnetwork/networkdb/networkdb_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,20 @@ func TestMain(m *testing.M) {
3434
os.Exit(m.Run())
3535
}
3636

37-
func launchNode(t *testing.T, conf Config) *NetworkDB {
37+
type TestingT interface {
38+
assert.TestingT
39+
poll.TestingT
40+
Helper()
41+
}
42+
43+
func launchNode(t TestingT, conf Config) *NetworkDB {
3844
t.Helper()
3945
db, err := New(&conf)
4046
assert.NilError(t, err)
4147
return db
4248
}
4349

44-
func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
50+
func createNetworkDBInstances(t TestingT, num int, namePrefix string, conf *Config) []*NetworkDB {
4551
t.Helper()
4652
var dbs []*NetworkDB
4753
for i := 0; i < num; i++ {
@@ -69,12 +75,12 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Co
6975
}
7076
return poll.Success()
7177
}
72-
poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
78+
poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second+time.Duration(num-1)*10*time.Second))
7379

7480
return dbs
7581
}
7682

77-
func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
83+
func closeNetworkDBInstances(t TestingT, dbs []*NetworkDB) {
7884
t.Helper()
7985
log.G(context.TODO()).Print("Closing DB instances...")
8086
for _, db := range dbs {

vendor.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ require (
115115
google.golang.org/grpc v1.72.2
116116
google.golang.org/protobuf v1.36.6
117117
gotest.tools/v3 v3.5.2
118+
pgregory.net/rapid v1.2.0
118119
resenje.org/singleflight v0.4.3
119120
tags.cncf.io/container-device-interface v1.0.1
120121
)

vendor.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,8 @@ kernel.org/pub/linux/libs/security/libcap/cap v1.2.76 h1:mrdLPj8ujM6eIKGtd1PkkuC
840840
kernel.org/pub/linux/libs/security/libcap/cap v1.2.76/go.mod h1:7V2BQeHnVAQwhCnCPJ977giCeGDiywVewWF+8vkpPlc=
841841
kernel.org/pub/linux/libs/security/libcap/psx v1.2.76 h1:3DyzQ30OHt3wiOZVL1se2g1PAPJIU7+tMUyvfMUj1dY=
842842
kernel.org/pub/linux/libs/security/libcap/psx v1.2.76/go.mod h1:+l6Ee2F59XiJ2I6WR5ObpC1utCQJZ/VLsEbQCD8RG24=
843+
pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=
844+
pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
843845
resenje.org/singleflight v0.4.3 h1:l7foFYg8X/VEHPxWs1K/Pw77807RMVzvXgWGb0J1sdM=
844846
resenje.org/singleflight v0.4.3/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk=
845847
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=

vendor/modules.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1701,6 +1701,9 @@ k8s.io/klog/v2/internal/dbg
17011701
k8s.io/klog/v2/internal/serialize
17021702
k8s.io/klog/v2/internal/severity
17031703
k8s.io/klog/v2/internal/sloghandler
1704+
# pgregory.net/rapid v1.2.0
1705+
## explicit; go 1.18
1706+
pgregory.net/rapid
17041707
# resenje.org/singleflight v0.4.3
17051708
## explicit; go 1.18
17061709
resenje.org/singleflight

vendor/pgregory.net/rapid/.gitattributes

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/pgregory.net/rapid/.gitignore

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)