Skip to content

Commit f9f8805

Browse files
UdjinM6kwvg
authored andcommitted
fix: drop CConnman::mapNodesWithDataToSend, use transport data
1 parent d39d8a4 commit f9f8805

File tree

2 files changed

+34
-78
lines changed

2 files changed

+34
-78
lines changed

src/net.cpp

Lines changed: 34 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -661,13 +661,6 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
661661
connman->mapReceivableNodes.erase(GetId());
662662
connman->mapSendableNodes.erase(GetId());
663663
}
664-
{
665-
LOCK(connman->cs_mapNodesWithDataToSend);
666-
if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) {
667-
// See comment in PushMessage
668-
Release();
669-
}
670-
}
671664

672665
if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(m_sock->Get())) {
673666
LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n");
@@ -2521,17 +2514,11 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
25212514
LOCK2(m_nodes_mutex, cs_sendable_receivable_nodes);
25222515
if (!mapReceivableNodes.empty()) {
25232516
return true;
2524-
} else if (!mapSendableNodes.empty()) {
2525-
if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) {
2526-
// We must check if at least one of the nodes with pending messages is also
2527-
// sendable, as otherwise a single node would be able to make the network
2528-
// thread busy with polling.
2529-
for (auto& p : mapNodesWithDataToSend) {
2530-
if (mapSendableNodes.count(p.first)) {
2531-
return true;
2532-
break;
2533-
}
2534-
}
2517+
}
2518+
for (const auto& p : mapSendableNodes) {
2519+
const auto& [to_send, more, _msg_type] = p.second->m_transport->GetBytesToSend(p.second->nSendMsgSize != 0);
2520+
if (!to_send.empty()) {
2521+
return true;
25352522
}
25362523
}
25372524
return false;
@@ -2607,52 +2594,30 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
26072594
assert(jt.first->second == it->second);
26082595
it->second->fCanSendData = true;
26092596
}
2597+
}
26102598

2611-
// collect nodes that have a receivable socket
2612-
// also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
2613-
{
2614-
LOCK(cs_sendable_receivable_nodes);
2615-
2616-
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
2617-
if (!it->second->fHasRecvData) {
2618-
it = mapReceivableNodes.erase(it);
2619-
} else {
2620-
// Implement the following logic:
2621-
// * If there is data to send, try sending data. As this only
2622-
// happens when optimistic write failed, we choose to first drain the
2623-
// write buffer in this case before receiving more. This avoids
2624-
// needlessly queueing received data, if the remote peer is not themselves
2625-
// receiving data. This means properly utilizing TCP flow control signalling.
2626-
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
2627-
// receiving data (which should succeed as the socket signalled as receivable).
2628-
if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) {
2629-
it->second->AddRef();
2630-
vReceivableNodes.emplace(it->second);
2631-
}
2632-
++it;
2633-
}
2634-
}
2599+
ForEachNode(AllNodes, [&](CNode* pnode) {
2600+
const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(pnode->nSendMsgSize != 0);
2601+
// Collect nodes that have a receivable socket, implement the following logic:
2602+
// * If there is data to send, try sending data. As this only
2603+
// happens when optimistic write failed, we choose to first drain the
2604+
// write buffer in this case before receiving more. This avoids
2605+
// needlessly queueing received data, if the remote peer is not themselves
2606+
// receiving data. This means properly utilizing TCP flow control signalling.
2607+
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
2608+
// receiving data (which should succeed as the socket signalled as receivable).
2609+
if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect &&
2610+
(!pnode->m_transport->ReceivedMessageComplete() || to_send.empty())) {
2611+
pnode->AddRef();
2612+
vReceivableNodes.emplace(pnode);
26352613
}
26362614

2637-
// collect nodes that have data to send and have a socket with non-empty write buffers
2638-
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
2639-
// but don't have any in this iteration
2640-
LOCK(cs_mapNodesWithDataToSend);
2641-
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
2642-
const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0);
2643-
if (to_send.empty() && !more) {
2644-
// See comment in PushMessage
2645-
it->second->Release();
2646-
it = mapNodesWithDataToSend.erase(it);
2647-
} else {
2648-
if (it->second->fCanSendData) {
2649-
it->second->AddRef();
2650-
vSendableNodes.emplace(it->second);
2651-
}
2652-
++it;
2653-
}
2615+
// Collect nodes that have data to send and have a socket with non-empty write buffers
2616+
if (pnode->fCanSendData && (!pnode->m_transport->ReceivedMessageComplete() || !to_send.empty())) {
2617+
pnode->AddRef();
2618+
vSendableNodes.emplace(pnode);
26542619
}
2655-
}
2620+
});
26562621

26572622
for (CNode* pnode : vSendableNodes) {
26582623
if (interruptNet) {
@@ -2724,6 +2689,15 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
27242689
++it;
27252690
}
27262691
}
2692+
// clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
2693+
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
2694+
if (!it->second->fHasRecvData) {
2695+
LogPrint(BCLog::NET, "%s -- remove mapReceivableNodes, peer=%d\n", __func__, it->second->GetId());
2696+
it = mapReceivableNodes.erase(it);
2697+
} else {
2698+
++it;
2699+
}
2700+
}
27272701
}
27282702
}
27292703

@@ -4259,10 +4233,6 @@ void CConnman::StopNodes()
42594233
LOCK(cs_sendable_receivable_nodes);
42604234
mapReceivableNodes.clear();
42614235
}
4262-
{
4263-
LOCK(cs_mapNodesWithDataToSend);
4264-
mapNodesWithDataToSend.clear();
4265-
}
42664236
m_nodes_disconnected.clear();
42674237
vhListenSocket.clear();
42684238
semOutbound.reset();
@@ -4818,17 +4788,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
48184788
pnode->vSendMsg.push_back(std::move(msg));
48194789
pnode->nSendMsgSize = pnode->vSendMsg.size();
48204790

4821-
{
4822-
LOCK(cs_mapNodesWithDataToSend);
4823-
// we're not holding m_nodes_mutex here, so there is a chance of this node being disconnected shortly before
4824-
// we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we
4825-
// might end up having an entry in mapNodesWithDataToSend that is not in m_nodes anymore. We need to
4826-
// Add/Release refs when adding/erasing mapNodesWithDataToSend.
4827-
if (mapNodesWithDataToSend.emplace(pnode->GetId(), pnode).second) {
4828-
pnode->AddRef();
4829-
}
4830-
}
4831-
48324791
// If there was nothing to send before, and there is now (predicted by the "more" value
48334792
// returned by the GetBytesToSend call above), attempt "optimistic write":
48344793
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually

src/net.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,9 +1856,6 @@ friend class CNode;
18561856
Mutex cs_sendable_receivable_nodes;
18571857
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
18581858
std::unordered_map<NodeId, CNode*> mapSendableNodes GUARDED_BY(cs_sendable_receivable_nodes);
1859-
/** Protected by cs_mapNodesWithDataToSend */
1860-
std::unordered_map<NodeId, CNode*> mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend);
1861-
mutable RecursiveMutex cs_mapNodesWithDataToSend;
18621859

18631860
std::thread threadDNSAddressSeed;
18641861
std::thread threadSocketHandler;

0 commit comments

Comments
 (0)