Skip to content

Commit 51f3182

Browse files
committed
libnetwork/networkdb: don't clear queue on rejoin
When joining a network that was previously joined but not yet reaped, NetworkDB replaces the network struct value with a zeroed-out one with the entries count copied over. This is also the case when joining a network that is currently joined! Consequently, joining a network has the side effect of clearing the broadcast queue. If the queue is cleared while messages are still pending broadcast, convergence may be delayed until the next bulk sync cycle.

Make it an error to join a network twice without leaving. Retain the existing broadcast queue when rejoining a network that has not yet been reaped.

Signed-off-by: Cory Snider <[email protected]>
1 parent 30b27ab commit 51f3182

1 file changed

Lines changed: 29 additions & 19 deletions

File tree

libnetwork/networkdb/networkdb.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -614,32 +614,42 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
614614
nDB.networks[nDB.config.NodeID] = nodeNetworks
615615
}
616616
n, ok := nodeNetworks[nid]
617-
var entries int64
618617
if ok {
619-
entries = n.entriesNumber.Load()
620-
}
621-
nodeNetworks[nid] = &network{ltime: ltime}
622-
nodeNetworks[nid].entriesNumber.Store(entries)
623-
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
624-
NumNodes: func() int {
625-
// TODO fcrisciani this can be optimized maybe avoiding the lock?
626-
// this call is done each GetBroadcasts call to evaluate the number of
627-
// replicas for the message
628-
nDB.RLock()
629-
defer nDB.RUnlock()
630-
return len(nDB.networkNodes[nid])
631-
},
632-
RetransmitMult: 4,
618+
if !n.leaving {
619+
nDB.Unlock()
620+
return fmt.Errorf("networkdb: network %s is already joined", nid)
621+
}
622+
n.ltime = ltime
623+
n.inSync = false
624+
n.leaving = false
625+
n.reapTime = 0
626+
} else {
627+
n = &network{
628+
ltime: ltime,
629+
tableBroadcasts: &memberlist.TransmitLimitedQueue{
630+
NumNodes: func() int {
631+
// TODO fcrisciani this can be optimized maybe avoiding the lock?
632+
// this call is done each GetBroadcasts call to evaluate the number of
633+
// replicas for the message
634+
nDB.RLock()
635+
defer nDB.RUnlock()
636+
return len(nDB.networkNodes[nid])
637+
},
638+
RetransmitMult: 4,
639+
},
640+
}
633641
}
634642
nDB.addNetworkNode(nid, nDB.config.NodeID)
635-
networkNodes := nDB.networkNodes[nid]
636-
n = nodeNetworks[nid]
637-
nDB.Unlock()
638643

639644
if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
640-
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
645+
nDB.Unlock()
646+
return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
641647
}
642648

649+
nodeNetworks[nid] = n
650+
networkNodes := nDB.networkNodes[nid]
651+
nDB.Unlock()
652+
643653
log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
644654
if _, err := nDB.bulkSync(networkNodes, true); err != nil {
645655
log.G(context.TODO()).Errorf("Error bulk syncing while joining network %s: %v", nid, err)

0 commit comments

Comments
 (0)