@@ -286,7 +286,7 @@ struct Peer {
286286 bool m_send_mempool GUARDED_BY (m_tx_inventory_mutex){false };
287287 // Last time a "MEMPOOL" request was serviced.
288288 std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
289- std::chrono::microseconds m_next_inv_send_time{0 };
289+ std::chrono::microseconds m_next_inv_send_time GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {0 };
290290 };
291291
292292 // in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer
@@ -295,7 +295,7 @@ struct Peer {
295295 std::unique_ptr<TxRelay> m_tx_relay{std::make_unique<TxRelay>()};
296296
297297 /* * A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
298- std::vector<CAddress> m_addrs_to_send;
298+ std::vector<CAddress> m_addrs_to_send GUARDED_BY (NetEventsInterface::g_msgproc_mutex) ;
299299 /* * Probabilistic filter to track recent addr messages relayed with this
300300 * peer. Used to avoid relaying redundant addresses to this peer.
301301 *
@@ -305,7 +305,7 @@ struct Peer {
305305 *
306306 * Presence of this filter must correlate with m_addr_relay_enabled.
307307 **/
308- std::unique_ptr<CRollingBloomFilter> m_addr_known;
308+ std::unique_ptr<CRollingBloomFilter> m_addr_known GUARDED_BY (NetEventsInterface::g_msgproc_mutex) ;
309309 /* * Whether we are participating in address relay with this connection.
310310 *
311311 * We set this bool to true for outbound peers (other than
@@ -324,7 +324,7 @@ struct Peer {
324324 /* * Whether a Peer can only be relayed blocks */
325325 const bool m_block_relay_only{false };
326326 /* * Whether a getaddr request to this peer is outstanding. */
327- bool m_getaddr_sent{false };
327+ bool m_getaddr_sent GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {false };
328328 /* * Guards address sending timers. */
329329 mutable Mutex m_addr_send_times_mutex;
330330 /* * Time point to send the next ADDR message to this peer. */
@@ -335,12 +335,12 @@ struct Peer {
335335 * messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */
336336 std::atomic_bool m_wants_addrv2{false };
337337 /* * Whether this peer has already sent us a getaddr message. */
338- bool m_getaddr_recvd{false };
338+ bool m_getaddr_recvd GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {false };
339339 /* * Number of addresses that can be processed from this peer. Start at 1 to
340340 * permit self-announcement. */
341- double m_addr_token_bucket{1.0 };
341+ double m_addr_token_bucket GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {1.0 };
342342 /* * When m_addr_token_bucket was last updated */
343- std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
343+ std::chrono::microseconds m_addr_token_timestamp GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {GetTime<std::chrono::microseconds>()};
344344 /* * Total number of addresses that were dropped due to rate limiting. */
345345 std::atomic<uint64_t > m_addr_rate_limited{0 };
346346 /* * Total number of addresses that were processed (excludes rate-limited ones). */
@@ -350,15 +350,15 @@ struct Peer {
350350 std::set<uint256> m_orphan_work_set GUARDED_BY (g_cs_orphans);
351351
352352 /* * Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */
353- bool m_inv_triggered_getheaders_before_sync{false };
353+ bool m_inv_triggered_getheaders_before_sync GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {false };
354354
355355 /* * Protects m_getdata_requests **/
356356 Mutex m_getdata_requests_mutex;
357357 /* * Work queue of items requested by this peer **/
358358 std::deque<CInv> m_getdata_requests GUARDED_BY (m_getdata_requests_mutex);
359359
360360 /* * Time of the last getheaders message to this peer */
361- std::atomic<std::chrono::seconds> m_last_getheaders_timestamp{0s};
361+ std::atomic<std::chrono::seconds> m_last_getheaders_timestamp GUARDED_BY (NetEventsInterface::g_msgproc_mutex) {0s};
362362
363363 explicit Peer (NodeId id, ServiceFlags our_services, bool block_relay_only)
364364 : m_id(id)
@@ -549,9 +549,9 @@ class PeerManagerImpl final : public PeerManager
549549 void InitializeNode (CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
550550 void FinalizeNode (const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
551551 bool ProcessMessages (CNode* pfrom, std::atomic<bool >& interrupt) override
552- EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
553- bool SendMessages (CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing)
554- EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
552+ EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex );
553+ bool SendMessages (CNode* pto) override
554+ EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex );
555555
556556 /* * Implement PeerManager */
557557 void StartScheduledTasks (CScheduler& scheduler) override ;
@@ -570,7 +570,7 @@ class PeerManagerImpl final : public PeerManager
570570 void Misbehaving (const NodeId pnode, const int howmuch, const std::string& message = " " ) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
571571 void ProcessMessage (CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
572572 const std::chrono::microseconds time_received, const std::atomic<bool >& interruptMsgProc) override
573- EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
573+ EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex );
574574 void UpdateLastBlockAnnounceTime (NodeId node, int64_t time_in_seconds) override ;
575575 bool IsBanned (NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex);
576576 void EraseObjectRequest (NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
@@ -584,7 +584,7 @@ class PeerManagerImpl final : public PeerManager
584584 void ProcessPeerMsgRet (const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
585585
586586 /* * Consider evicting an outbound peer based on the amount of time they've been behind our tip */
587- void ConsiderEviction (CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
587+ void ConsiderEviction (CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex );
588588
589589 /* * If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
590590 void EvictExtraOutboundPeers (std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@@ -638,20 +638,21 @@ class PeerManagerImpl final : public PeerManager
638638 void ProcessHeadersMessage (CNode& pfrom, Peer& peer,
639639 const std::vector<CBlockHeader>& headers,
640640 bool via_compact_block)
641- EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
641+ EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex );
642642 /* * Various helpers for headers processing, invoked by ProcessHeadersMessage() */
643643 /* * Deal with state tracking and headers sync for peers that send the
644644 * occasional non-connecting header (this can happen due to BIP 130 headers
645645 * announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */
646646 void HandleFewUnconnectingHeaders (CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers)
647- EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
647+ EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex );
648648 /* * Return true if the headers connect to each other, false otherwise */
649649 bool CheckHeadersAreContinuous (const std::vector<CBlockHeader>& headers) const ;
650650 /* * Request further headers from this peer with a given locator.
651651 * We don't issue a getheaders message if we have a recent one outstanding.
652652 * This returns true if a getheaders is actually sent, and false otherwise.
653653 */
654- bool MaybeSendGetHeaders (CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer);
654+ bool MaybeSendGetHeaders (CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer)
655+ EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
655656 /* * Potentially fetch blocks from this peer upon receipt of a new headers tip */
656657 void HeadersDirectFetchBlocks (CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast);
657658 /* * Update peer state based on received headers message */
@@ -670,7 +671,8 @@ class PeerManagerImpl final : public PeerManager
670671 void MaybeSendPing (CNode& node_to, Peer& peer, std::chrono::microseconds now);
671672
672673 /* * Send `addr` messages on a regular schedule. */
673- void MaybeSendAddr (CNode& node, Peer& peer, std::chrono::microseconds current_time);
674+ void MaybeSendAddr (CNode& node, Peer& peer, std::chrono::microseconds current_time)
675+ EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
674676
675677 /* * Relay (gossip) an address to a few randomly chosen nodes.
676678 *
@@ -679,7 +681,8 @@ class PeerManagerImpl final : public PeerManager
679681 * @param[in] fReachable Whether the address' network is reachable. We relay unreachable
680682 * addresses less.
681683 */
682- void RelayAddress (NodeId originator, const CAddress& addr, bool fReachable ) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
684+ void RelayAddress (NodeId originator, const CAddress& addr, bool fReachable )
685+ EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
683686
684687 const CChainParams& m_chainparams;
685688 CConnman& m_connman;
@@ -807,13 +810,16 @@ class PeerManagerImpl final : public PeerManager
807810 * @return True if address relay is enabled with peer
808811 * False if address relay is disallowed
809812 */
810- bool SetupAddressRelay (const CNode& node, Peer& peer);
813+ bool SetupAddressRelay (const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
814+
815+ void AddAddressKnown (Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
816+ void PushAddress (Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
811817
812818 /* * Number of nodes with fSyncStarted. */
813819 int nSyncStarted GUARDED_BY (cs_main) = 0;
814820
815821 /* * Hash of the last block we received via INV */
816- uint256 m_last_block_inv_triggering_headers_sync{};
822+ uint256 m_last_block_inv_triggering_headers_sync GUARDED_BY (g_msgproc_mutex) {};
817823
818824 /* *
819825 * Sources of received blocks, saved to be able punish them when processing
@@ -889,7 +895,7 @@ class PeerManagerImpl final : public PeerManager
889895 uint256 m_most_recent_block_hash GUARDED_BY (m_most_recent_block_mutex);
890896
891897 /* * Height of the highest block announced using BIP 152 high-bandwidth mode. */
892- int m_highest_fast_announce{0 };
898+ int m_highest_fast_announce GUARDED_BY (::cs_main) {0 };
893899
894900 /* Returns a bool indicating whether we requested this block.
895901 * Also used if a block was /not/ received and timed out or started with another peer
@@ -981,13 +987,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr)
981987 return peer.m_wants_addrv2 || addr.IsAddrV1Compatible ();
982988}
983989
984- static void AddAddressKnown (Peer& peer, const CAddress& addr)
990+ void PeerManagerImpl:: AddAddressKnown (Peer& peer, const CAddress& addr)
985991{
986992 assert (peer.m_addr_known );
987993 peer.m_addr_known ->insert (addr.GetKey ());
988994}
989995
990- static void PushAddress (Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
996+ void PeerManagerImpl:: PushAddress (Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
991997{
992998 // Known checking here is only to save space from duplicates.
993999 // Before sending, we'll filter it again for known addresses that were
@@ -3250,6 +3256,8 @@ void PeerManagerImpl::ProcessMessage(
32503256 const std::chrono::microseconds time_received,
32513257 const std::atomic<bool >& interruptMsgProc)
32523258{
3259+ AssertLockHeld (g_msgproc_mutex);
3260+
32533261 LogPrint (BCLog::NET, " received: %s (%u bytes) peer=%d\n " , SanitizeString (msg_type), vRecv.size (), pfrom.GetId ());
32543262 statsClient.inc (" message.received." + SanitizeString (msg_type), 1 .0f );
32553263
@@ -4971,6 +4979,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer)
49714979
49724980bool PeerManagerImpl::ProcessMessages (CNode* pfrom, std::atomic<bool >& interruptMsgProc)
49734981{
4982+ AssertLockHeld (g_msgproc_mutex);
4983+
49744984 bool fMoreWork = false ;
49754985
49764986 PeerRef peer = GetPeerRef (pfrom->GetId ());
@@ -5313,7 +5323,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros
53135323
53145324 // Remove addr records that the peer already knows about, and add new
53155325 // addrs to the m_addr_known filter on the same pass.
5316- auto addr_already_known = [&peer](const CAddress& addr) {
5326+ auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED (g_msgproc_mutex) {
53175327 bool ret = peer.m_addr_known ->contains (addr.GetKey ());
53185328 if (!ret) peer.m_addr_known ->insert (addr.GetKey ());
53195329 return ret;
@@ -5379,6 +5389,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
53795389
53805390bool PeerManagerImpl::SendMessages (CNode* pto)
53815391{
5392+ AssertLockHeld (g_msgproc_mutex);
5393+
53825394 assert (m_llmq_ctx);
53835395
53845396 const bool is_masternode = m_mn_activeman != nullptr ;
0 commit comments