Skip to content

Commit 21d9109

Browse files
committed
libnetwork/networkdb: stop forging tombstone entries
When a node leaves a network, all entries owned by that node are implicitly deleted. The other NetworkDB nodes handle the leave by setting the deleted flag on the entries owned by the left node in their local stores. This behaviour is problematic as it results in two conflicting entries with the same Lamport timestamp propagating through the cluster. Consider two NetworkDB nodes, A, and B, which are both joined to some network. Node A in quick succession leaves the network, immediately rejoins it, then creates an entry. If Node B processes the entry-creation event first, it will add the entry to its local store then set the deleted flag upon processing the network-leave. No matter how many times B bulk-syncs with A, B will ignore the live entry for having the same timestamp as its local tombstone entry. Once this situation occurs, the only way to recover is for the entry to get updated by A with a new timestamp. There is no need for a node to store forged tombstones for another node's entries. All nodes will purge the entries naturally when they process the network-leave or node-leave event. Simply delete the non-owned entries from the local store so there is no inconsistent state to interfere with convergence when nodes rejoin a network. Have nodes update their local store with tombstones for entries when leaving a network so that after a rapid leave-then-rejoin the entry deletions propagate to nodes which may have missed the leave event. Signed-off-by: Cory Snider <[email protected]>
1 parent 8326491 commit 21d9109

3 files changed

Lines changed: 107 additions & 68 deletions

File tree

libnetwork/networkdb/delegate.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
109109
n.reapTime = nDB.config.reapNetworkInterval
110110

111111
// The remote node is leaving the network, but not the gossip cluster.
112-
// Mark all its entries in deleted state, this will guarantee that
113-
// if some node bulk sync with us, the deleted state of
114-
// these entries will be propagated.
112+
// Delete all the entries for this network owned by the node.
115113
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
116114
}
117115

libnetwork/networkdb/networkdb.go

Lines changed: 49 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -512,92 +512,44 @@ func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) {
512512
delete(nDB.networks, deletedNode)
513513
}
514514

515-
// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
516-
// 1) when a notification is coming of a node leaving the network
517-
// - Walk all the network entries and mark the leaving node's entries for deletion
518-
// These will be garbage collected when the reap timer will expire
519-
//
520-
// 2) when the local node is leaving the network
521-
// - Walk all the network entries:
522-
// A) if the entry is owned by the local node
523-
// then we will mark it for deletion. This will ensure that if a node did not
524-
// yet received the notification that the local node is leaving, will be aware
525-
// of the entries to be deleted.
526-
// B) if the entry is owned by a remote node, then we can safely delete it. This
527-
// ensures that if we join back this network as we receive the CREATE event for
528-
// entries owned by remote nodes, we will accept them and we notify the application
515+
// deleteNodeNetworkEntries deletes all table entries for a network owned by
516+
// node from the local store.
529517
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
530-
// Indicates if the delete is triggered for the local node
531-
isNodeLocal := node == nDB.config.NodeID
532-
533518
nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid),
534-
func(path []byte, v *entry) bool {
535-
oldEntry := v
536-
537-
// If the entry is owned by a remote node and this node is not leaving the network
538-
if oldEntry.node != node && !isNodeLocal {
539-
// Don't do anything because the event is triggered for a node that does not own this entry
540-
return false
541-
}
542-
543-
// If this entry is already marked for deletion and this node is not leaving the network
544-
if oldEntry.deleting && !isNodeLocal {
545-
// Don't do anything this entry will be already garbage collected using the old reapTime
519+
func(path []byte, oldEntry *entry) bool {
520+
// Do nothing if the entry is owned by a remote node that is not leaving the network
521+
// because the event is triggered for a node that does not own this entry.
522+
if oldEntry.node != node {
546523
return false
547524
}
548-
549-
newEntry := &entry{
550-
ltime: oldEntry.ltime,
551-
node: oldEntry.node,
552-
value: oldEntry.value,
553-
deleting: true,
554-
reapTime: nDB.config.reapEntryInterval,
555-
}
556-
557-
// we arrived at this point in 2 cases:
558-
// 1) this entry is owned by the node that is leaving the network
559-
// 2) the local node is leaving the network
560525
params := strings.Split(string(path[1:]), "/")
561526
nwID, tName, key := params[0], params[1], params[2]
562-
if oldEntry.node == node {
563-
if isNodeLocal {
564-
// TODO fcrisciani: this can be removed if there is no way to leave the network
565-
// without doing a delete of all the objects
566-
newEntry.ltime++
567-
}
568-
569-
if !oldEntry.deleting {
570-
nDB.createOrUpdateEntry(nwID, tName, key, newEntry)
571-
}
572-
} else {
573-
// the local node is leaving the network, all the entries of remote nodes can be safely removed
574-
nDB.deleteEntry(nwID, tName, key)
575-
}
527+
528+
nDB.deleteEntry(nwID, tName, key)
576529

577530
// Notify to the upper layer only entries not already marked for deletion
578531
if !oldEntry.deleting {
579-
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, newEntry.value))
532+
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
580533
}
581534
return false
582535
})
583536
}
584537

