@@ -936,7 +936,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
936936 CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0 , hdr};
937937}
938938
939- size_t CConnman::SocketSendData (CNode& node)
939+ std::pair< size_t , bool > CConnman::SocketSendData (CNode& node) const
940940{
941941 auto it = node.vSendMsg .begin ();
942942 size_t nSentSize = 0 ;
@@ -994,7 +994,7 @@ size_t CConnman::SocketSendData(CNode& node)
994994 }
995995 node.vSendMsg .erase (node.vSendMsg .begin (), it);
996996 node.nSendMsgSize = node.vSendMsg .size ();
997- return nSentSize;
997+ return { nSentSize, !node. vSendMsg . empty ()} ;
998998}
999999
10001000static bool ReverseCompareNodeMinPingTime (const NodeEvictionCandidate& a, const NodeEvictionCandidate& b)
@@ -1711,8 +1711,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
17111711 recv_set.insert (hListenSocket.sock ->Get ());
17121712 }
17131713
1714- for (CNode* pnode : nodes)
1715- {
1714+ for (CNode* pnode : nodes) {
17161715 bool select_recv = !pnode->fHasRecvData ;
17171716 bool select_send = !pnode->fCanSendData ;
17181717
@@ -2027,9 +2026,9 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
20272026
20282027 if (interruptNet) return ;
20292028
2030- std::vector <CNode*> vErrorNodes;
2031- std::vector <CNode*> vReceivableNodes;
2032- std::vector <CNode*> vSendableNodes;
2029+ std::set <CNode*> vErrorNodes;
2030+ std::set <CNode*> vReceivableNodes;
2031+ std::set <CNode*> vSendableNodes;
20332032 {
20342033 LOCK (cs_mapSocketToNode);
20352034 for (auto hSocket : error_set) {
@@ -2038,7 +2037,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
20382037 continue ;
20392038 }
20402039 it->second ->AddRef ();
2041- vErrorNodes.emplace_back (it->second );
2040+ vErrorNodes.emplace (it->second );
20422041 }
20432042 for (auto hSocket : recv_set) {
20442043 if (error_set.count (hSocket)) {
@@ -2073,7 +2072,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
20732072 {
20742073 LOCK (cs_sendable_receivable_nodes);
20752074
2076- vReceivableNodes.reserve (mapReceivableNodes.size ());
20772075 for (auto it = mapReceivableNodes.begin (); it != mapReceivableNodes.end (); ) {
20782076 if (!it->second ->fHasRecvData ) {
20792077 it = mapReceivableNodes.erase (it);
@@ -2088,7 +2086,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
20882086 // receiving data (which should succeed as the socket signalled as receivable).
20892087 if (!it->second ->fPauseRecv && it->second ->nSendMsgSize == 0 && !it->second ->fDisconnect ) {
20902088 it->second ->AddRef ();
2091- vReceivableNodes.emplace_back (it->second );
2089+ vReceivableNodes.emplace (it->second );
20922090 }
20932091 ++it;
20942092 }
@@ -2099,7 +2097,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
20992097 // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
21002098 // but don't have any in this iteration
21012099 LOCK (cs_mapNodesWithDataToSend);
2102- vSendableNodes.reserve (mapNodesWithDataToSend.size ());
21032100 for (auto it = mapNodesWithDataToSend.begin (); it != mapNodesWithDataToSend.end (); ) {
21042101 if (it->second ->nSendMsgSize == 0 ) {
21052102 // See comment in PushMessage
@@ -2108,13 +2105,36 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
21082105 } else {
21092106 if (it->second ->fCanSendData ) {
21102107 it->second ->AddRef ();
2111- vSendableNodes.emplace_back (it->second );
2108+ vSendableNodes.emplace (it->second );
21122109 }
21132110 ++it;
21142111 }
21152112 }
21162113 }
21172114
2115+ for (CNode* pnode : vSendableNodes) {
2116+ if (interruptNet) {
2117+ break ;
2118+ }
2119+
2120+ // Send data
2121+ auto [bytes_sent, data_left] = WITH_LOCK (pnode->cs_vSend , return SocketSendData (*pnode));
2122+ if (bytes_sent) {
2123+ RecordBytesSent (bytes_sent);
2124+
2125+ // If both receiving and (non-optimistic) sending were possible, we first attempt
2126+ // sending. If that succeeds, but does not fully drain the send queue, do not
2127+ // attempt to receive. This avoids needlessly queueing data if the remote peer
2128+ // is slow at receiving data, by means of TCP flow control. We only do this when
2129+ // sending actually succeeded to make sure progress is always made; otherwise a
2130+ // deadlock would be possible when both sides have data to send, but neither is
2131+ // receiving.
2132+ if (data_left && vReceivableNodes.erase (pnode)) {
2133+ pnode->Release ();
2134+ }
2135+ }
2136+ }
2137+
21182138 for (CNode* pnode : vErrorNodes)
21192139 {
21202140 if (interruptNet) {
@@ -2136,16 +2156,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
21362156 SocketRecvData (pnode);
21372157 }
21382158
2139- for (CNode* pnode : vSendableNodes) {
2140- if (interruptNet) {
2141- break ;
2142- }
2143-
2144- // Send data
2145- size_t bytes_sent = WITH_LOCK (pnode->cs_vSend , return SocketSendData (*pnode));
2146- if (bytes_sent) RecordBytesSent (bytes_sent);
2147- }
2148-
21492159 for (auto & node : vErrorNodes) {
21502160 node->Release ();
21512161 }
@@ -4183,7 +4193,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
41834193
41844194 {
41854195 LOCK (pnode->cs_vSend );
4186- bool hasPendingData = ! pnode->vSendMsg .empty ();
4196+ bool optimisticSend ( pnode->vSendMsg .empty () );
41874197
41884198 // log total amount of bytes per message type
41894199 pnode->mapSendBytesPerMsgType [msg.m_type ] += nTotalSize;
@@ -4206,7 +4216,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
42064216 }
42074217
42084218 // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
4209- if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup .load ()))
4219+ if (optimisticSend && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup .load ()))
42104220 m_wakeup_pipe->Write ();
42114221 }
42124222}
0 commit comments