Skip to content

Commit 69c3c56

Browse files
committed
libn/networkdb: report prev value in update events
When handling updates to existing entries, it is often necessary to know what the previous value was. NetworkDB knows the previous and new values when it broadcasts an update event for an entry. Include both values in the update event so the watchers do not have to do their own parallel bookkeeping. Unify the event types under WatchEvent as representing the operation kind in the type system has been inconvenient, not useful. The operation is now implied by the nilness of the Value and Prev event fields. Signed-off-by: Cory Snider <[email protected]>
1 parent e1281f0 commit 69c3c56

8 files changed

Lines changed: 162 additions & 176 deletions

File tree

daemon/libnetwork/agent.go

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -817,30 +817,23 @@ func (n *Network) handleDriverTableEvent(ev events.Event) {
817817

818818
var (
819819
etype driverapi.EventType
820-
tname string
821-
key string
822820
value []byte
823821
)
824822

825-
switch event := ev.(type) {
826-
case networkdb.CreateEvent:
827-
tname = event.Table
828-
key = event.Key
823+
event := ev.(networkdb.WatchEvent)
824+
switch {
825+
case event.IsCreate():
829826
value = event.Value
830827
etype = driverapi.Create
831-
case networkdb.DeleteEvent:
832-
tname = event.Table
833-
key = event.Key
834-
value = event.Value
828+
case event.IsDelete():
829+
value = event.Prev
835830
etype = driverapi.Delete
836-
case networkdb.UpdateEvent:
837-
tname = event.Table
838-
key = event.Key
831+
case event.IsUpdate():
839832
value = event.Value
840833
etype = driverapi.Update
841834
}
842835

843-
d.EventNotify(etype, n.ID(), tname, key, value)
836+
d.EventNotify(etype, n.ID(), event.Table, event.Key, value)
844837
}
845838

846839
func (c *Controller) handleNodeTableEvent(ev events.Event) {
@@ -849,13 +842,14 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
849842
isAdd bool
850843
nodeAddr networkdb.NodeAddr
851844
)
852-
switch event := ev.(type) {
853-
case networkdb.CreateEvent:
845+
event := ev.(networkdb.WatchEvent)
846+
switch {
847+
case event.IsCreate():
854848
value = event.Value
855849
isAdd = true
856-
case networkdb.DeleteEvent:
857-
value = event.Value
858-
case networkdb.UpdateEvent:
850+
case event.IsDelete():
851+
value = event.Prev
852+
case event.IsUpdate():
859853
log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
860854
}
861855

@@ -868,37 +862,27 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
868862
}
869863

870864
func (c *Controller) handleEpTableEvent(ev events.Event) {
871-
var (
872-
nid string
873-
eid string
874-
value []byte
875-
epRec EndpointRecord
876-
)
877-
878-
switch event := ev.(type) {
879-
case networkdb.CreateEvent:
880-
nid = event.NetworkID
881-
eid = event.Key
882-
value = event.Value
883-
case networkdb.DeleteEvent:
884-
nid = event.NetworkID
885-
eid = event.Key
886-
value = event.Value
887-
case networkdb.UpdateEvent:
888-
nid = event.NetworkID
889-
eid = event.Key
865+
var value []byte
866+
event := ev.(networkdb.WatchEvent)
867+
switch {
868+
case event.IsCreate(), event.IsUpdate():
890869
value = event.Value
870+
case event.IsDelete():
871+
value = event.Prev
891872
default:
892873
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
893874
return
894875
}
895876

877+
var epRec EndpointRecord
896878
err := proto.Unmarshal(value, &epRec)
897879
if err != nil {
898880
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
899881
return
900882
}
901883

884+
nid := event.NetworkID
885+
eid := event.Key
902886
containerName := epRec.Name
903887
svcName := epRec.ServiceName
904888
svcID := epRec.ServiceID
@@ -909,9 +893,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
909893
taskAliases := epRec.TaskAliases
910894

911895
logger := log.G(context.TODO()).WithFields(log.Fields{
912-
"nid": nid,
913-
"eid": eid,
914-
"T": fmt.Sprintf("%T", ev),
896+
"evt": event,
915897
"R": epRec,
916898
})
917899

@@ -922,8 +904,8 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
922904

923905
logger.Debug("handleEpTableEvent")
924906

925-
switch ev.(type) {
926-
case networkdb.CreateEvent, networkdb.UpdateEvent:
907+
switch {
908+
case event.IsCreate(), event.IsUpdate():
927909
if svcID != "" {
928910
// This is a remote task part of a service
929911
if epRec.ServiceDisabled {
@@ -944,7 +926,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
944926
}
945927
}
946928

947-
case networkdb.DeleteEvent:
929+
case event.IsDelete():
948930
if svcID != "" {
949931
// This is a remote task part of a service
950932
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {

daemon/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,6 @@ func watchTableEntries(w http.ResponseWriter, r *http.Request) {
8080
}
8181

8282
func handleTableEvents(tableName string, ch *events.Channel) {
83-
var (
84-
// nid string
85-
eid string
86-
value []byte
87-
isAdd bool
88-
)
89-
9083
log.G(context.TODO()).Infof("Started watching table:%s", tableName)
9184
for {
9285
select {
@@ -96,26 +89,16 @@ func handleTableEvents(tableName string, ch *events.Channel) {
9689

9790
case evt := <-ch.C:
9891
log.G(context.TODO()).Infof("Received new event on:%s", tableName)
99-
switch event := evt.(type) {
100-
case networkdb.CreateEvent:
101-
// nid = event.NetworkID
102-
eid = event.Key
103-
value = event.Value
104-
isAdd = true
105-
case networkdb.DeleteEvent:
106-
// nid = event.NetworkID
107-
eid = event.Key
108-
value = event.Value
109-
isAdd = false
110-
default:
92+
event, ok := evt.(networkdb.WatchEvent)
93+
if !ok {
11194
log.G(context.TODO()).Fatalf("Unexpected table event = %#v", event)
11295
}
113-
if isAdd {
114-
// log.G(ctx).Infof("Add %s %s", tableName, eid)
115-
clientWatchTable[tableName].entries[eid] = string(value)
96+
if event.Value != nil {
97+
// log.G(ctx).Infof("Add %s %s", tableName, event.Key)
98+
clientWatchTable[tableName].entries[event.Key] = string(event.Value)
11699
} else {
117-
// log.G(ctx).Infof("Del %s %s", tableName, eid)
118-
delete(clientWatchTable[tableName].entries, eid)
100+
// log.G(ctx).Infof("Del %s %s", tableName, event.Key)
101+
delete(clientWatchTable[tableName].entries, event.Key)
119102
}
120103
}
121104
}

daemon/libnetwork/networkdb/delegate.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -215,33 +215,35 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
215215
return isBulkSync && network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
216216
}
217217

218-
var op opType
219-
value := tEvent.Value
218+
event := WatchEvent{
219+
Table: tEvent.TableName,
220+
NetworkID: tEvent.NetworkID,
221+
Key: tEvent.Key,
222+
}
220223
switch tEvent.Type {
221224
case TableEventTypeCreate, TableEventTypeUpdate:
222225
// Gossip messages could arrive out-of-order so it is possible
223226
// for an entry's UPDATE event to be received before its CREATE
224227
// event. The local watchers should not need to care about such
225228
// nuances. Broadcast events to watchers based only on what
226229
// changed in the local NetworkDB state.
227-
op = opCreate
230+
event.Value = tEvent.Value
228231
if entryPresent && !prev.deleting {
229-
op = opUpdate
232+
event.Prev = prev.value
230233
}
231234
case TableEventTypeDelete:
232235
if !entryPresent || prev.deleting {
233236
goto SkipBroadcast
234237
}
235-
op = opDelete
236238
// Broadcast the value most recently observed by watchers,
237239
// which may be different from the value in the DELETE event
238240
// (e.g. if the DELETE event was received out-of-order).
239-
value = prev.value
241+
event.Prev = prev.value
240242
default:
241243
// TODO(thaJeztah): make switch exhaustive; add networkdb.TableEventTypeInvalid
242244
}
243245

244-
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, value))
246+
nDB.broadcaster.Write(event)
245247
SkipBroadcast:
246248
return network.inSync
247249
}

daemon/libnetwork/networkdb/event_delegate.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,32 @@ type eventDelegate struct {
1313
nDB *NetworkDB
1414
}
1515

16-
func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
16+
type nodeEventOp bool
17+
18+
const (
19+
notifyNodeJoined nodeEventOp = true
20+
notifyNodeLeft nodeEventOp = false
21+
)
22+
23+
func (e *eventDelegate) broadcastNodeEvent(addr net.IP, kind nodeEventOp) {
1724
value, err := json.Marshal(&NodeAddr{addr})
18-
if err == nil {
19-
e.nDB.broadcaster.Write(makeEvent(op, NodeTable, "", "", value))
20-
} else {
25+
if err != nil {
2126
log.G(context.TODO()).Errorf("Error marshalling node broadcast event %s", addr.String())
27+
return
28+
}
29+
event := WatchEvent{Table: NodeTable}
30+
switch kind {
31+
case notifyNodeJoined:
32+
event.Value = value
33+
case notifyNodeLeft:
34+
event.Prev = value
2235
}
36+
e.nDB.broadcaster.Write(event)
2337
}
2438

2539
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
2640
log.G(context.TODO()).Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
27-
e.broadcastNodeEvent(mn.Addr, opCreate)
41+
e.broadcastNodeEvent(mn.Addr, notifyNodeJoined)
2842
e.nDB.Lock()
2943
defer e.nDB.Unlock()
3044

@@ -46,7 +60,7 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
4660

4761
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
4862
log.G(context.TODO()).Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
49-
e.broadcastNodeEvent(mn.Addr, opDelete)
63+
e.broadcastNodeEvent(mn.Addr, notifyNodeLeft)
5064

5165
e.nDB.Lock()
5266
defer e.nDB.Unlock()

daemon/libnetwork/networkdb/networkdb.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,12 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
546546

547547
// Notify to the upper layer only entries not already marked for deletion
548548
if !oldEntry.deleting {
549-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
549+
nDB.broadcaster.Write(WatchEvent{
550+
Table: tName,
551+
NetworkID: nwID,
552+
Key: key,
553+
Prev: oldEntry.value,
554+
})
550555
}
551556
return false
552557
})
@@ -566,7 +571,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
566571
nDB.deleteEntry(nwID, tName, key)
567572

568573
if !oldEntry.deleting {
569-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
574+
nDB.broadcaster.Write(WatchEvent{
575+
Table: tName,
576+
NetworkID: nwID,
577+
Key: key,
578+
Prev: oldEntry.value,
579+
})
570580
}
571581
return false
572582
})
@@ -693,7 +703,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
693703
nDB.deleteEntry(nwID, tName, key)
694704
}
695705
if !oldEntry.deleting {
696-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
706+
nDB.broadcaster.Write(WatchEvent{
707+
Table: tName,
708+
NetworkID: nwID,
709+
Key: key,
710+
Prev: oldEntry.value,
711+
})
697712
}
698713
return false
699714
})

daemon/libnetwork/networkdb/networkdb_test.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -174,26 +174,25 @@ func (nDB *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value
174174
nDB.config.Hostname, nDB.config.NodeID, tname, key, nid, present, value, !present, string(v))
175175
}
176176

