@@ -661,13 +661,6 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
661661 connman->mapReceivableNodes .erase (GetId ());
662662 connman->mapSendableNodes .erase (GetId ());
663663 }
664- {
665- LOCK (connman->cs_mapNodesWithDataToSend );
666- if (connman->mapNodesWithDataToSend .erase (GetId ()) != 0 ) {
667- // See comment in PushMessage
668- Release ();
669- }
670- }
671664
672665 if (connman->m_edge_trig_events && !connman->m_edge_trig_events ->UnregisterEvents (m_sock->Get ())) {
673666 LogPrint (BCLog::NET, " EdgeTriggeredEvents::UnregisterEvents() failed\n " );
@@ -2521,17 +2514,11 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
25212514 LOCK2 (m_nodes_mutex, cs_sendable_receivable_nodes);
25222515 if (!mapReceivableNodes.empty ()) {
25232516 return true ;
2524- } else if (!mapSendableNodes.empty ()) {
2525- if (LOCK (cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty ()) {
2526- // We must check if at least one of the nodes with pending messages is also
2527- // sendable, as otherwise a single node would be able to make the network
2528- // thread busy with polling.
2529- for (auto & p : mapNodesWithDataToSend) {
2530- if (mapSendableNodes.count (p.first )) {
2531- return true ;
2532- break ;
2533- }
2534- }
2517+ }
2518+ for (const auto & p : mapSendableNodes) {
2519+ const auto & [to_send, more, _msg_type] = p.second ->m_transport ->GetBytesToSend (p.second ->nSendMsgSize != 0 );
2520+ if (!to_send.empty ()) {
2521+ return true ;
25352522 }
25362523 }
25372524 return false ;
@@ -2607,52 +2594,30 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
26072594 assert (jt.first ->second == it->second );
26082595 it->second ->fCanSendData = true ;
26092596 }
2597+ }
26102598
2611- // collect nodes that have a receivable socket
2612- // also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
2613- {
2614- LOCK (cs_sendable_receivable_nodes);
2615-
2616- for (auto it = mapReceivableNodes.begin (); it != mapReceivableNodes.end (); ) {
2617- if (!it->second ->fHasRecvData ) {
2618- it = mapReceivableNodes.erase (it);
2619- } else {
2620- // Implement the following logic:
2621- // * If there is data to send, try sending data. As this only
2622- // happens when optimistic write failed, we choose to first drain the
2623- // write buffer in this case before receiving more. This avoids
2624- // needlessly queueing received data, if the remote peer is not themselves
2625- // receiving data. This means properly utilizing TCP flow control signalling.
2626- // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
2627- // receiving data (which should succeed as the socket signalled as receivable).
2628- if (!it->second ->fPauseRecv && !it->second ->fDisconnect && it->second ->nSendMsgSize == 0 ) {
2629- it->second ->AddRef ();
2630- vReceivableNodes.emplace (it->second );
2631- }
2632- ++it;
2633- }
2634- }
2599+ ForEachNode (AllNodes, [&](CNode* pnode) {
2600+ const auto & [to_send, more, _msg_type] = pnode->m_transport ->GetBytesToSend (pnode->nSendMsgSize != 0 );
2601+ // Collect nodes that have a receivable socket, implement the following logic:
2602+ // * If there is data to send, try sending data. As this only
2603+ // happens when optimistic write failed, we choose to first drain the
2604+ // write buffer in this case before receiving more. This avoids
2605+ // needlessly queueing received data, if the remote peer is not themselves
2606+ // receiving data. This means properly utilizing TCP flow control signalling.
2607+ // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
2608+ // receiving data (which should succeed as the socket signalled as receivable).
2609+ if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect &&
2610+ (!pnode->m_transport ->ReceivedMessageComplete () || to_send.empty ())) {
2611+ pnode->AddRef ();
2612+ vReceivableNodes.emplace (pnode);
26352613 }
26362614
2637- // collect nodes that have data to send and have a socket with non-empty write buffers
2638- // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
2639- // but don't have any in this iteration
2640- LOCK (cs_mapNodesWithDataToSend);
2641- for (auto it = mapNodesWithDataToSend.begin (); it != mapNodesWithDataToSend.end (); ) {
2642- const auto & [to_send, more, _msg_type] = it->second ->m_transport ->GetBytesToSend (it->second ->nSendMsgSize != 0 );
2643- if (to_send.empty () && !more) {
2644- // See comment in PushMessage
2645- it->second ->Release ();
2646- it = mapNodesWithDataToSend.erase (it);
2647- } else {
2648- if (it->second ->fCanSendData ) {
2649- it->second ->AddRef ();
2650- vSendableNodes.emplace (it->second );
2651- }
2652- ++it;
2653- }
2615+ // Collect nodes that have data to send and have a socket with non-empty write buffers
2616+ if (pnode->fCanSendData && (!pnode->m_transport ->ReceivedMessageComplete () || !to_send.empty ())) {
2617+ pnode->AddRef ();
2618+ vSendableNodes.emplace (pnode);
26542619 }
2655- }
2620+ });
26562621
26572622 for (CNode* pnode : vSendableNodes) {
26582623 if (interruptNet) {
@@ -2724,6 +2689,15 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
27242689 ++it;
27252690 }
27262691 }
2692+ // clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
2693+ for (auto it = mapReceivableNodes.begin (); it != mapReceivableNodes.end (); ) {
2694+ if (!it->second ->fHasRecvData ) {
2695+ LogPrint (BCLog::NET, " %s -- remove mapReceivableNodes, peer=%d\n " , __func__, it->second ->GetId ());
2696+ it = mapReceivableNodes.erase (it);
2697+ } else {
2698+ ++it;
2699+ }
2700+ }
27272701 }
27282702}
27292703
@@ -4259,10 +4233,6 @@ void CConnman::StopNodes()
42594233 LOCK (cs_sendable_receivable_nodes);
42604234 mapReceivableNodes.clear ();
42614235 }
4262- {
4263- LOCK (cs_mapNodesWithDataToSend);
4264- mapNodesWithDataToSend.clear ();
4265- }
42664236 m_nodes_disconnected.clear ();
42674237 vhListenSocket.clear ();
42684238 semOutbound.reset ();
@@ -4818,17 +4788,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
48184788 pnode->vSendMsg .push_back (std::move (msg));
48194789 pnode->nSendMsgSize = pnode->vSendMsg .size ();
48204790
4821- {
4822- LOCK (cs_mapNodesWithDataToSend);
4823- // we're not holding m_nodes_mutex here, so there is a chance of this node being disconnected shortly before
4824- // we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we
4825- // might end up having an entry in mapNodesWithDataToSend that is not in m_nodes anymore. We need to
4826- // Add/Release refs when adding/erasing mapNodesWithDataToSend.
4827- if (mapNodesWithDataToSend.emplace (pnode->GetId (), pnode).second ) {
4828- pnode->AddRef ();
4829- }
4830- }
4831-
48324791 // If there was nothing to send before, and there is now (predicted by the "more" value
48334792 // returned by the GetBytesToSend call above), attempt "optimistic write":
48344793 // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
0 commit comments