Skip to content

Commit 6ec6e09

Browse files
committed
libnetwork/networkdb: prioritize local broadcasts
A network node is responsible for both broadcasting table events for entries it owns and for rebroadcasting table events from other nodes it has received. Table events to be broadcast are added to a single queue per network, including events for rebroadcasting. As the memberlist TransmitLimitedQueue is (to a first approximation) LIFO, a flood of events from other nodes could delay the broadcasting of locally-generated events indefinitely. Prioritize broadcasting local events by splitting up the queues and only pulling from the rebroadcast queue if there is free space in the gossip packet after draining the local-broadcast queue. Signed-off-by: Cory Snider <[email protected]>
1 parent e9a7154 commit 6ec6e09

4 files changed

Lines changed: 48 additions & 23 deletions

File tree

libnetwork/networkdb/broadcast.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,3 +159,18 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
159159
})
160160
return nil
161161
}
162+
163+
func getBroadcasts(overhead, limit int, queues ...*memberlist.TransmitLimitedQueue) [][]byte {
164+
var msgs [][]byte
165+
for _, q := range queues {
166+
b := q.GetBroadcasts(overhead, limit)
167+
for _, m := range b {
168+
limit -= overhead + len(m)
169+
}
170+
msgs = append(msgs, b...)
171+
if limit <= 0 {
172+
break
173+
}
174+
}
175+
return msgs
176+
}

libnetwork/networkdb/cluster.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,14 +469,15 @@ func (nDB *NetworkDB) gossip() {
469469
continue
470470
}
471471

472-
msgs := network.tableBroadcasts.GetBroadcasts(compoundOverhead, bytesAvail)
472+
msgs := getBroadcasts(compoundOverhead, bytesAvail, network.tableBroadcasts, network.tableRebroadcasts)
473473
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
474474
network.qMessagesSent.Add(int64(len(msgs)))
475475
if printStats {
476476
msent := network.qMessagesSent.Swap(0)
477-
log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
477+
log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d+%d netMsg/s:%d",
478478
nDB.config.Hostname, nDB.config.NodeID,
479-
nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(), network.tableBroadcasts.NumQueued(),
479+
nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(),
480+
network.tableBroadcasts.NumQueued(), network.tableRebroadcasts.NumQueued(),
480481
msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
481482
}
482483

libnetwork/networkdb/delegate.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,11 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
292292
}
293293

294294
// if the queue is over the threshold, avoid distributing information coming from TCP sync
295-
if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
295+
if isBulkSync && n.tableRebroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
296296
return
297297
}
298298

299-
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
299+
n.tableRebroadcasts.QueueBroadcast(&tableEventMessage{
300300
msg: buf,
301301
id: tEvent.NetworkID,
302302
tname: tEvent.TableName,
@@ -419,14 +419,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
419419
}
420420

421421
// GetBroadcasts returns up to limit bytes of queued gossip messages for
// memberlist to piggyback onto outgoing packets. Network-event broadcasts
// take priority; node-event broadcasts are included only if space remains.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	return getBroadcasts(overhead, limit, d.nDB.networkBroadcasts, d.nDB.nodeBroadcasts)
}
431424

432425
func (d *delegate) LocalState(join bool) []byte {

libnetwork/networkdb/networkdb.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,21 @@ type thisNodeNetwork struct {
149149
// Gets set to true after the first bulk sync happens
150150
inSync bool
151151

152-
// The broadcast queue for table event gossip. This is only
153-
// initialized for this node's network attachment entries.
152+
// The broadcast queue for this network's table event gossip
153+
// for entries owned by this node.
154154
tableBroadcasts *memberlist.TransmitLimitedQueue
155155

156+
// The broadcast queue for this network's table event gossip
157+
// relayed from other nodes.
158+
//
159+
// Messages in this queue are broadcasted when there is space available
160+
// in the gossip packet after filling it with tableBroadcast messages.
161+
// Relayed messages are broadcasted at a lower priority than messages
162+
// originating from this node to ensure that local messages are always
163+
// broadcasted in a timely manner, irrespective of how many messages
164+
// from other nodes are queued for rebroadcasting.
165+
tableRebroadcasts *memberlist.TransmitLimitedQueue
166+
156167
// Number of gossip messages sent related to this network during the last stats collection period
157168
qMessagesSent atomic.Int64
158169

@@ -626,17 +637,22 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
626637
n.network = network{ltime: ltime}
627638
n.inSync = false
628639
} else {
640+
numNodes := func() int {
641+
// TODO fcrisciani this can be optimized maybe avoiding the lock?
642+
// this call is done each GetBroadcasts call to evaluate the number of
643+
// replicas for the message
644+
nDB.RLock()
645+
defer nDB.RUnlock()
646+
return len(nDB.networkNodes[nid])
647+
}
629648
n = &thisNodeNetwork{
630649
network: network{ltime: ltime},
631650
tableBroadcasts: &memberlist.TransmitLimitedQueue{
632-
NumNodes: func() int {
633-
// TODO fcrisciani this can be optimized maybe avoiding the lock?
634-
// this call is done each GetBroadcasts call to evaluate the number of
635-
// replicas for the message
636-
nDB.RLock()
637-
defer nDB.RUnlock()
638-
return len(nDB.networkNodes[nid])
639-
},
651+
NumNodes: numNodes,
652+
RetransmitMult: 4,
653+
},
654+
tableRebroadcasts: &memberlist.TransmitLimitedQueue{
655+
NumNodes: numNodes,
640656
RetransmitMult: 4,
641657
},
642658
}

0 commit comments

Comments (0)