177-
func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
177+
func testWatch(t *testing.T, ch chan events.Event, tname, nid, key, prev, value string) {
178178
t.Helper()
179179
select {
180180
case rcvdEv := <-ch:
181-
assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
182-
switch typ := rcvdEv.(type) {
183-
case CreateEvent:
184-
assert.Check(t, is.Equal(tname, typ.Table))
185-
assert.Check(t, is.Equal(nid, typ.NetworkID))
186-
assert.Check(t, is.Equal(key, typ.Key))
187-
assert.Check(t, is.Equal(value, string(typ.Value)))
188-
case UpdateEvent:
189-
assert.Check(t, is.Equal(tname, typ.Table))
190-
assert.Check(t, is.Equal(nid, typ.NetworkID))
191-
assert.Check(t, is.Equal(key, typ.Key))
192-
assert.Check(t, is.Equal(value, string(typ.Value)))
193-
case DeleteEvent:
181+
typ, ok := rcvdEv.(WatchEvent)
182+
if assert.Check(t, ok, "expected WatchEvent, got %T", rcvdEv) {
194183
assert.Check(t, is.Equal(tname, typ.Table))
195184
assert.Check(t, is.Equal(nid, typ.NetworkID))
196185
assert.Check(t, is.Equal(key, typ.Key))
186+
if prev == "" {
187+
assert.Check(t, is.Nil(typ.Prev))
188+
} else {
189+
assert.Check(t, is.Equal(prev, string(typ.Prev)))
190+
}
191+
if value == "" {
192+
assert.Check(t, is.Nil(typ.Value))
193+
} else {
194+
assert.Check(t, is.Equal(value, string(typ.Value)))
195+
}
197196
}
198197
case <-time.After(time.Second):
199198
t.Fail()
@@ -417,17 +416,17 @@ func TestNetworkDBWatch(t *testing.T) {
417416
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
418417
assert.NilError(t, err)
419418

420-
testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
419+
testWatch(t, ch.C, "test_table", "network1", "test_key", "", "test_value")
421420

422421
err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
423422
assert.NilError(t, err)
424423

425-
testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
424+
testWatch(t, ch.C, "test_table", "network1", "test_key", "test_value", "test_updated_value")
426425

427426
err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
428427
assert.NilError(t, err)
429428

430-
testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
429+
testWatch(t, ch.C, "test_table", "network1", "test_key", "test_updated_value", "")
431430

432431
cancel()
433432
closeNetworkDBInstances(t, dbs)

0 commit comments

Comments
 (0)