Skip to content

Commit 1c9f741

Browse files
committed
Call ProcessNewBlock() asynchronously in a separate thread from p2p layer
1 parent b471728 commit 1c9f741

File tree

9 files changed

+148
-54
lines changed

9 files changed

+148
-54
lines changed

src/init.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include <walletinitinterface.h>
4949
#include <stdint.h>
5050
#include <stdio.h>
51+
#include <validation_layer.h>
5152

5253
#ifndef WIN32
5354
#include <signal.h>
@@ -73,6 +74,7 @@ static const bool DEFAULT_STOPAFTERBLOCKIMPORT = false;
7374

7475
std::unique_ptr<CConnman> g_connman;
7576
std::unique_ptr<PeerLogicValidation> peerLogic;
77+
std::unique_ptr<ValidationLayer> g_validation_layer;
7678

7779
#if !(ENABLE_WALLET)
7880
class DummyWalletInit : public WalletInitInterface {
@@ -209,6 +211,7 @@ void Shutdown()
209211
// using the other before destroying them.
210212
if (peerLogic) UnregisterValidationInterface(peerLogic.get());
211213
if (g_connman) g_connman->Stop();
214+
if (g_validation_layer) g_validation_layer->Stop();
212215
peerLogic.reset();
213216
g_connman.reset();
214217
if (g_txindex) {
@@ -1306,7 +1309,10 @@ bool AppInitMain()
13061309
g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));
13071310
CConnman& connman = *g_connman;
13081311

1309-
peerLogic.reset(new PeerLogicValidation(&connman, scheduler, gArgs.GetBoolArg("-enablebip61", DEFAULT_ENABLE_BIP61)));
1312+
g_validation_layer.reset(new ValidationLayer(chainparams));
1313+
g_validation_layer->Start();
1314+
1315+
peerLogic.reset(new PeerLogicValidation(&connman, *g_validation_layer, scheduler, gArgs.GetBoolArg("-enablebip61", DEFAULT_ENABLE_BIP61)));
13101316
RegisterValidationInterface(peerLogic.get());
13111317

13121318
// sanitize comments per BIP-0014, format user agent and check total size

src/net.cpp

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414
#include <consensus/consensus.h>
1515
#include <crypto/common.h>
1616
#include <crypto/sha256.h>
17-
#include <primitives/transaction.h>
1817
#include <netbase.h>
18+
#include <primitives/transaction.h>
1919
#include <scheduler.h>
2020
#include <ui_interface.h>
2121
#include <utilstrencodings.h>
22+
#include <validation_layer.h>
2223

2324
#ifdef WIN32
2425
#include <string.h>
@@ -2045,11 +2046,28 @@ void CConnman::ThreadMessageHandler()
20452046
if (pnode->fDisconnect)
20462047
continue;
20472048

2048-
// Receive messages
2049-
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
2050-
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
2049+
bool request_was_queued = pnode->IsAwaitingInternalRequest();
2050+
2051+
// If an internal request was queued and it's not done yet, skip this node
2052+
if (request_was_queued && !pnode->ProcessInternalRequestResults(m_msgproc))
2053+
continue;
2054+
2055+
// If no internal request was queued receive messages
2056+
if (!request_was_queued) {
2057+
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
2058+
request_was_queued = pnode->IsAwaitingInternalRequest();
2059+
2060+
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend && !request_was_queued);
2061+
} else {
2062+
request_was_queued = false;
2063+
}
2064+
20512065
if (flagInterruptMsgProc)
20522066
return;
2067+
2068+
if (request_was_queued)
2069+
continue;
2070+
20532071
// Send messages
20542072
{
20552073
LOCK(pnode->cs_sendProcessing);
@@ -2813,6 +2831,37 @@ void CNode::AskFor(const CInv& inv)
28132831
mapAskFor.insert(std::make_pair(nRequestTime, inv));
28142832
}
28152833

