Skip to content

Commit 2ecba6b

Browse files
committed
partial bitcoin#26036: add NetEventsInterface::g_msgproc_mutex
This backport excludes annotations for members introduced in bitcoin#25717 as it hasn't been backported yet.
1 parent f6c9439 commit 2ecba6b

File tree

11 files changed

+77
-86
lines changed

11 files changed

+77
-86
lines changed

src/net.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3142,8 +3142,12 @@ void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeP
31423142
OpenNetworkConnection(addrConnect, false, nullptr, nullptr, ConnectionType::OUTBOUND_FULL_RELAY, MasternodeConn::IsConnection, probe);
31433143
}
31443144

3145+
Mutex NetEventsInterface::g_msgproc_mutex;
3146+
31453147
void CConnman::ThreadMessageHandler()
31463148
{
3149+
LOCK(NetEventsInterface::g_msgproc_mutex);
3150+
31473151
int64_t nLastSendMessagesTimeMasternodes = 0;
31483152

31493153
FastRandomContext rng;
@@ -3173,7 +3177,6 @@ void CConnman::ThreadMessageHandler()
31733177
return;
31743178
// Send messages
31753179
if (!fSkipSendMessagesForMasternodes || !pnode->m_masternode_connection) {
3176-
LOCK(pnode->cs_sendProcessing);
31773180
m_msgproc->SendMessages(pnode);
31783181
}
31793182

src/net.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,8 +481,6 @@ class CNode
481481
std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
482482
size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
483483

484-
RecursiveMutex cs_sendProcessing;
485-
486484
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
487485

488486
std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -816,6 +814,9 @@ class CNode
816814
class NetEventsInterface
817815
{
818816
public:
817+
/** Mutex for anything that is only accessed via the msg processing thread */
818+
static Mutex g_msgproc_mutex;
819+
819820
/** Initialize a peer (setup state, queue any initial messages) */
820821
virtual void InitializeNode(CNode& node, ServiceFlags our_services) = 0;
821822

@@ -829,15 +830,15 @@ class NetEventsInterface
829830
* @param[in] interrupt Interrupt condition for processing threads
830831
* @return True if there is more work to be done
831832
*/
832-
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;
833+
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
833834

834835
/**
835836
* Send queued protocol messages to a given node.
836837
*
837838
* @param[in] pnode The node which we are sending messages to.
838839
* @return True if there is more work to be done
839840
*/
840-
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0;
841+
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
841842

842843

843844
protected:

src/net_processing.cpp

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ struct Peer {
286286
bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false};
287287
// Last time a "MEMPOOL" request was serviced.
288288
std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
289-
std::chrono::microseconds m_next_inv_send_time{0};
289+
std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
290290
};
291291

292292
// in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer
@@ -295,7 +295,7 @@ struct Peer {
295295
std::unique_ptr<TxRelay> m_tx_relay{std::make_unique<TxRelay>()};
296296

297297
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
298-
std::vector<CAddress> m_addrs_to_send;
298+
std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
299299
/** Probabilistic filter to track recent addr messages relayed with this
300300
* peer. Used to avoid relaying redundant addresses to this peer.
301301
*
@@ -305,7 +305,7 @@ struct Peer {
305305
*
306306
* Presence of this filter must correlate with m_addr_relay_enabled.
307307
**/
308-
std::unique_ptr<CRollingBloomFilter> m_addr_known;
308+
std::unique_ptr<CRollingBloomFilter> m_addr_known GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
309309
/** Whether we are participating in address relay with this connection.
310310
*
311311
* We set this bool to true for outbound peers (other than
@@ -324,7 +324,7 @@ struct Peer {
324324
/** Whether a Peer can only be relayed blocks */
325325
const bool m_block_relay_only{false};
326326
/** Whether a getaddr request to this peer is outstanding. */
327-
bool m_getaddr_sent{false};
327+
bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
328328
/** Guards address sending timers. */
329329
mutable Mutex m_addr_send_times_mutex;
330330
/** Time point to send the next ADDR message to this peer. */
@@ -335,12 +335,12 @@ struct Peer {
335335
* messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */
336336
std::atomic_bool m_wants_addrv2{false};
337337
/** Whether this peer has already sent us a getaddr message. */
338-
bool m_getaddr_recvd{false};
338+
bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
339339
/** Number of addresses that can be processed from this peer. Start at 1 to
340340
* permit self-announcement. */
341-
double m_addr_token_bucket{1.0};
341+
double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0};
342342
/** When m_addr_token_bucket was last updated */
343-
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
343+
std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime<std::chrono::microseconds>()};
344344
/** Total number of addresses that were dropped due to rate limiting. */
345345
std::atomic<uint64_t> m_addr_rate_limited{0};
346346
/** Total number of addresses that were processed (excludes rate-limited ones). */
@@ -350,15 +350,15 @@ struct Peer {
350350
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
351351

352352
/** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */
353-
bool m_inv_triggered_getheaders_before_sync{false};
353+
bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
354354

355355
/** Protects m_getdata_requests **/
356356
Mutex m_getdata_requests_mutex;
357357
/** Work queue of items requested by this peer **/
358358
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
359359

360360
/** Time of the last getheaders message to this peer */
361-
std::atomic<std::chrono::seconds> m_last_getheaders_timestamp{0s};
361+
std::atomic<std::chrono::seconds> m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0s};
362362

363363
explicit Peer(NodeId id, ServiceFlags our_services, bool block_relay_only)
364364
: m_id(id)
@@ -549,9 +549,9 @@ class PeerManagerImpl final : public PeerManager
549549
void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
550550
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
551551
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override
552-
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
553-
bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing)
554-
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
552+
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
553+
bool SendMessages(CNode* pto) override
554+
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
555555

