Skip to content

Commit c68671d

Browse files
committed
libnetwork/networkdb: broadcast watch events from the local point of view
NetworkDB gossips changes to table entries to other nodes using distinct CREATE, UPDATE and DELETE events. It is unfortunate that the wire protocol distinguishes CREATEs from UPDATEs as nothing useful can be done with this information. Newer events for an entry invalidate older ones, so there is no guarantee that a CREATE event is broadcast to any node before an UPDATE is broadcast. And due to the nature of gossip protocols, even if the CREATE event is broadcast from the originating node, there is no guarantee that any particular node will receive the CREATE before an UPDATE. Any code which handles an UPDATE event differently from a CREATE event is therefore going to behave in unexpected ways in less than perfect conditions. NetworkDB table watchers also receive CREATE, UPDATE and DELETE events. Since the watched tables are local to the node, the events could all have well-defined meanings that are actually useful. Unfortunately NetworkDB is just bubbling up the wire-protocol event types to the watchers. Redefine the table-watch events such that a CREATE event is broadcast when an entry pops into existence in the local NetworkDB, an UPDATE event is broadcast when an entry which was already present in the NetworkDB state is modified, and a DELETE event is broadcast when an entry which was already present in the NetworkDB state is marked for deletion. DELETE events are broadcast with the same value as the most recent CREATE or UPDATE event for the entry. The handler for endpoint table events in the libnetwork agent assumed incorrectly that CREATE events always correspond to adding a new active endpoint and that UPDATE events always correspond to disabling an endpoint. Fix up the handler to handle CREATE and UPDATE events using the same code path, checking the table entry's ServiceDisabled flag to determine which action to take. Signed-off-by: Cory Snider <[email protected]>
1 parent 294f0c3 commit c68671d

4 files changed

Lines changed: 194 additions & 44 deletions

File tree

libnetwork/agent.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
911911

912912
err := proto.Unmarshal(value, &epRec)
913913
if err != nil {
914-
log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
914+
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
915915
return
916916
}
917917

@@ -924,53 +924,54 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
924924
serviceAliases := epRec.Aliases
925925
taskAliases := epRec.TaskAliases
926926

927+
logger := log.G(context.TODO()).WithFields(log.Fields{
928+
"nid": nid,
929+
"eid": eid,
930+
"T": fmt.Sprintf("%T", ev),
931+
"R": epRec,
932+
})
933+
927934
if containerName == "" || ip == nil {
928-
log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
935+
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
929936
return
930937
}
931938

939+
logger.Debug("handleEpTableEvent")
940+
932941
switch ev.(type) {
933-
case networkdb.CreateEvent:
934-
log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
942+
case networkdb.CreateEvent, networkdb.UpdateEvent:
935943
if svcID != "" {
936944
// This is a remote task part of a service
937-
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
938-
log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
939-
return
945+
if epRec.ServiceDisabled {
946+
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
947+
logger.WithError(err).Error("failed disabling service binding")
948+
return
949+
}
950+
} else {
951+
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
952+
logger.WithError(err).Error("failed adding service binding")
953+
return
954+
}
940955
}
941956
} else {
942957
// This is a remote container simply attached to an attachable network
943958
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
944-
log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
959+
logger.WithError(err).Errorf("failed adding container name resolution")
945960
}
946961
}
947962

948963
case networkdb.DeleteEvent:
949-
log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
950964
if svcID != "" {
951965
// This is a remote task part of a service
952966
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
953-
log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
967+
logger.WithError(err).Error("failed removing service binding")
954968
return
955969
}
956970
} else {
957971
// This is a remote container simply attached to an attachable network
958972
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
959-
log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
973+
logger.WithError(err).Errorf("failed removing container name resolution")
960974
}
961975
}
962-
case networkdb.UpdateEvent:
963-
log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
964-
// We currently should only get these to inform us that an endpoint
965-
// is disabled. Report if otherwise.
966-
if svcID == "" || !epRec.ServiceDisabled {
967-
log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
968-
return
969-
}
970-
// This is a remote task that is part of a service that is now disabled
971-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
972-
log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
973-
return
974-
}
975976
}
976977
}

libnetwork/networkdb/delegate.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,13 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
169169
}
170170