2834+
bool CNode::IsAwaitingInternalRequest()
2835+
{
2836+
return m_block_validation_response.valid();
2837+
}
2838+
2839+
bool CNode::ProcessInternalRequestResults(NetEventsInterface* peerlogic)
2840+
{
2841+
bool all_cleared = true;
2842+
2843+
if (m_block_validation_response.valid()) {
2844+
if (m_block_validation_response.wait_for(std::chrono::milliseconds::zero()) == std::future_status::ready) {
2845+
peerlogic->ProcessBlockValidationResponse(this, m_block_validating, m_block_validating_index, m_block_validation_response.get());
2846+
2847+
m_block_validating = nullptr;
2848+
m_block_validating_index = nullptr;
2849+
m_block_validation_response = std::future<BlockValidationResponse>();
2850+
} else {
2851+
all_cleared = false;
2852+
}
2853+
}
2854+
2855+
return all_cleared;
2856+
}
2857+
2858+
void CNode::SetPendingInternalRequest(const std::shared_ptr<const CBlock> block, std::future<BlockValidationResponse>&& pending_response, const CBlockIndex* pindex)
2859+
{
2860+
m_block_validating = block;
2861+
m_block_validating_index = pindex;
2862+
m_block_validation_response = std::move(pending_response);
2863+
}
2864+
28162865
bool CConnman::NodeFullyConnected(const CNode* pnode)
28172866
{
28182867
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;

src/net.h

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,23 @@
2323
#include <threadinterrupt.h>
2424

2525
#include <atomic>
26+
#include <chrono>
27+
#include <condition_variable>
2628
#include <deque>
29+
#include <future>
30+
#include <memory>
2731
#include <stdint.h>
2832
#include <thread>
29-
#include <memory>
30-
#include <condition_variable>
3133

3234
#ifndef WIN32
3335
#include <arpa/inet.h>
3436
#endif
3537

36-
38+
struct BlockValidationResponse;
3739
class CScheduler;
3840
class CNode;
41+
class CBlock;
42+
class CBlockIndex;
3943

4044
/** Time between pings automatically sent out for latency probing and keepalive (in seconds). */
4145
static const int PING_INTERVAL = 2 * 60;
@@ -478,6 +482,7 @@ class NetEventsInterface
478482
virtual bool SendMessages(CNode* pnode) = 0;
479483
virtual void InitializeNode(CNode* pnode) = 0;
480484
virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0;
485+
virtual void ProcessBlockValidationResponse(CNode* pfrom, const std::shared_ptr<const CBlock> pblock, const CBlockIndex* pindex, const BlockValidationResponse& validation_response) = 0;
481486

482487
protected:
483488
/**
@@ -755,6 +760,13 @@ class CNode
755760
// Our address, as reported by the peer
756761
CService addrLocal;
757762
mutable CCriticalSection cs_addrLocal;
763+
764+
// If an asynchronous request to validate a block received over the network is pending
765+
// these members hold details of that request
766+
std::future<BlockValidationResponse> m_block_validation_response;
767+
std::shared_ptr<const CBlock> m_block_validating;
768+
const CBlockIndex* m_block_validating_index;
769+
758770
public:
759771

760772
NodeId GetId() const {
@@ -865,6 +877,16 @@ class CNode
865877
std::string GetAddrName() const;
866878
//! Sets the addrName only if it was not previously set
867879
void MaybeSetAddrName(const std::string& addrNameIn);
880+
881+
//! Is an asynchronous internal request pending
882+
bool IsAwaitingInternalRequest();
883+
884+
//! If a result from an asynchronous internal request is ready, process the results
885+
bool ProcessInternalRequestResults(NetEventsInterface*);
886+
887+
//! Mark this node as waiting for an asynchronous internal request to complete
888+
//! before any further processing of this node may occurb
889+
void SetPendingInternalRequest(const std::shared_ptr<const CBlock> block, std::future<BlockValidationResponse>&& pending_response, const CBlockIndex* pindex = nullptr);
868890
};
869891

870892

src/net_processing.cpp

Lines changed: 41 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@
1111
#include <chainparams.h>
1212
#include <consensus/validation.h>
1313
#include <hash.h>
14-
#include <validation.h>
1514
#include <merkleblock.h>
16-
#include <netmessagemaker.h>
1715
#include <netbase.h>
16+
#include <netmessagemaker.h>
1817
#include <policy/fees.h>
1918
#include <policy/policy.h>
2019
#include <primitives/block.h>
@@ -28,6 +27,8 @@
2827
#include <util.h>
2928
#include <utilmoneystr.h>
3029
#include <utilstrencodings.h>
30+
#include <validation.h>
31+
#include <validation_layer.h>
3132

3233
#include <memory>
3334

@@ -845,9 +846,9 @@ static bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Para
845846
(GetBlockProofEquivalentTime(*pindexBestHeader, *pindex, *pindexBestHeader, consensusParams) < STALE_RELAY_AGE_LIMIT);
846847
}
847848

848-
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler, bool enable_bip61)
849-
: connman(connmanIn), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) {
850-
849+
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, ValidationLayer& validation_layer, CScheduler& scheduler, bool enable_bip61)
850+
: connman(connmanIn), m_validation_layer(validation_layer), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61)
851+
{
851852
// Initialize global variables that cannot be constructed at startup.
852853
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
853854

@@ -1263,6 +1264,12 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c
12631264
}
12641265
}
12651266

1267+
void SubmitBlock(CConnman* connman, ValidationLayer& validation_layer, CNode* pfrom, const std::shared_ptr<const CBlock> pblock, bool force_processing, const CBlockIndex* pindex = nullptr)
1268+
{
1269+
std::future<BlockValidationResponse> result = validation_layer.SubmitForValidation(pblock, force_processing, std::bind(&CConnman::WakeMessageHandler, connman));
1270+
pfrom->SetPendingInternalRequest(pblock, std::move(result), pindex);
1271+
}
1272+
12661273
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
12671274
{
12681275
AssertLockNotHeld(cs_main);
@@ -1572,7 +1579,7 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
15721579
return true;
15731580
}
15741581

1575-
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
1582+
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, ValidationLayer& validation_layer, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
15761583
{
15771584
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
15781585
if (gArgs.IsArgSet("-dropmessagestest") && GetRand(gArgs.GetArg("-dropmessagestest", 0)) == 0)
@@ -2552,7 +2559,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
25522559
} // cs_main
25532560

25542561
if (fProcessBLOCKTXN)
2555-
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc, enable_bip61);
2562+
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, validation_layer, interruptMsgProc, enable_bip61);
25562563

25572564
if (fRevertToHeaderProcessing) {
25582565
// Headers received from HB compact block peers are permitted to be
@@ -2570,7 +2577,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
25702577
LOCK(cs_main);
25712578
mapBlockSource.emplace(pblock->GetHash(), std::make_pair(pfrom->GetId(), false));
25722579
}
2573-
bool fNewBlock = false;
2580+
25742581
// Setting fForceProcessing to true means that we bypass some of
25752582
// our anti-DoS protections in AcceptBlock, which filters
25762583
// unrequested blocks that might be trying to waste our resources
@@ -2580,21 +2587,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
25802587
// we have a chain with at least nMinimumChainWork), and we ignore
25812588
// compact blocks with less work than our tip, it is safe to treat
25822589
// reconstructed compact blocks as having been requested.
2583-
ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true, &fNewBlock);
2584-
if (fNewBlock) {
2585-
pfrom->nLastBlockTime = GetTime();
2586-
} else {
2587-
LOCK(cs_main);
2588-
mapBlockSource.erase(pblock->GetHash());
2589-
}
2590-
LOCK(cs_main); // hold cs_main for CBlockIndex::IsValid()
2591-
if (pindex->IsValid(BLOCK_VALID_TRANSACTIONS)) {
2592-
// Clear download state for this block, which is in
2593-
// process from some other peer. We do this after calling
2594-
// ProcessNewBlock so that a malleated cmpctblock announcement
2595-
// can't be used to interfere with block relay.
2596-
MarkBlockAsReceived(pblock->GetHash());
2597-
}
2590+
SubmitBlock(connman, validation_layer, pfrom, pblock, /*fForceProcessing*/ true, pindex);
25982591
}
25992592

