Skip to content

Commit 08bde5e

Browse files
committed
libnetwork/networkdb: fix broadcast queue deadlocks
NetworkDB's JoinNetwork function enqueues a message onto a TransmitLimitedQueue while holding the NetworkDB mutex locked for writing. The TransmitLimitedQueue has its own synchronization; it locks its mutex when enqueueing a message. Locking order: 1. (NetworkDB).RWMutex.Lock() 2. (TransmitLimitedQueue).mu.Lock() NetworkDB's gossip periodic task calls GetBroadcasts on the same TransmitLimitedQueue to retrieve the enqueued messages. GetBroadcasts invokes the queue's NumNodes callback while the mutex is locked. The NumNodes callback function that NetworkDB sets locks the NetworkDB mutex for reading to take the length of the nodes map. Locking order: 1. (TransmitLimitedQueue).mu.Lock() 2. (NetworkDB).RWMutex.RLock() If one goroutine calls GetBroadcasts on the queue concurrently with another goroutine calling JoinNetwork on the NetworkDB, the goroutines may deadlock due to the lock inversion. Fix the deadlock by caching the number of nodes in an atomic variable so that the NumNodes callback can load the value without blocking or violating Go's memory model. And fix a similar deadlock situation with the table-event broadcast queues. Signed-off-by: Cory Snider <[email protected]>
1 parent 8326491 commit 08bde5e

4 files changed

Lines changed: 28 additions & 23 deletions

File tree

libnetwork/networkdb/cluster.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -135,22 +135,12 @@ func (nDB *NetworkDB) clusterInit() error {
135135
}
136136

137137
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
138-
NumNodes: func() int {
139-
nDB.RLock()
140-
num := len(nDB.nodes)
141-
nDB.RUnlock()
142-
return num
143-
},
138+
NumNodes: nDB.estNumNodes,
144139
RetransmitMult: config.RetransmitMult,
145140
}
146141

147142
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{
148-
NumNodes: func() int {
149-
nDB.RLock()
150-
num := len(nDB.nodes)
151-
nDB.RUnlock()
152-
return num
153-
},
143+
NumNodes: nDB.estNumNodes,
154144
RetransmitMult: config.RetransmitMult,
155145
}
156146

libnetwork/networkdb/event_delegate.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
4040
e.nDB.purgeReincarnation(mn)
4141

4242
e.nDB.nodes[mn.Name] = &node{Node: *mn}
43+
e.nDB.estNodes.Store(int32(len(e.nDB.nodes)))
4344
log.G(context.TODO()).Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
4445
}
4546

libnetwork/networkdb/networkdb.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ type NetworkDB struct {
5656
// network.
5757
nodes map[string]*node
5858

59+
// An approximation of len(nodes) that can be accessed without
60+
// synchronization.
61+
estNodes atomic.Int32
62+
5963
// List of all peer nodes which have failed
6064
failedNodes map[string]*node
6165

@@ -171,6 +175,10 @@ type thisNodeNetwork struct {
171175
// Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
172176
// interval
173177
entriesNumber atomic.Int64
178+
179+
// An approximation of len(nDB.networkNodes[nid]) that can be accessed
180+
// without synchronization.
181+
networkNodes atomic.Int32
174182
}
175183

176184
// Config represents the configuration of the networkdb instance and
@@ -637,25 +645,18 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
637645
n.network = network{ltime: ltime}
638646
n.inSync = false
639647
} else {
640-
numNodes := func() int {
641-
// TODO fcrisciani this can be optimized maybe avoiding the lock?
642-
// this call is done each GetBroadcasts call to evaluate the number of
643-
// replicas for the message
644-
nDB.RLock()
645-
defer nDB.RUnlock()
646-
return len(nDB.networkNodes[nid])
647-
}
648648
n = &thisNodeNetwork{
649649
network: network{ltime: ltime},
650650
tableBroadcasts: &memberlist.TransmitLimitedQueue{
651-
NumNodes: numNodes,
652651
RetransmitMult: 4,
653652
},
654653
tableRebroadcasts: &memberlist.TransmitLimitedQueue{
655-
NumNodes: numNodes,
656654
RetransmitMult: 4,
657655
},
658656
}
657+
numNodes := func() int { return int(n.networkNodes.Load()) }
658+
n.tableBroadcasts.NumNodes = numNodes
659+
n.tableRebroadcasts.NumNodes = numNodes
659660
}
660661
nDB.addNetworkNode(nid, nDB.config.NodeID)
661662

@@ -664,8 +665,9 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
664665
return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
665666
}
666667

667-
nDB.thisNodeNetworks[nid] = n
668668
networkNodes := nDB.networkNodes[nid]
669+
n.networkNodes.Store(int32(len(networkNodes)))
670+
nDB.thisNodeNetworks[nid] = n
669671
nDB.Unlock()
670672

671673
log.G(context.TODO()).Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
@@ -727,6 +729,9 @@ func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
727729
}
728730

729731
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
732+
if n, ok := nDB.thisNodeNetworks[nid]; ok {
733+
n.networkNodes.Store(int32(len(nDB.networkNodes[nid])))
734+
}
730735
}
731736

732737
// Deletes the node from the list of nodes which participate in the
@@ -745,6 +750,9 @@ func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
745750
newNodes = append(newNodes, name)
746751
}
747752
nDB.networkNodes[nid] = newNodes
753+
if n, ok := nDB.thisNodeNetworks[nid]; ok {
754+
n.networkNodes.Store(int32(len(newNodes)))
755+
}
748756
}
749757

750758
// findCommonNetworks find the networks that both this node and the

libnetwork/networkdb/nodemgmt.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ func (nDB *NetworkDB) changeNodeState(nodeName string, newState nodeState) (bool
7676
// TODO(thaJeztah): make switch exhaustive; add networkdb.nodeNotFound
7777
}
7878

79+
nDB.estNodes.Store(int32(len(nDB.nodes)))
80+
7981
log.G(context.TODO()).Infof("Node %s change state %s --> %s", nodeName, nodeStateName[currState], nodeStateName[newState])
8082

8183
if newState == nodeLeftState || newState == nodeFailedState {
@@ -121,3 +123,7 @@ func (nDB *NetworkDB) purgeReincarnation(mn *memberlist.Node) bool {
121123

122124
return false
123125
}
126+
127+
func (nDB *NetworkDB) estNumNodes() int {
128+
return int(nDB.estNodes.Load())
129+
}

0 commit comments

Comments
 (0)