Skip to content

Commit dbb0d88

Browse files
committed
libn/networkdb: use distinct type for own networks
NetworkDB uses a multi-dimensional map of struct network to keep track of network attachments for both remote nodes and the local node. Only a subset of the struct fields are used for remote nodes' network attachments. The tableBroadcasts pointer field in particular is always initialized for network values representing local attachments (read: nDB.networks[nDB.config.NodeID]) and always nil for remote attachments. Consequently, unnecessary defensive nil-pointer checks are peppered throughout the code despite the aforementioned invariant. Enshrine the invariant that tableBroadcasts is initialized iff the network attachment is for the local node in the type system. Pare down struct network to only the fields needed for remote network attachments and move the local-only fields into a new struct thisNodeNetwork. Elide the unnecessary nil-checks. Signed-off-by: Cory Snider <[email protected]>
1 parent 51f3182 commit dbb0d88

6 files changed

Lines changed: 86 additions & 97 deletions

File tree

libnetwork/networkdb/broadcast.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,27 +142,16 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
142142
return err
143143
}
144144

145-
var broadcastQ *memberlist.TransmitLimitedQueue
146145
nDB.RLock()
147-
thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
148-
if ok {
149-
// The network may have been removed
150-
network, networkOk := thisNodeNetworks[nid]
151-
if !networkOk {
152-
nDB.RUnlock()
153-
return nil
154-
}
155-
156-
broadcastQ = network.tableBroadcasts
157-
}
146+
n, ok := nDB.thisNodeNetworks[nid]
158147
nDB.RUnlock()
159148