171171
nDB.Lock()
172-
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
172+
var entryPresent bool
173+
prev, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
173174
if err == nil {
175+
entryPresent = true
174176
// We have the latest state. Ignore the event
175177
// since it is stale.
176-
if e.ltime >= tEvent.LTime {
178+
if prev.ltime >= tEvent.LTime {
177179
nDB.Unlock()
178180
return false
179181
}
@@ -187,7 +189,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
187189
return false
188190
}
189191

190-
e = &entry{
192+
e := &entry{
191193
ltime: tEvent.LTime,
192194
node: tEvent.NodeName,
193195
value: tEvent.Value,
@@ -221,18 +223,33 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
221223
}
222224

223225
var op opType
226+
value := tEvent.Value
224227
switch tEvent.Type {
225-
case TableEventTypeCreate:
228+
case TableEventTypeCreate, TableEventTypeUpdate:
229+
// Gossip messages could arrive out-of-order so it is possible
230+
// for an entry's UPDATE event to be received before its CREATE
231+
// event. The local watchers should not need to care about such
232+
// nuances. Broadcast events to watchers based only on what
233+
// changed in the local NetworkDB state.
226234
op = opCreate
227-
case TableEventTypeUpdate:
228-
op = opUpdate
235+
if entryPresent && !prev.deleting {
236+
op = opUpdate
237+
}
229238
case TableEventTypeDelete:
239+
if !entryPresent || prev.deleting {
240+
goto SkipBroadcast
241+
}
230242
op = opDelete
243+
// Broadcast the value most recently observed by watchers,
244+
// which may be different from the value in the DELETE event
245+
// (e.g. if the DELETE event was received out-of-order).
246+
value = prev.value
231247
default:
232248
// TODO(thaJeztah): make switch exhaustive; add networkdb.TableEventTypeInvalid
233249
}
234250

235-
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
251+
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, value))
252+
SkipBroadcast:
236253
return network.inSync
237254
}
238255