556556
/** Implement PeerManager */
557557
void StartScheduledTasks(CScheduler& scheduler) override;
@@ -570,7 +570,7 @@ class PeerManagerImpl final : public PeerManager
570570
void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
571571
void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
572572
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override
573-
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
573+
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
574574
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
575575
bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex);
576576
void EraseObjectRequest(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
@@ -584,7 +584,7 @@ class PeerManagerImpl final : public PeerManager
584584
void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
585585

586586
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
587-
void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
587+
void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex);
588588

589589
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
590590
void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@@ -638,20 +638,21 @@ class PeerManagerImpl final : public PeerManager
638638
void ProcessHeadersMessage(CNode& pfrom, Peer& peer,
639639
const std::vector<CBlockHeader>& headers,
640640
bool via_compact_block)
641-
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
641+
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
642642
/** Various helpers for headers processing, invoked by ProcessHeadersMessage() */
643643
/** Deal with state tracking and headers sync for peers that send the
644644
* occasional non-connecting header (this can happen due to BIP 130 headers
645645
* announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */
646646
void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers)
647-
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
647+
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
648648
/** Return true if the headers connect to each other, false otherwise */
649649
bool CheckHeadersAreContinuous(const std::vector<CBlockHeader>& headers) const;
650650
/** Request further headers from this peer with a given locator.
651651
* We don't issue a getheaders message if we have a recent one outstanding.
652652
* This returns true if a getheaders is actually sent, and false otherwise.
653653
*/
654-
bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer);
654+
bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer)
655+
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
655656
/** Potentially fetch blocks from this peer upon receipt of a new headers tip */
656657
void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast);
657658
/** Update peer state based on received headers message */
@@ -670,7 +671,8 @@ class PeerManagerImpl final : public PeerManager
670671
void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now);
671672

672673
/** Send `addr` messages on a regular schedule. */
673-
void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time);
674+
void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time)
675+
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
674676

675677
/** Relay (gossip) an address to a few randomly chosen nodes.
676678
*
@@ -679,7 +681,8 @@ class PeerManagerImpl final : public PeerManager
679681
* @param[in] fReachable Whether the address' network is reachable. We relay unreachable
680682
* addresses less.
681683
*/
682-
void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
684+
void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable)
685+
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
683686