160149
// The network may have been removed
161-
if broadcastQ == nil {
150+
if !ok {
162151
return nil
163152
}
164153

165-
broadcastQ.QueueBroadcast(&tableEventMessage{
154+
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
166155
msg: raw,
167156
id: nid,
168157
tname: tname,

libnetwork/networkdb/cluster.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,15 @@ func (nDB *NetworkDB) reapState() {
369369

370370
func (nDB *NetworkDB) reapNetworks() {
371371
nDB.Lock()
372+
for id, n := range nDB.thisNodeNetworks {
373+
if n.leaving {
374+
if n.reapTime <= 0 {
375+
delete(nDB.thisNodeNetworks, id)
376+
continue
377+
}
378+
n.reapTime -= reapPeriod
379+
}
380+
}
372381
for _, nn := range nDB.networks {
373382
for id, n := range nn {
374383
if n.leaving {
@@ -387,7 +396,7 @@ func (nDB *NetworkDB) reapTableEntries() {
387396
var nodeNetworks []string
388397
// This is best effort, if the list of network changes will be picked up in the next cycle
389398
nDB.RLock()
390-
for nid := range nDB.networks[nDB.config.NodeID] {
399+
for nid := range nDB.thisNodeNetworks {
391400
nodeNetworks = append(nodeNetworks, nid)
392401
}
393402
nDB.RUnlock()
@@ -430,8 +439,7 @@ func (nDB *NetworkDB) reapTableEntries() {
430439
func (nDB *NetworkDB) gossip() {
431440
networkNodes := make(map[string][]string)
432441
nDB.RLock()
433-
thisNodeNetworks := nDB.networks[nDB.config.NodeID]
434-
for nid := range thisNodeNetworks {
442+
for nid := range nDB.thisNodeNetworks {
435443
networkNodes[nid] = nDB.networkNodes[nid]
436444
}
437445
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
@@ -451,7 +459,7 @@ func (nDB *NetworkDB) gossip() {
451459
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead
452460

453461
nDB.RLock()
454-
network, ok := thisNodeNetworks[nid]
462+
network, ok := nDB.thisNodeNetworks[nid]
455463
nDB.RUnlock()
456464
if !ok || network == nil {
457465
// It is normal for the network to be removed
@@ -461,21 +469,14 @@ func (nDB *NetworkDB) gossip() {
461469
continue
462470
}
463471

464-
broadcastQ := network.tableBroadcasts
465-
466-
if broadcastQ == nil {
467-
log.G(context.TODO()).Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
468-
continue
469-
}
470-
471-
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
472+
msgs := network.tableBroadcasts.GetBroadcasts(compoundOverhead, bytesAvail)
472473
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
473474
network.qMessagesSent.Add(int64(len(msgs)))
474475
if printStats {
475476
msent := network.qMessagesSent.Swap(0)
476477
log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
477478
nDB.config.Hostname, nDB.config.NodeID,
478-
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
479+
nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(), network.tableBroadcasts.NumQueued(),
479480
msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
480481
}
481482

@@ -510,7 +511,7 @@ func (nDB *NetworkDB) gossip() {
510511
func (nDB *NetworkDB) bulkSyncTables() {
511512
var networks []string
512513
nDB.RLock()
513-
for nid, network := range nDB.networks[nDB.config.NodeID] {
514+
for nid, network := range nDB.thisNodeNetworks {
514515
if network.leaving {
515516
continue
516517
}

libnetwork/networkdb/delegate.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
135135
}
136136

137137
// This remote network join is being seen the first time.
138-
nodeNetworks[nEvent.NetworkID] = &network{
139-
ltime: nEvent.LTime,
140-
}
138+
nodeNetworks[nEvent.NetworkID] = &network{ltime: nEvent.LTime}
141139

142140
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
143141
return true
@@ -154,8 +152,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool
154152
defer nDB.Unlock()
155153

156154
// Ignore the table events for networks that are in the process of going away
157-
networks := nDB.networks[nDB.config.NodeID]
158-
network, ok := networks[tEvent.NetworkID]
155+
network, ok := nDB.thisNodeNetworks[tEvent.NetworkID]
159156
// Check if the owner of the event is still part of the network
160157
nodes := nDB.networkNodes[tEvent.NetworkID]
161158
var nodePresent bool
@@ -286,11 +283,11 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
286283
}
287284

288285
nDB.RLock()
289-
n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
286+
n, ok := nDB.thisNodeNetworks[tEvent.NetworkID]
290287
nDB.RUnlock()
291288

292-
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
293-
if !ok || n.leaving || n.tableBroadcasts == nil {
289+
// if the network is not there anymore, OR we are leaving the network
290+
if !ok || n.leaving {
294291
return
295292
}
296293

@@ -450,6 +447,14 @@ func (d *delegate) LocalState(join bool) []byte {
450447
NodeName: d.nDB.config.NodeID,
451448
}
452449

450+
for nid, n := range d.nDB.thisNodeNetworks {
451+
pp.Networks = append(pp.Networks, &NetworkEntry{
452+
LTime: n.ltime,
453+
NetworkID: nid,
454+
NodeName: d.nDB.config.NodeID,
455+
Leaving: n.leaving,
456+
})
457+
}
453458
for name, nn := range d.nDB.networks {
454459
for nid, n := range nn {
455460
pp.Networks = append(pp.Networks, &NetworkEntry{

libnetwork/networkdb/networkdb.go

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,14 @@ type NetworkDB struct {
6262
// List of all peer nodes which have left
6363
leftNodes map[string]*node
6464

65-
// A multi-dimensional map of network/node attachments. The
66-
// first key is a node name and the second key is a network ID
67-
// for the network that node is participating in.
65+
// A multi-dimensional map of network/node attachments for peer nodes.
66+
// The first key is a node name and the second key is a network ID for
67+
// the network that node is participating in.
6868
networks map[string]map[string]*network
6969

70+
// A map of this node's network attachments.
71+
thisNodeNetworks map[string]*thisNodeNetwork
72+
7073
// A map of nodes which are participating in a given
7174
// network. The key is a network ID.
7275
networkNodes map[string][]string
@@ -131,15 +134,20 @@ type network struct {
131134
// Lamport time for the latest state of the entry.
132135
ltime serf.LamportTime
133136

134-
// Gets set to true after the first bulk sync happens
135-
inSync bool
136-
137137
// Node leave is in progress.
138138
leaving bool
139139

140140
// Number of seconds still left before a deleted network entry gets
141141
// removed from networkDB
142142
reapTime time.Duration
143+
}
144+
145+
// thisNodeNetwork describes a network attachment on the local node.
146+
type thisNodeNetwork struct {
147+
network
148+
149+
// Gets set to true after the first bulk sync happens
150+
inSync bool
143151

144152
// The broadcast queue for table event gossip. This is only
145153
// initialized for this node's network attachment entries.
@@ -270,13 +278,14 @@ func new(c *Config) *NetworkDB {
270278
byTable: iradix.New[*entry](),
271279
byNetwork: iradix.New[*entry](),
272280
},
273-
networks: make(map[string]map[string]*network),
274-
nodes: make(map[string]*node),
275-
failedNodes: make(map[string]*node),
276-
leftNodes: make(map[string]*node),
277-
networkNodes: make(map[string][]string),
278-
bulkSyncAckTbl: make(map[string]chan struct{}),
279-
broadcaster: events.NewBroadcaster(),
281+
networks: make(map[string]map[string]*network),
282+
thisNodeNetworks: make(map[string]*thisNodeNetwork),
283+
nodes: make(map[string]*node),
284+
failedNodes: make(map[string]*node),
285+
leftNodes: make(map[string]*node),
286+
networkNodes: make(map[string][]string),
287+
bulkSyncAckTbl: make(map[string]chan struct{}),
288+
broadcaster: events.NewBroadcaster(),
280289
}
281290
}
282291

@@ -608,24 +617,17 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
608617
ltime := nDB.networkClock.Increment()
609618

610619
nDB.Lock()
611-
nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
612-
if !ok {
613-
nodeNetworks = make(map[string]*network)
614-
nDB.networks[nDB.config.NodeID] = nodeNetworks
615-
}
616-
n, ok := nodeNetworks[nid]
620+
n, ok := nDB.thisNodeNetworks[nid]
617621
if ok {
618622
if !n.leaving {
619623
nDB.Unlock()
620624
return fmt.Errorf("networkdb: network %s is already joined", nid)
621625
}
622-
n.ltime = ltime
626+
n.network = network{ltime: ltime}
623627
n.inSync = false
624-
n.leaving = false
625-
n.reapTime = 0
626628
} else {
627-
n = &network{
628-
ltime: ltime,
629+
n = &thisNodeNetwork{
630+
network: network{ltime: ltime},
629631
tableBroadcasts: &memberlist.TransmitLimitedQueue{
630632
NumNodes: func() int {
631633
// TODO fcrisciani this can be optimized maybe avoiding the lock?
@@ -646,7 +648,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
646648
return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
647649
}
648650

649-
nodeNetworks[nid] = n
651+
nDB.thisNodeNetworks[nid] = n
650652
networkNodes := nDB.networkNodes[nid]
651653
nDB.Unlock()
652654

@@ -685,12 +687,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
685687
// Update all the local entries marking them for deletion and delete all the remote entries
686688
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)
687689

688-
nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
689-
if !ok {
690-
return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
691-
}
692-
693-
n, ok := nodeNetworks[nid]
690+
n, ok := nDB.thisNodeNetworks[nid]
694691
if !ok {
695692
return fmt.Errorf("could not find network %s while trying to leave", nid)
696693
}
@@ -741,7 +738,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
741738
defer nDB.RUnlock()
742739

743740
var networks []string
744-
for nid := range nDB.networks[nDB.config.NodeID] {
741+
for nid := range nDB.thisNodeNetworks {
745742
if n, ok := nDB.networks[nodeName][nid]; ok {
746743
if !n.leaving {
747744
networks = append(networks, nid)
@@ -757,7 +754,7 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
757754
defer nDB.Unlock()
758755

759756
ltime := nDB.networkClock.Increment()
760-
for _, n := range nDB.networks[nDB.config.NodeID] {
757+
for _, n := range nDB.thisNodeNetworks {
761758
n.ltime = ltime
762759
}
763760
}
@@ -769,7 +766,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, v *entry) (okT
769766
nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)), v)
770767
if !okNetwork {
771768
// Add only if it is an insert not an update
772-
n, ok := nDB.networks[nDB.config.NodeID][nid]
769+
n, ok := nDB.thisNodeNetworks[nid]
773770
if ok {
774771
n.entriesNumber.Add(1)
775772
}
@@ -784,7 +781,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwo
784781
nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)))
785782
if okNetwork {
786783
// Remove only if the delete is successful
787-
n, ok := nDB.networks[nDB.config.NodeID][nid]
784+
n, ok := nDB.thisNodeNetworks[nid]
788785
if ok {
789786
n.entriesNumber.Add(-1)
790787
}

0 commit comments

Comments
 (0)