@@ -1034,7 +1034,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
10341034 // Notify transport that bytes have been processed.
10351035 node.m_transport ->MarkBytesSent (nBytes);
10361036 // Update statistics per message type.
1037- node.mapSendBytesPerMsgType [ msg_type] += nBytes;
1037+ node.AccountForSentBytes ( msg_type, nBytes) ;
10381038 nSentSize += nBytes;
10391039 if ((size_t )nBytes != data.size ()) {
10401040 // could not send full message; stop sending more
@@ -1115,7 +1115,7 @@ bool CConnman::AttemptToEvictConnection()
11151115 .m_is_local = node->addr .IsLocal (),
11161116 .m_network = node->ConnectedThroughNetwork (),
11171117 .m_noban = node->HasPermission (NetPermissionFlags::NoBan),
1118- .m_conn_type = node->m_conn_type ,
1118+ .m_conn_type = node->GetConnectionType () ,
11191119 };
11201120 vEvictionCandidates.push_back (candidate);
11211121 }
@@ -1339,7 +1339,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
13391339
13401340 // Count existing connections
13411341 int existing_connections = WITH_LOCK (m_nodes_mutex,
1342- return std::count_if (m_nodes.begin (), m_nodes.end (), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
1342+ return std::count_if (m_nodes.begin (), m_nodes.end (), [conn_type](CNode* node) { return node->GetConnectionType () == conn_type; }););
13431343
13441344 // Max connections of specified type already exist
13451345 if (max_connections != std::nullopt && existing_connections >= max_connections) return false ;
@@ -1494,16 +1494,8 @@ void CConnman::CalculateNumConnectionsChangedStats()
14941494 mapSentBytesMsgStats[NET_MESSAGE_TYPE_OTHER] = 0 ;
14951495 const NodesSnapshot snap{*this , /* filter = */ CConnman::FullyConnectedOnly};
14961496 for (auto pnode : snap.Nodes ()) {
1497- {
1498- LOCK (pnode->cs_vRecv );
1499- for (const mapMsgTypeSize::value_type &i : pnode->mapRecvBytesPerMsgType )
1500- mapRecvBytesMsgStats[i.first ] += i.second ;
1501- }
1502- {
1503- LOCK (pnode->cs_vSend );
1504- for (const mapMsgTypeSize::value_type &i : pnode->mapSendBytesPerMsgType )
1505- mapSentBytesMsgStats[i.first ] += i.second ;
1506- }
1497+ WITH_LOCK (pnode->cs_vRecv , pnode->UpdateRecvMapWithStats (mapRecvBytesMsgStats));
1498+ WITH_LOCK (pnode->cs_vSend , pnode->UpdateSentMapWithStats (mapSentBytesMsgStats));
15071499 if (pnode->m_bloom_filter_loaded .load ()) {
15081500 spvNodes++;
15091501 } else {
@@ -2096,18 +2088,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
20962088 }
20972089 RecordBytesRecv (nBytes);
20982090 if (notify) {
2099- size_t nSizeAdded = 0 ;
2100- for (const auto & msg : pnode->vRecvMsg ) {
2101- // vRecvMsg contains only completed CNetMessage
2102- // the single possible partially deserialized message are held by TransportDeserializer
2103- nSizeAdded += msg.m_raw_message_size ;
2104- }
2105- {
2106- LOCK (pnode->cs_vProcessMsg );
2107- pnode->vProcessMsg .splice (pnode->vProcessMsg .end (), pnode->vRecvMsg );
2108- pnode->nProcessQueueSize += nSizeAdded;
2109- pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
2110- }
2091+ pnode->MarkReceivedMsgsForProcessing (nReceiveFloodSize);
21112092 WakeMessageHandler ();
21122093 }
21132094 }
@@ -2483,7 +2464,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
24832464 if (pnode->IsFullOutboundConn () && pnode->ConnectedThroughNetwork () == Network::NET_ONION) nOutboundOnionRelay++;
24842465
24852466 // Make sure our persistent outbound slots belong to different netgroups.
2486- switch (pnode->m_conn_type ) {
2467+ switch (pnode->GetConnectionType () ) {
24872468 // We currently don't take inbound connections into account. Since they are
24882469 // free to make, an attacker could make them to prevent us from connecting to
24892470 // certain peers.
@@ -4074,6 +4055,37 @@ CNode::CNode(NodeId idIn,
40744055 }
40754056}
40764057
4058+ void CNode::MarkReceivedMsgsForProcessing (unsigned int recv_flood_size)
4059+ {
4060+ AssertLockNotHeld (m_msg_process_queue_mutex);
4061+
4062+ size_t nSizeAdded = 0 ;
4063+ for (const auto & msg : vRecvMsg) {
4064+ // vRecvMsg contains only completed CNetMessage
4065+ // the single possible partially deserialized message are held by TransportDeserializer
4066+ nSizeAdded += msg.m_raw_message_size ;
4067+ }
4068+
4069+ LOCK (m_msg_process_queue_mutex);
4070+ m_msg_process_queue.splice (m_msg_process_queue.end (), vRecvMsg);
4071+ m_msg_process_queue_size += nSizeAdded;
4072+ fPauseRecv = m_msg_process_queue_size > recv_flood_size;
4073+ }
4074+
4075+ std::optional<std::pair<CNetMessage, bool >> CNode::PollMessage (size_t recv_flood_size)
4076+ {
4077+ LOCK (m_msg_process_queue_mutex);
4078+ if (m_msg_process_queue.empty ()) return std::nullopt ;
4079+
4080+ std::list<CNetMessage> msgs;
4081+ // Just take one message
4082+ msgs.splice (msgs.begin (), m_msg_process_queue, m_msg_process_queue.begin ());
4083+ m_msg_process_queue_size -= msgs.front ().m_raw_message_size ;
4084+ fPauseRecv = m_msg_process_queue_size > recv_flood_size;
4085+
4086+ return std::make_pair (std::move (msgs.front ()), !m_msg_process_queue.empty ());
4087+ }
4088+
40774089bool CConnman::NodeFullyConnected (const CNode* pnode)
40784090{
40794091 return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect ;
0 commit comments