26002593
}
@@ -2645,7 +2638,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
26452638
// though the block was successfully read, and rely on the
26462639
// handling in ProcessNewBlock to ensure the block index is
26472640
// updated, reject messages go out, etc.
2648-
MarkBlockAsReceived(resp.blockhash); // it is now an empty pointer
26492641
fBlockRead = true;
26502642
// mapBlockSource is only used for sending reject messages and DoS scores,
26512643
// so the race between here and cs_main in ProcessNewBlock is fine.
@@ -2656,20 +2648,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
26562648
}
26572649
} // Don't hold cs_main when we call into ProcessNewBlock
26582650
if (fBlockRead) {
2659-
bool fNewBlock = false;
26602651
// Since we requested this block (it was in mapBlocksInFlight), force it to be processed,
26612652
// even if it would not be a candidate for new tip (missing previous block, chain not long enough, etc)
26622653
// This bypasses some anti-DoS logic in AcceptBlock (eg to prevent
26632654
// disk-space attacks), but this should be safe due to the
26642655
// protections in the compact block handler -- see related comment
26652656
// in compact block optimistic reconstruction handling.
2666-
ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true, &fNewBlock);
2667-
if (fNewBlock) {
2668-
pfrom->nLastBlockTime = GetTime();
2669-
} else {
2670-
LOCK(cs_main);
2671-
mapBlockSource.erase(pblock->GetHash());
2672-
}
2657+
SubmitBlock(connman, validation_layer, pfrom, pblock, /*fForceProcessing*/ true);
26732658
}
26742659
}
26752660