684687
const CChainParams& m_chainparams;
685688
CConnman& m_connman;
@@ -807,13 +810,16 @@ class PeerManagerImpl final : public PeerManager
807810
* @return True if address relay is enabled with peer
808811
* False if address relay is disallowed
809812
*/
810-
bool SetupAddressRelay(const CNode& node, Peer& peer);
813+
bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
814+
815+
void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
816+
void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
811817

812818
/** Number of nodes with fSyncStarted. */
813819
int nSyncStarted GUARDED_BY(cs_main) = 0;
814820

815821
/** Hash of the last block we received via INV */
816-
uint256 m_last_block_inv_triggering_headers_sync{};
822+
uint256 m_last_block_inv_triggering_headers_sync GUARDED_BY(g_msgproc_mutex){};
817823

818824
/**
819825
* Sources of received blocks, saved to be able punish them when processing
@@ -889,7 +895,7 @@ class PeerManagerImpl final : public PeerManager
889895
uint256 m_most_recent_block_hash GUARDED_BY(m_most_recent_block_mutex);
890896

891897
/** Height of the highest block announced using BIP 152 high-bandwidth mode. */
892-
int m_highest_fast_announce{0};
898+
int m_highest_fast_announce GUARDED_BY(::cs_main){0};
893899

894900
/* Returns a bool indicating whether we requested this block.
895901
* Also used if a block was /not/ received and timed out or started with another peer
@@ -981,13 +987,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr)
981987
return peer.m_wants_addrv2 || addr.IsAddrV1Compatible();
982988
}
983989

984-
static void AddAddressKnown(Peer& peer, const CAddress& addr)
990+
void PeerManagerImpl::AddAddressKnown(Peer& peer, const CAddress& addr)
985991
{
986992
assert(peer.m_addr_known);
987993
peer.m_addr_known->insert(addr.GetKey());
988994
}
989995

990-
static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
996+
void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
991997
{
992998
// Known checking here is only to save space from duplicates.
993999
// Before sending, we'll filter it again for known addresses that were
@@ -3250,6 +3256,8 @@ void PeerManagerImpl::ProcessMessage(
32503256
const std::chrono::microseconds time_received,
32513257
const std::atomic<bool>& interruptMsgProc)
32523258
{
3259+
AssertLockHeld(g_msgproc_mutex);
3260+
32533261
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId());
32543262
statsClient.inc("message.received." + SanitizeString(msg_type), 1.0f);
32553263

@@ -4971,6 +4979,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer)
49714979

49724980
bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
49734981
{
4982+
AssertLockHeld(g_msgproc_mutex);
4983+
49744984
bool fMoreWork = false;
49754985

49764986
PeerRef peer = GetPeerRef(pfrom->GetId());
@@ -5313,7 +5323,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros
53135323

53145324
// Remove addr records that the peer already knows about, and add new
53155325
// addrs to the m_addr_known filter on the same pass.
5316-
auto addr_already_known = [&peer](const CAddress& addr) {
5326+
auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) {
53175327
bool ret = peer.m_addr_known->contains(addr.GetKey());
53185328
if (!ret) peer.m_addr_known->insert(addr.GetKey());
53195329
return ret;
@@ -5379,6 +5389,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
53795389

53805390
bool PeerManagerImpl::SendMessages(CNode* pto)
53815391
{
5392+
AssertLockHeld(g_msgproc_mutex);
5393+
53825394
assert(m_llmq_ctx);
53835395

53845396
const bool is_masternode = m_mn_activeman != nullptr;

src/net_processing.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class PeerManager : public CValidationInterface, public NetEventsInterface
125125

126126
/** Process a single message from a peer. Public for fuzz testing */
127127
virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
128-
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) = 0;
128+
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
129129

130130
/** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */
131131
virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0;

0 commit comments

Comments
 (0)