Skip to content

Commit a3aea15

Browse files
committed
libn/networkdb: Watch() without race conditions
NetworkDB's Watch() facility is problematic to use in practice. The stream of events begins when the watch is started, so the watch cannot be used to process table entries that existed beforehand. Either option to process existing table entries is racy: walking the table before starting the watch leaves a race window where events could be missed, and walking the table after starting the watch leaves a race window where created/updated entries could be processed twice. Modify Watch() to initialize the channel with synthetic CREATE events for all existing entries owned by remote nodes before hooking it up to the live event stream. This way watchers observe an equivalent sequence of events irrespective of whether the watch was started before or after entries from remote nodes are added to the database. Remove the bespoke and racy synthetic event replay logic for driver watches from the libnetwork agent. Signed-off-by: Cory Snider <[email protected]>
1 parent ada8bc3 commit a3aea15

4 files changed

Lines changed: 183 additions & 21 deletions

File tree

libnetwork/agent.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -777,23 +777,6 @@ func (n *Network) addDriverWatches() {
777777
agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
778778
agent.mu.Unlock()
779779
go c.handleTableEvents(ch, n.handleDriverTableEvent)
780-
d, err := n.driver(false)
781-
if err != nil {
782-
log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver table: %v", n.networkType, err)
783-
return
784-
}
785-
786-
err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
787-
// skip the entries that are mark for deletion, this is safe because this function is
788-
// called at initialization time so there is no state to delete
789-
if nid == n.ID() && !deleted {
790-
d.EventNotify(driverapi.Create, nid, table.name, key, value)
791-
}
792-
return false
793-
})
794-
if err != nil {
795-
log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
796-
}
797780
}
798781
}
799782

libnetwork/networkdb/delegate.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
169169
}
170170

171171
nDB.Lock()
172+
// Hold the lock until after we broadcast the event to watchers so that
173+
// the new watch receives either the synthesized event or the event we
174+
// broadcast, never both.
175+
defer nDB.Unlock()
172176
var entryPresent bool
173177
prev, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
174178
if err == nil {
175179
entryPresent = true
176180
// We have the latest state. Ignore the event
177181
// since it is stale.
178182
if prev.ltime >= tEvent.LTime {
179-
nDB.Unlock()
180183
return false
181184
}
182185
}
@@ -198,7 +201,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
198201
e.reapTime = nDB.config.reapEntryInterval
199202
}
200203
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
201-
nDB.Unlock()
202204

203205
if !entryPresent && tEvent.Type == TableEventTypeDelete {
204206
// We will rebroadcast the message for an unknown entry if all the conditions are met:

libnetwork/networkdb/watch.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package networkdb
22

33
import (
44
"net"
5+
"strings"
56

67
"github.com/docker/go-events"
78
)
@@ -42,7 +43,8 @@ type DeleteEvent event
4243
// network or any combination of the tuple. If any of the
4344
// filter is an empty string it acts as a wildcard for that
4445
// field. Watch returns a channel of events, where the events will be
45-
// sent.
46+
// sent. The watch channel is initialized with synthetic create events for all
47+
// the existing table entries not owned by this node which match the filters.
4648
func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
4749
var matcher events.Matcher
4850

@@ -77,6 +79,45 @@ func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
7779
sink = events.NewFilter(sink, matcher)
7880
}
7981