@@ -2712,19 +2697,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
27122697
LOCK(cs_main);
27132698
// Also always process if we requested the block explicitly, as we may
27142699
// need it even though it is not a candidate for a new best tip.
2715-
forceProcessing |= MarkBlockAsReceived(hash);
2700+
forceProcessing = mapBlocksInFlight.count(hash);
2701+
27162702
// mapBlockSource is only used for sending reject messages and DoS scores,
27172703
// so the race between here and cs_main in ProcessNewBlock is fine.
27182704
mapBlockSource.emplace(hash, std::make_pair(pfrom->GetId(), true));
27192705
}
2720-
bool fNewBlock = false;
2721-
ProcessNewBlock(chainparams, pblock, forceProcessing, &fNewBlock);
2722-
if (fNewBlock) {
2723-
pfrom->nLastBlockTime = GetTime();
2724-
} else {
2725-
LOCK(cs_main);
2726-
mapBlockSource.erase(pblock->GetHash());
2727-
}
2706+
SubmitBlock(connman, validation_layer, pfrom, pblock, forceProcessing);
27282707
}
27292708

27302709

@@ -3045,7 +3024,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
30453024
bool fRet = false;
30463025
try
30473026
{
3048-
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc, m_enable_bip61);
3027+
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, m_validation_layer, interruptMsgProc, m_enable_bip61);
30493028
if (interruptMsgProc)
30503029
return false;
30513030
if (!pfrom->vRecvGetData.empty())
@@ -3092,6 +3071,26 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
30923071
return fMoreWork;
30933072
}
30943073

3074+
void PeerLogicValidation::ProcessBlockValidationResponse(CNode* pfrom, const std::shared_ptr<const CBlock> pblock, const CBlockIndex* pindex, const BlockValidationResponse& validation_response)
3075+
{
3076+
LOCK(cs_main);
3077+
3078+
// If we've reconstructed this block via compactblocks then
3079+
// Clear download state for this block, which is in
3080+
// process from some other peer. We do this after calling
3081+
// ProcessNewBlock so that a malleated cmpctblock announcement
3082+
// can't be used to interfere with block relay.
3083+
if (!pindex || pindex->IsValid(BLOCK_VALID_TRANSACTIONS)) {
3084+
MarkBlockAsReceived(pblock->GetHash());
3085+
}
3086+
3087+
if (validation_response.is_new) {
3088+
pfrom->nLastBlockTime = GetTime();
3089+
} else {
3090+
mapBlockSource.erase(pblock->GetHash());
3091+
}
3092+
}
3093+
30953094
void PeerLogicValidation::ConsiderEviction(CNode *pto, int64_t time_in_seconds)
30963095
{
30973096
AssertLockHeld(cs_main);

0 commit comments

Comments
 (0)