libnetwork/networkdb/networkdb.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -252,14 +252,27 @@ func DefaultConfig() *Config {
252252
// New creates a new instance of NetworkDB using the Config passed by
253253
// the caller.
254254
func New(c *Config) (*NetworkDB, error) {
255+
nDB := new(c)
256+
log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
257+
if err := nDB.clusterInit(); err != nil {
258+
return nil, err
259+
}
260+
261+
return nDB, nil
262+
}
263+
264+
func new(c *Config) *NetworkDB {
255265
// The garbage collection logic for entries leverage the presence of the network.
256266
// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
257267
// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
258268
c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
259269

260-
nDB := &NetworkDB{
261-
config: c,
262-
indexes: make(map[int]*iradix.Tree[*entry]),
270+
return &NetworkDB{
271+
config: c,
272+
indexes: map[int]*iradix.Tree[*entry]{
273+
byTable: iradix.New[*entry](),
274+
byNetwork: iradix.New[*entry](),
275+
},
263276
networks: make(map[string]map[string]*network),
264277
nodes: make(map[string]*node),
265278
failedNodes: make(map[string]*node),
@@ -268,16 +281,6 @@ func New(c *Config) (*NetworkDB, error) {
268281
bulkSyncAckTbl: make(map[string]chan struct{}),
269282
broadcaster: events.NewBroadcaster(),
270283
}
271-
272-
nDB.indexes[byTable] = iradix.New[*entry]()
273-
nDB.indexes[byNetwork] = iradix.New[*entry]()
274-
275-
log.G(context.TODO()).Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
276-
if err := nDB.clusterInit(); err != nil {
277-
return nil, err
278-
}
279-
280-
return nDB, nil
281284
}
282285

283286
// Join joins this NetworkDB instance with a list of peer NetworkDB

libnetwork/networkdb/watch_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package networkdb
2+
3+
import (
4+
"net"
5+
"testing"
6+
"time"
7+
8+
"github.com/docker/go-events"
9+
"github.com/hashicorp/memberlist"
10+
"github.com/hashicorp/serf/serf"
11+
"gotest.tools/v3/assert"
12+
is "gotest.tools/v3/assert/cmp"
13+
)
14+
15+
func TestWatch_out_of_order(t *testing.T) {
16+
nDB := new(DefaultConfig())
17+
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
18+
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
19+
assert.Assert(t, nDB.JoinNetwork("network1"))
20+
21+
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
22+
Name: "node1",
23+
Addr: net.IPv4(1, 2, 3, 4),
24+
})
25+
26+
d := &delegate{nDB}
27+
28+
msgs := messageBuffer{t: t}
29+
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
30+
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
31+
Type: NetworkEventTypeJoin,
32+
LTime: 1,
33+
NodeName: "node1",
34+
NetworkID: "network1",
35+
})
36+
appendTableEvent(1, TableEventTypeCreate, "tombstone1", []byte("a"))
37+
appendTableEvent(2, TableEventTypeDelete, "tombstone1", []byte("b"))
38+
appendTableEvent(3, TableEventTypeCreate, "key1", []byte("value1"))
39+
d.NotifyMsg(msgs.Compound())
40+
msgs.Reset()
41+
42+
nDB.CreateEntry("table1", "network1", "local1", []byte("should not see me in watch events"))
43+
watch, cancel := nDB.Watch("table1", "network1")
44+
defer cancel()
45+
46+
// Receive events from node1, with events not received or received out of order
47+
// Create, (hidden update), delete
48+
appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a"))
49+
appendTableEvent(6, TableEventTypeDelete, "key2", []byte("b"))
50+
// (Hidden recreate), delete
51+
appendTableEvent(8, TableEventTypeDelete, "key2", []byte("c"))
52+
// (Hidden recreate), update
53+
appendTableEvent(10, TableEventTypeUpdate, "key2", []byte("d"))
54+
55+
// Update, create
56+
appendTableEvent(11, TableEventTypeUpdate, "key3", []byte("b"))
57+
appendTableEvent(10, TableEventTypeCreate, "key3", []byte("a"))
58+
59+
// (Hidden create), update, update
60+
appendTableEvent(13, TableEventTypeUpdate, "key4", []byte("b"))
61+
appendTableEvent(14, TableEventTypeUpdate, "key4", []byte("c"))
62+
63+
d.NotifyMsg(msgs.Compound())
64+
msgs.Reset()
65+
66+
got := drainChannel(watch.C)
67+
assert.Check(t, is.DeepEqual(got, []events.Event{
68+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
69+
// Delete value should match last observed value,
70+
// irrespective of the content of the delete event over the wire.
71+
DeleteEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}),
72+
// Updates to previously-deleted keys should be observed as creates.
73+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("d")}),
74+
75+
// Out-of-order update events should be observed as creates.
76+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key3", Value: []byte("b")}),
77+
CreateEvent(event{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("b")}),
78+
UpdateEvent(event{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("c")}),
79+
}))
80+
}
81+
82+
func drainChannel(ch <-chan events.Event) []events.Event {
83+
var events []events.Event
84+
for {
85+
select {
86+
case ev := <-ch:
87+
events = append(events, ev)
88+
case <-time.After(time.Second):
89+
return events
90+
}
91+
}
92+
}
93+
94+
type messageBuffer struct {
95+
t *testing.T
96+
msgs [][]byte
97+
}
98+
99+
func (mb *messageBuffer) Append(typ MessageType, msg any) {
100+
mb.t.Helper()
101+
buf, err := encodeMessage(typ, msg)
102+
if err != nil {
103+
mb.t.Fatalf("failed to encode message: %v", err)
104+
}
105+
mb.msgs = append(mb.msgs, buf)
106+
}
107+
108+
func (mb *messageBuffer) Compound() []byte {
109+
return makeCompoundMessage(mb.msgs)
110+
}
111+
112+
func (mb *messageBuffer) Reset() {
113+
mb.msgs = nil
114+
}
115+
116+
func tableEventHelper(mb *messageBuffer, nodeName, networkID, tableName string) func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
117+
return func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
118+
mb.t.Helper()
119+
mb.Append(MessageTypeTableEvent, &TableEvent{
120+
Type: typ,
121+
LTime: ltime,
122+
NodeName: nodeName,
123+
NetworkID: networkID,
124+
TableName: tableName,
125+
Key: key,
126+
Value: value,
127+
})
128+
}
129+
}

0 commit comments

Comments
 (0)