538+
// deleteNodeTableEntries deletes all table entries owned by node from the local
539+
// store, across all networks.
585540
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
586-
nDB.indexes[byTable].Root().Walk(func(path []byte, v *entry) bool {
587-
oldEntry := v
541+
nDB.indexes[byTable].Root().Walk(func(path []byte, oldEntry *entry) bool {
588542
if oldEntry.node != node {
589543
return false
590544
}
591545

592546
params := strings.Split(string(path[1:]), "/")
593-
tname := params[0]
594-
nid := params[1]
595-
key := params[2]
547+
tName, nwID, key := params[0], params[1], params[2]
596548

597-
nDB.deleteEntry(nid, tname, key)
549+
nDB.deleteEntry(nwID, tName, key)
598550

599551
if !oldEntry.deleting {
600-
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
552+
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
601553
}
602554
return false
603555
})
@@ -700,8 +652,40 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
700652
// Remove myself from the list of the nodes participating to the network
701653
nDB.deleteNetworkNode(nid, nDB.config.NodeID)
702654

703-
// Update all the local entries marking them for deletion and delete all the remote entries
704-
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)
655+
// Mark all the local entries for deletion
656+
// so that if we rejoin the network
657+
// before another node has received the network-leave notification,
658+
// the old entries owned by us will still be purged as expected.
659+
// Delete all the remote entries from our local store
660+
// without leaving any tombstone.
661+
// This ensures that we will accept the CREATE events
662+
// for entries owned by remote nodes
663+
// if we later rejoin the network.
664+
nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid), func(path []byte, oldEntry *entry) bool {
665+
owned := oldEntry.node == nDB.config.NodeID
666+
if owned && oldEntry.deleting {
667+
return false
668+
}
669+
670+
params := strings.Split(string(path[1:]), "/")
671+
nwID, tName, key := params[0], params[1], params[2]
672+
if owned {
673+
newEntry := &entry{
674+
ltime: nDB.tableClock.Increment(),
675+
node: oldEntry.node,
676+
value: oldEntry.value,
677+
deleting: true,
678+
reapTime: nDB.config.reapEntryInterval,
679+
}
680+
nDB.createOrUpdateEntry(nwID, tName, key, newEntry)
681+
} else {
682+
nDB.deleteEntry(nwID, tName, key)
683+
}
684+
if !oldEntry.deleting {
685+
nDB.broadcaster.Write(makeEvent(opDelete, tName, nwID, key, oldEntry.value))
686+
}
687+
return false
688+
})
705689

706690
n, ok := nDB.thisNodeNetworks[nid]
707691
if !ok {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package networkdb
22

33
import (
4+
"fmt"
45
"net"
56
"testing"
67
"time"
@@ -223,6 +224,62 @@ L:
223224
}))
224225
}
225226

227+
func TestLeaveRejoinOutOfOrder(t *testing.T) {
228+
// Regression test for https://github.com/moby/moby/issues/47728
229+
230+
nDB := newNetworkDB(DefaultConfig())
231+
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
232+
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
233+
assert.Assert(t, nDB.JoinNetwork("network1"))
234+
235+
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
236+
Name: "node1",
237+
Addr: net.IPv4(1, 2, 3, 4),
238+
})
239+
240+
d := &delegate{nDB}
241+
242+
msgs := messageBuffer{t: t}
243+
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
244+
245+
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
246+
Type: NetworkEventTypeJoin,
247+
LTime: 1,
248+
NodeName: "node1",
249+
NetworkID: "network1",
250+
})
251+
// Simulate node1 leaving, rejoining, and creating an entry,
252+
// but the table events are broadcast before the network events.
253+
appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a"))
254+
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
255+
Type: NetworkEventTypeLeave,
256+
LTime: 2,
257+
NodeName: "node1",
258+
NetworkID: "network1",
259+
})
260+
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
261+
Type: NetworkEventTypeJoin,
262+
LTime: 3,
263+
NodeName: "node1",
264+
NetworkID: "network1",
265+
})
266+
// Simulate a bulk sync or receiving a rebroadcasted copy of the table
267+
// event from another node.
268+
appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a"))
269+
270+
d.NotifyMsg(msgs.Compound())
271+
272+
got := make(map[string]string)
273+
nDB.WalkTable("table1", func(nw, key string, value []byte, deleted bool) bool {
274+
got[nw+"/"+key] = fmt.Sprintf("%s (deleted=%t)", value, deleted)
275+
return false
276+
})
277+
want := map[string]string{
278+
"network1/key1": "a (deleted=false)",
279+
}
280+
assert.Check(t, is.DeepEqual(got, want))
281+
}
282+
226283
func drainChannel(ch <-chan events.Event) []events.Event {
227284
var events []events.Event
228285
for {

0 commit comments

Comments
 (0)