Skip to content

Commit 9023dd2

Browse files
committed
merge bitcoin#27257: End friendship of CNode, CConnman and ConnmanTestMsg
1 parent 3465df2 commit 9023dd2

File tree

4 files changed

+94
-58
lines changed

4 files changed

+94
-58
lines changed

src/net.cpp

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
10341034
// Notify transport that bytes have been processed.
10351035
node.m_transport->MarkBytesSent(nBytes);
10361036
// Update statistics per message type.
1037-
node.mapSendBytesPerMsgType[msg_type] += nBytes;
1037+
node.AccountForSentBytes(msg_type, nBytes);
10381038
nSentSize += nBytes;
10391039
if ((size_t)nBytes != data.size()) {
10401040
// could not send full message; stop sending more
@@ -1115,7 +1115,7 @@ bool CConnman::AttemptToEvictConnection()
11151115
.m_is_local = node->addr.IsLocal(),
11161116
.m_network = node->ConnectedThroughNetwork(),
11171117
.m_noban = node->HasPermission(NetPermissionFlags::NoBan),
1118-
.m_conn_type = node->m_conn_type,
1118+
.m_conn_type = node->GetConnectionType(),
11191119
};
11201120
vEvictionCandidates.push_back(candidate);
11211121
}
@@ -1339,7 +1339,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
13391339

13401340
// Count existing connections
13411341
int existing_connections = WITH_LOCK(m_nodes_mutex,
1342-
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
1342+
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; }););
13431343

13441344
// Max connections of specified type already exist
13451345
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
@@ -1494,16 +1494,8 @@ void CConnman::CalculateNumConnectionsChangedStats()
14941494
mapSentBytesMsgStats[NET_MESSAGE_TYPE_OTHER] = 0;
14951495
const NodesSnapshot snap{*this, /* filter = */ CConnman::FullyConnectedOnly};
14961496
for (auto pnode : snap.Nodes()) {
1497-
{
1498-
LOCK(pnode->cs_vRecv);
1499-
for (const mapMsgTypeSize::value_type &i : pnode->mapRecvBytesPerMsgType)
1500-
mapRecvBytesMsgStats[i.first] += i.second;
1501-
}
1502-
{
1503-
LOCK(pnode->cs_vSend);
1504-
for (const mapMsgTypeSize::value_type &i : pnode->mapSendBytesPerMsgType)
1505-
mapSentBytesMsgStats[i.first] += i.second;
1506-
}
1497+
WITH_LOCK(pnode->cs_vRecv, pnode->UpdateRecvMapWithStats(mapRecvBytesMsgStats));
1498+
WITH_LOCK(pnode->cs_vSend, pnode->UpdateSentMapWithStats(mapSentBytesMsgStats));
15071499
if (pnode->m_bloom_filter_loaded.load()) {
15081500
spvNodes++;
15091501
} else {
@@ -2096,18 +2088,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
20962088
}
20972089
RecordBytesRecv(nBytes);
20982090
if (notify) {
2099-
size_t nSizeAdded = 0;
2100-
for (const auto& msg : pnode->vRecvMsg) {
2101-
// vRecvMsg contains only completed CNetMessage
2102-
// the single possible partially deserialized message are held by TransportDeserializer
2103-
nSizeAdded += msg.m_raw_message_size;
2104-
}
2105-
{
2106-
LOCK(pnode->cs_vProcessMsg);
2107-
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
2108-
pnode->nProcessQueueSize += nSizeAdded;
2109-
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
2110-
}
2091+
pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
21112092
WakeMessageHandler();
21122093
}
21132094
}
@@ -2483,7 +2464,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
24832464
if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++;
24842465