82+
// Synthesize events for all the existing table entries not owned by
83+
// this node so that the watcher receives all state without racing with
84+
// any concurrent mutations to the table.
85+
nDB.RLock()
86+
defer nDB.RUnlock()
87+
if tname == "" {
88+
var prefix []byte
89+
if nid != "" {
90+
prefix = []byte("/" + nid + "/")
91+
} else {
92+
prefix = []byte("/")
93+
}
94+
nDB.indexes[byNetwork].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool {
95+
if !v.deleting && v.node != nDB.config.NodeID {
96+
tuple := strings.SplitN(string(path[1:]), "/", 3)
97+
if len(tuple) == 3 {
98+
entryNid, entryTname, key := tuple[0], tuple[1], tuple[2]
99+
sink.Write(makeEvent(opCreate, entryTname, entryNid, key, v.value))
100+
}
101+
}
102+
return false
103+
})
104+
} else {
105+
prefix := []byte("/" + tname + "/")
106+
if nid != "" {
107+
prefix = append(prefix, []byte(nid+"/")...)
108+
}
109+
nDB.indexes[byTable].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool {
110+
if !v.deleting && v.node != nDB.config.NodeID {
111+
tuple := strings.SplitN(string(path[1:]), "/", 3)
112+
if len(tuple) == 3 {
113+
entryTname, entryNid, key := tuple[0], tuple[1], tuple[2]
114+
sink.Write(makeEvent(opCreate, entryTname, entryNid, key, v.value))
115+
}
116+
}
117+
return false
118+
})
119+
}
120+
80121
nDB.broadcaster.Add(sink)
81122
return ch, func() {
82123
nDB.broadcaster.Remove(sink)

libnetwork/networkdb/watch_test.go

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ func TestWatch_out_of_order(t *testing.T) {
4343
watch, cancel := nDB.Watch("table1", "network1")
4444
defer cancel()
4545

46+
got := drainChannel(watch.C)
47+
assert.Check(t, is.DeepEqual(got, []events.Event{
48+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key1", Value: []byte("value1")}),
49+
}))
50+
4651
// Receive events from node1, with events not received or received out of order
4752
// Create, (hidden update), delete
4853
appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a"))
@@ -69,7 +74,7 @@ func TestWatch_out_of_order(t *testing.T) {
6974
d.NotifyMsg(msgs.Compound())
7075
msgs.Reset()
7176

72-
got := drainChannel(watch.C)
77+
got = drainChannel(watch.C)
7378
assert.Check(t, is.DeepEqual(got, []events.Event{
7479
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
7580
// Delete value should match last observed value,
@@ -87,6 +92,137 @@ func TestWatch_out_of_order(t *testing.T) {
8792
}))
8893
}
8994

95+
func TestWatch_filters(t *testing.T) {
96+
nDB := new(DefaultConfig())
97+
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
98+
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
99+
assert.Assert(t, nDB.JoinNetwork("network1"))
100+
assert.Assert(t, nDB.JoinNetwork("network2"))
101+
102+
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
103+
Name: "node1",
104+
Addr: net.IPv4(1, 2, 3, 4),
105+
})
106+
107+
var ltime serf.LamportClock
108+
msgs := messageBuffer{t: t}
109+
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
110+
Type: NetworkEventTypeJoin,
111+
LTime: ltime.Increment(),
112+
NodeName: "node1",
113+
NetworkID: "network1",
114+
})
115+
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
116+
Type: NetworkEventTypeJoin,
117+
LTime: ltime.Increment(),
118+
NodeName: "node1",
119+
NetworkID: "network2",
120+
})
121+
for _, nid := range []string{"network1", "network2"} {
122+
for _, tname := range []string{"table1", "table2"} {
123+
msgs.Append(MessageTypeTableEvent, &TableEvent{
124+
Type: TableEventTypeCreate,
125+
LTime: ltime.Increment(),
126+
NodeName: "node1",
127+
NetworkID: nid,
128+
TableName: tname,
129+
Key: nid + "." + tname + ".dead",
130+
Value: []byte("deaddead"),
131+
})
132+
msgs.Append(MessageTypeTableEvent, &TableEvent{
133+
Type: TableEventTypeDelete,
134+
LTime: ltime.Increment(),
135+
NodeName: "node1",
136+
NetworkID: nid,
137+
TableName: tname,
138+
Key: nid + "." + tname + ".dead",
139+
Value: []byte("deaddead"),
140+
})
141+
msgs.Append(MessageTypeTableEvent, &TableEvent{
142+
Type: TableEventTypeCreate,
143+
LTime: ltime.Increment(),
144+
NodeName: "node1",
145+
NetworkID: nid,
146+
TableName: tname,
147+
Key: nid + "." + tname + ".update",
148+
Value: []byte("initial"),
149+
})
150+
msgs.Append(MessageTypeTableEvent, &TableEvent{
151+
Type: TableEventTypeCreate,
152+
LTime: ltime.Increment(),
153+
NodeName: "node1",
154+
NetworkID: nid,
155+
TableName: tname,
156+
Key: nid + "." + tname,
157+
Value: []byte("a"),
158+
})
159+
msgs.Append(MessageTypeTableEvent, &TableEvent{
160+
Type: TableEventTypeUpdate,
161+
LTime: ltime.Increment(),
162+
NodeName: "node1",
163+
NetworkID: nid,
164+
TableName: tname,
165+
Key: nid + "." + tname + ".update",
166+
Value: []byte("updated"),
167+
})
168+
}
169+
}
170+
(&delegate{nDB}).NotifyMsg(msgs.Compound())
171+
172+
watchAll, cancel := nDB.Watch("", "")
173+
defer cancel()
174+
watchNetwork1Tables, cancel := nDB.Watch("", "network1")
175+
defer cancel()
176+
watchTable1AllNetworks, cancel := nDB.Watch("table1", "")
177+
defer cancel()
178+
watchTable1Network1, cancel := nDB.Watch("table1", "network1")
179+
defer cancel()
180+
181+
var gotAll, gotNetwork1Tables, gotTable1AllNetworks, gotTable1Network1 []events.Event
182+
L:
183+
for {
184+
select {
185+
case ev := <-watchAll.C:
186+
gotAll = append(gotAll, ev)
187+
case ev := <-watchNetwork1Tables.C:
188+
gotNetwork1Tables = append(gotNetwork1Tables, ev)
189+
case ev := <-watchTable1AllNetworks.C:
190+
gotTable1AllNetworks = append(gotTable1AllNetworks, ev)
191+
case ev := <-watchTable1Network1.C:
192+
gotTable1Network1 = append(gotTable1Network1, ev)
193+
case <-time.After(time.Second):
194+
break L
195+
}
196+
}
197+
198+
assert.Check(t, is.DeepEqual(gotAll, []events.Event{
199+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
200+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
201+
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}),
202+
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}),
203+
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}),
204+
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}),
205+
CreateEvent(event{Table: "table2", NetworkID: "network2", Key: "network2.table2", Value: []byte("a")}),
206+
CreateEvent(event{Table: "table2", NetworkID: "network2", Key: "network2.table2.update", Value: []byte("updated")}),
207+
}))
208+
assert.Check(t, is.DeepEqual(gotNetwork1Tables, []events.Event{
209+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
210+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
211+
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}),
212+
CreateEvent(event{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}),
213+
}))
214+
assert.Check(t, is.DeepEqual(gotTable1AllNetworks, []events.Event{
215+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
216+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
217+
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}),
218+
CreateEvent(event{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}),
219+
}))
220+
assert.Check(t, is.DeepEqual(gotTable1Network1, []events.Event{
221+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}),
222+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}),
223+
}))
224+
}
225+
90226
func drainChannel(ch <-chan events.Event) []events.Event {
91227
var events []events.Event
92228
for {

0 commit comments

Comments
 (0)