Skip to content

Commit c7e17ae

Browse files
committed
libn/networkdb: report prev value in update events
When handling updates to existing entries, it is often necessary to know what the previous value was. NetworkDB knows the previous and new values when it broadcasts an update event for an entry. Include both values in the update event so the watchers do not have to do their own parallel bookkeeping. Unify the event types under WatchEvent as representing the operation kind in the type system has been inconvenient, not useful. The operation is now implied by the nilness of the Value and Prev event fields. Signed-off-by: Cory Snider <[email protected]> (cherry picked from commit 69c3c56) Signed-off-by: Cory Snider <[email protected]>
1 parent d60c71a commit c7e17ae

8 files changed

Lines changed: 163 additions & 177 deletions

File tree

libnetwork/agent.go

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -816,30 +816,23 @@ func (n *Network) handleDriverTableEvent(ev events.Event) {
816816

817817
var (
818818
etype driverapi.EventType
819-
tname string
820-
key string
821819
value []byte
822820
)
823821

824-
switch event := ev.(type) {
825-
case networkdb.CreateEvent:
826-
tname = event.Table
827-
key = event.Key
822+
event := ev.(networkdb.WatchEvent)
823+
switch {
824+
case event.IsCreate():
828825
value = event.Value
829826
etype = driverapi.Create
830-
case networkdb.DeleteEvent:
831-
tname = event.Table
832-
key = event.Key
833-
value = event.Value
827+
case event.IsDelete():
828+
value = event.Prev
834829
etype = driverapi.Delete
835-
case networkdb.UpdateEvent:
836-
tname = event.Table
837-
key = event.Key
830+
case event.IsUpdate():
838831
value = event.Value
839832
etype = driverapi.Update
840833
}
841834

842-
d.EventNotify(etype, n.ID(), tname, key, value)
835+
d.EventNotify(etype, n.ID(), event.Table, event.Key, value)
843836
}
844837

845838
func (c *Controller) handleNodeTableEvent(ev events.Event) {
@@ -848,13 +841,14 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
848841
isAdd bool
849842
nodeAddr networkdb.NodeAddr
850843
)
851-
switch event := ev.(type) {
852-
case networkdb.CreateEvent:
844+
event := ev.(networkdb.WatchEvent)
845+
switch {
846+
case event.IsCreate():
853847
value = event.Value
854848
isAdd = true
855-
case networkdb.DeleteEvent:
856-
value = event.Value
857-
case networkdb.UpdateEvent:
849+
case event.IsDelete():
850+
value = event.Prev
851+
case event.IsUpdate():
858852
log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
859853
}
860854

@@ -867,37 +861,27 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
867861
}
868862

869863
func (c *Controller) handleEpTableEvent(ev events.Event) {
870-
var (
871-
nid string
872-
eid string
873-
value []byte
874-
epRec EndpointRecord
875-
)
876-
877-
switch event := ev.(type) {
878-
case networkdb.CreateEvent:
879-
nid = event.NetworkID
880-
eid = event.Key
881-
value = event.Value
882-
case networkdb.DeleteEvent:
883-
nid = event.NetworkID
884-
eid = event.Key
885-
value = event.Value
886-
case networkdb.UpdateEvent:
887-
nid = event.NetworkID
888-
eid = event.Key
864+
var value []byte
865+
event := ev.(networkdb.WatchEvent)
866+
switch {
867+
case event.IsCreate(), event.IsUpdate():
889868
value = event.Value
869+
case event.IsDelete():
870+
value = event.Prev
890871
default:
891872
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
892873
return
893874
}
894875

876+
var epRec EndpointRecord
895877
err := proto.Unmarshal(value, &epRec)
896878
if err != nil {
897879
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
898880
return
899881
}
900882

883+
nid := event.NetworkID
884+
eid := event.Key
901885
containerName := epRec.Name
902886
svcName := epRec.ServiceName
903887
svcID := epRec.ServiceID
@@ -908,9 +892,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
908892
taskAliases := epRec.TaskAliases
909893

910894
logger := log.G(context.TODO()).WithFields(log.Fields{
911-
"nid": nid,
912-
"eid": eid,
913-
"T": fmt.Sprintf("%T", ev),
895+
"evt": event,
914896
"R": epRec,
915897
})
916898

@@ -921,8 +903,8 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
921903

922904
logger.Debug("handleEpTableEvent")
923905

924-
switch ev.(type) {
925-
case networkdb.CreateEvent, networkdb.UpdateEvent:
906+
switch {
907+
case event.IsCreate(), event.IsUpdate():
926908
if svcID != "" {
927909
// This is a remote task part of a service
928910
if epRec.ServiceDisabled {
@@ -943,7 +925,7 @@ func (c *Controller) handleEpTableEvent(ev events.Event) {
943925
}
944926
}
945927

946-
case networkdb.DeleteEvent:
928+
case event.IsDelete():
947929
if svcID != "" {
948930
// This is a remote task part of a service
949931
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {

libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,6 @@ func watchTableEntries(w http.ResponseWriter, r *http.Request) {
8080
}
8181

8282
func handleTableEvents(tableName string, ch *events.Channel) {
83-
var (
84-
// nid string
85-
eid string
86-
value []byte
87-
isAdd bool
88-
)
89-
9083
log.G(context.TODO()).Infof("Started watching table:%s", tableName)
9184
for {
9285
select {
@@ -95,27 +88,17 @@ func handleTableEvents(tableName string, ch *events.Channel) {
9588
return
9689

9790
case evt := <-ch.C:
98-
log.G(context.TODO()).Infof("Recevied new event on:%s", tableName)
99-
switch event := evt.(type) {
100-
case networkdb.CreateEvent:
101-
// nid = event.NetworkID
102-
eid = event.Key
103-
value = event.Value
104-
isAdd = true
105-
case networkdb.DeleteEvent:
106-
// nid = event.NetworkID
107-
eid = event.Key
108-
value = event.Value
109-
isAdd = false
110-
default:
91+
log.G(context.TODO()).Infof("Received new event on:%s", tableName)
92+
event, ok := evt.(networkdb.WatchEvent)
93+
if !ok {
11194
log.G(context.TODO()).Fatalf("Unexpected table event = %#v", event)
11295
}
113-
if isAdd {
114-
// log.G(ctx).Infof("Add %s %s", tableName, eid)
115-
clientWatchTable[tableName].entries[eid] = string(value)
96+
if event.Value != nil {
97+
// log.G(ctx).Infof("Add %s %s", tableName, event.Key)
98+
clientWatchTable[tableName].entries[event.Key] = string(event.Value)
11699
} else {
117-
// log.G(ctx).Infof("Del %s %s", tableName, eid)
118-
delete(clientWatchTable[tableName].entries, eid)
100+
// log.G(ctx).Infof("Del %s %s", tableName, event.Key)
101+
delete(clientWatchTable[tableName].entries, event.Key)
119102
}
120103
}
121104
}

libnetwork/networkdb/delegate.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,31 +214,33 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
214214
return isBulkSync && network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
215215
}
216216

217-
var op opType
218-
value := tEvent.Value
217+
event := WatchEvent{
218+
Table: tEvent.TableName,
219+
NetworkID: tEvent.NetworkID,
220+
Key: tEvent.Key,
221+
}
219222
switch tEvent.Type {
220223
case TableEventTypeCreate, TableEventTypeUpdate:
221224
// Gossip messages could arrive out-of-order so it is possible
222225
// for an entry's UPDATE event to be received before its CREATE
223226
// event. The local watchers should not need to care about such
224227
// nuances. Broadcast events to watchers based only on what
225228
// changed in the local NetworkDB state.
226-
op = opCreate
229+
event.Value = tEvent.Value
227230
if entryPresent && !prev.deleting {
228-
op = opUpdate
231+
event.Prev = prev.value
229232
}
230233
case TableEventTypeDelete:
231234
if !entryPresent || prev.deleting {
232235
goto SkipBroadcast
233236
}
234-
op = opDelete
235237
// Broadcast the value most recently observed by watchers,
236238
// which may be different from the value in the DELETE event
237239
// (e.g. if the DELETE event was received out-of-order).
238-
value = prev.value
240+
event.Prev = prev.value
239241
}
240242

241-
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, value))
243+
nDB.broadcaster.Write(event)
242244
SkipBroadcast:
243245
return network.inSync
244246
}

libnetwork/networkdb/event_delegate.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,32 @@ type eventDelegate struct {
1313
nDB *NetworkDB
1414
}
1515

16-
func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
16+
type nodeEventOp bool
17+
18+
const (
19+
notifyNodeJoined nodeEventOp = true
20+
notifyNodeLeft nodeEventOp = false
21+
)
22+
23+
func (e *eventDelegate) broadcastNodeEvent(addr net.IP, kind nodeEventOp) {
1724
value, err := json.Marshal(&NodeAddr{addr})
18-
if err == nil {
19-
e.nDB.broadcaster.Write(makeEvent(op, NodeTable, "", "", value))
20-
} else {
25+
if err != nil {
2126
log.G(context.TODO()).Errorf("Error marshalling node broadcast event %s", addr.String())
27+
return
28+
}
29+
event := WatchEvent{Table: NodeTable}
30+
switch kind {
31+
case notifyNodeJoined:
32+
event.Value = value
33+
case notifyNodeLeft:
34+
event.Prev = value
2235
}
36+
e.nDB.broadcaster.Write(event)
2337
}
2438

2539
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
2640
log.G(context.TODO()).Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
27-
e.broadcastNodeEvent(mn.Addr, opCreate)
41+
e.broadcastNodeEvent(mn.Addr, notifyNodeJoined)
2842
e.nDB.Lock()
2943
defer e.nDB.Unlock()
3044

@@ -46,7 +60,7 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
4660

4761
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
4862
log.G(context.TODO()).Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
49-
e.broadcastNodeEvent(mn.Addr, opDelete)
63+
e.broadcastNodeEvent(mn.Addr, notifyNodeLeft)
5064

5165
e.nDB.Lock()
5266
defer e.nDB.Unlock()

libnetwork/networkdb/networkdb.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,12 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
544544

545545
// Notify to the upper layer only entries not already marked for deletion
546546
if !oldEntry.deleting {
547-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
547+
nDB.broadcaster.Write(WatchEvent{
548+
Table: tName,
549+
NetworkID: nwID,
550+
Key: key,
551+
Prev: oldEntry.value,
552+
})
548553
}
549554
return false
550555
})
@@ -565,7 +570,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
565570
nDB.deleteEntry(nwID, tName, key)
566571

567572
if !oldEntry.deleting {
568-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
573+
nDB.broadcaster.Write(WatchEvent{
574+
Table: tName,
575+
NetworkID: nwID,
576+
Key: key,
577+
Prev: oldEntry.value,
578+
})
569579
}
570580
return false
571581
})
@@ -694,7 +704,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
694704
nDB.deleteEntry(nwID, tName, key)
695705
}
696706
if !oldEntry.deleting {
697-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
707+
nDB.broadcaster.Write(WatchEvent{
708+
Table: tName,
709+
NetworkID: nwID,
710+
Key: key,
711+
Prev: oldEntry.value,
712+
})
698713
}
699714
return false
700715
})

libnetwork/networkdb/networkdb_test.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -170,26 +170,25 @@ func (nDB *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value
170170
nDB.config.Hostname, nDB.config.NodeID, tname, key, nid, present, value, !present, string(v))
171171
}
172172

173-
func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
173+
func testWatch(t *testing.T, ch chan events.Event, tname, nid, key, prev, value string) {
174174
t.Helper()
175175
select {
176176
case rcvdEv := <-ch:
177-
assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
178-
switch typ := rcvdEv.(type) {
179-
case CreateEvent:
180-
assert.Check(t, is.Equal(tname, typ.Table))
181-
assert.Check(t, is.Equal(nid, typ.NetworkID))
182-
assert.Check(t, is.Equal(key, typ.Key))
183-
assert.Check(t, is.Equal(value, string(typ.Value)))
184-
case UpdateEvent:
185-
assert.Check(t, is.Equal(tname, typ.Table))
186-
assert.Check(t, is.Equal(nid, typ.NetworkID))
187-
assert.Check(t, is.Equal(key, typ.Key))
188-
assert.Check(t, is.Equal(value, string(typ.Value)))
189-
case DeleteEvent:
177+
typ, ok := rcvdEv.(WatchEvent)
178+
if assert.Check(t, ok, "expected WatchEvent, got %T", rcvdEv) {
190179
assert.Check(t, is.Equal(tname, typ.Table))
191180
assert.Check(t, is.Equal(nid, typ.NetworkID))
192181
assert.Check(t, is.Equal(key, typ.Key))
182+
if prev == "" {
183+
assert.Check(t, is.Nil(typ.Prev))
184+
} else {
185+
assert.Check(t, is.Equal(prev, string(typ.Prev)))
186+
}
187+
if value == "" {
188+
assert.Check(t, is.Nil(typ.Value))
189+
} else {
190+
assert.Check(t, is.Equal(value, string(typ.Value)))
191+
}
193192
}
194193
case <-time.After(time.Second):
195194
t.Fail()
@@ -413,17 +412,17 @@ func TestNetworkDBWatch(t *testing.T) {
413412
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
414413
assert.NilError(t, err)
415414

416-
testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
415+
testWatch(t, ch.C, "test_table", "network1", "test_key", "", "test_value")
417416

418417
err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
419418
assert.NilError(t, err)
420419

421-
testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
420+
testWatch(t, ch.C, "test_table", "network1", "test_key", "test_value", "test_updated_value")
422421

423422
err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
424423
assert.NilError(t, err)
425424

426-
testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
425+
testWatch(t, ch.C, "test_table", "network1", "test_key", "test_updated_value", "")
427426

428427
cancel()
429428
closeNetworkDBInstances(t, dbs)

0 commit comments

Comments
 (0)