24852466
// Make sure our persistent outbound slots belong to different netgroups.
2486-
switch (pnode->m_conn_type) {
2467+
switch (pnode->GetConnectionType()) {
24872468
// We currently don't take inbound connections into account. Since they are
24882469
// free to make, an attacker could make them to prevent us from connecting to
24892470
// certain peers.
@@ -4074,6 +4055,37 @@ CNode::CNode(NodeId idIn,
40744055
}
40754056
}
40764057

4058+
void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
4059+
{
4060+
AssertLockNotHeld(m_msg_process_queue_mutex);
4061+
4062+
size_t nSizeAdded = 0;
4063+
for (const auto& msg : vRecvMsg) {
4064+
// vRecvMsg contains only completed CNetMessage
4065+
// the single possible partially deserialized message are held by TransportDeserializer
4066+
nSizeAdded += msg.m_raw_message_size;
4067+
}
4068+
4069+
LOCK(m_msg_process_queue_mutex);
4070+
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
4071+
m_msg_process_queue_size += nSizeAdded;
4072+
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
4073+
}
4074+
4075+
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size)
4076+
{
4077+
LOCK(m_msg_process_queue_mutex);
4078+
if (m_msg_process_queue.empty()) return std::nullopt;
4079+
4080+
std::list<CNetMessage> msgs;
4081+
// Just take one message
4082+
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
4083+
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
4084+
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
4085+
4086+
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
4087+
}
4088+
40774089
bool CConnman::NodeFullyConnected(const CNode* pnode)
40784090
{
40794091
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;

src/net.h

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,6 @@ struct CNodeOptions
459459
/** Information about a peer */
460460
class CNode
461461
{
462-
friend class CConnman;
463-
friend struct ConnmanTestMsg;
464-
465462
public:
466463
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
467464
* the sending side functions are only called under cs_vSend. */
@@ -490,10 +487,6 @@ class CNode
490487
Mutex m_sock_mutex;
491488
Mutex cs_vRecv;
492489

493-
RecursiveMutex cs_vProcessMsg;
494-
std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
495-
size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
496-
497490
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
498491

499492
std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -553,6 +546,48 @@ class CNode
553546
std::atomic_bool fHasRecvData{false};
554547
std::atomic_bool fCanSendData{false};
555548

549+
const ConnectionType& GetConnectionType() const
550+
{
551+
return m_conn_type;
552+
}
553+
554+
/** Move all messages from the received queue to the processing queue. */
555+
void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
556+
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
557+
558+
/** Poll the next message from the processing queue of this connection.
559+
*
560+
* Returns std::nullopt if the processing queue is empty, or a pair
561+
* consisting of the message and a bool that indicates if the processing
562+
* queue has more entries. */
563+
std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
564+
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
565+
566+
/** Account for the total size of a sent message in the per msg type connection stats. */
567+
void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
568+
EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
569+
{
570+
mapSendBytesPerMsgType[msg_type] += sent_bytes;
571+
}
572+
573+
/** Update a supplied map with bytes sent for each msg type for this node */
574+
void UpdateSentMapWithStats(mapMsgTypeSize& map_sentbytes_msg)
575+
EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
576+
{
577+
for (auto& [msg_type, bytes] : mapSendBytesPerMsgType) {
578+
map_sentbytes_msg[msg_type] += bytes;
579+
}
580+
}
581+
582+
/** Update a supplied map with bytes recv for each msg type for this node */
583+
void UpdateRecvMapWithStats(mapMsgTypeSize& map_recvbytes_msg)
584+
EXCLUSIVE_LOCKS_REQUIRED(cs_vRecv)
585+
{
586+
for (auto& [msg_type, bytes] : mapRecvBytesPerMsgType) {
587+
map_recvbytes_msg[msg_type] += bytes;
588+
}
589+
}
590+
556591
/**
557592
* Get network the peer connected through.
558593
*
@@ -564,6 +599,7 @@ class CNode
564599
* @return network the peer connected through.
565600
*/
566601
Network ConnectedThroughNetwork() const;
602+
567603
bool IsOutboundOrBlockRelayConn() const {
568604
switch (m_conn_type) {
569605
case ConnectionType::OUTBOUND_FULL_RELAY:
@@ -794,6 +830,10 @@ class CNode
794830

795831
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
796832

833+
Mutex m_msg_process_queue_mutex;
834+
std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
835+
size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
836+
797837
// Our address, as reported by the peer
798838
CService addrLocal GUARDED_BY(m_addr_local_mutex);
799839
mutable Mutex m_addr_local_mutex;

src/net_processing.cpp

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5013,8 +5013,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
50135013
{
50145014
AssertLockHeld(g_msgproc_mutex);
50155015

5016-
bool fMoreWork = false;
5017-
50185016
PeerRef peer = GetPeerRef(pfrom->GetId());
50195017
if (peer == nullptr) return false;
50205018

@@ -5050,17 +5048,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
50505048
// Don't bother if send buffer is too full to respond anyway
50515049
if (pfrom->fPauseSend) return false;
50525050

5053-
std::list<CNetMessage> msgs;
5054-
{
5055-
LOCK(pfrom->cs_vProcessMsg);
5056-
if (pfrom->vProcessMsg.empty()) return false;
5057-
// Just take one message
5058-
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
5059-
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
5060-
pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize();
5061-
fMoreWork = !pfrom->vProcessMsg.empty();
5062-
}
5063-
CNetMessage& msg(msgs.front());
5051+
auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())};
5052+
if (!poll_result) {
5053+
// No message to process
5054+
return false;
5055+
}
5056+
5057+
CNetMessage& msg{poll_result->first};
5058+
bool fMoreWork = poll_result->second;
50645059

50655060
TRACE6(net, inbound_message,
50665061
pfrom->GetId(),

src/test/util/net.cpp

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
6868
{
6969
assert(node.ReceiveMsgBytes(msg_bytes, complete));
7070
if (complete) {
71-
size_t nSizeAdded = 0;
72-
for (const auto& msg : node.vRecvMsg) {
73-
// vRecvMsg contains only completed CNetMessage
74-
// the single possible partially deserialized message are held by TransportDeserializer
75-
nSizeAdded += msg.m_raw_message_size;
76-
}
77-
{
78-
LOCK(node.cs_vProcessMsg);
79-
node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
80-
node.nProcessQueueSize += nSizeAdded;
81-
node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
82-
}
71+
node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
8372
}
8473
}
8574

0 commit comments

Comments
 (0)