Skip to content

Commit 8ddbb12

Browse files
committed
Move net_processing's ProcessNewBlock calls to resolve async.
Essentially, our goal is to not process anything for the given peer until the block finishes processing (emulating the previous behavior) without actually blocking the ProcessMessages loops. Obviously, in most cases, we'll just go on to the next peer and immediately hit a cs_main lock, blocking us anyway, but this we can slowly improve that state over time by moving things from CNodeState to CPeerState.
1 parent e898229 commit 8ddbb12

File tree

1 file changed

+64
-30
lines changed

1 file changed

+64
-30
lines changed

src/net_processing.cpp

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@ struct CBlockReject {
206206
* move most (non-validation-specific) state here.
207207
*/
208208
struct CPeerState {
209+
//! If this peer generated some headers for us to add, we store the resulting
210+
//! future here and wait for it to complete before we process more data from this
211+
//! peer.
212+
std::future<bool> pending_block_processing;
213+
//! The hash of the block which is pending download.
214+
uint256 pending_block_hash;
215+
209216
CPeerState() {}
210217
};
211218

@@ -2717,8 +2724,15 @@ bool static ProcessMessage(CNode* pfrom, CPeerState* peerstate, const std::strin
27172724
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash());
27182725
bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end();
27192726

2720-
if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here
2727+
if (pindex->nStatus & BLOCK_HAVE_DATA) { // Nothing to do here
2728+
if (fAlreadyInFlight) {
2729+
// There is a possibility the block was opportunistically reconstructed
2730+
// from another peer while we were waiting for the block to come in here.
2731+
// Thus, we need to wipe its in-flight state.
2732+
MarkBlockAsReceived(pindex->GetBlockHash());
2733+
}
27212734
return true;
2735+
}
27222736

27232737
if (pindex->nChainWork <= ::ChainActive().Tip()->nChainWork || // We know something better
27242738
pindex->nTx != 0) { // We had this block at some point, but pruned it
@@ -2848,21 +2862,8 @@ bool static ProcessMessage(CNode* pfrom, CPeerState* peerstate, const std::strin
28482862
// we have a chain with at least nMinimumChainWork), and we ignore
28492863
// compact blocks with less work than our tip, it is safe to treat
28502864
// reconstructed compact blocks as having been requested.
2851-
bool fNewBlock = ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true).get();
2852-
if (fNewBlock) {
2853-
pfrom->nLastBlockTime = GetTime();
2854-
} else {
2855-
LOCK(cs_main);
2856-
mapBlockSource.erase(pblock->GetHash());
2857-
}
2858-
LOCK(cs_main); // hold cs_main for CBlockIndex::IsValid()
2859-
if (pindex->IsValid(BLOCK_VALID_TRANSACTIONS)) {
2860-
// Clear download state for this block, which is in
2861-
// process from some other peer. We do this after calling
2862-
// ProcessNewBlock so that a malleated cmpctblock announcement
2863-
// can't be used to interfere with block relay.
2864-
MarkBlockAsReceived(pblock->GetHash());
2865-
}
2865+
peerstate->pending_block_hash = pblock->GetHash();
2866+
peerstate->pending_block_processing = ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true);
28662867
}
28672868
return true;
28682869
}
@@ -2890,6 +2891,16 @@ bool static ProcessMessage(CNode* pfrom, CPeerState* peerstate, const std::strin
28902891
return true;
28912892
}
28922893

2894+
const CBlockIndex* pindex = LookupBlockIndex(resp.blockhash);
2895+
if (pindex && pindex->nStatus & BLOCK_HAVE_DATA) { // Nothing to do here
2896+
// There is a possibility the block was opportunistically reconstructed
2897+
// from another peer while we were waiting for the block to come in here.
2898+
// Thus, we may want to just skip processing (and potentially downloading
2899+
// the full block in case of a shortid collision).
2900+
MarkBlockAsReceived(pindex->GetBlockHash());
2901+
return true;
2902+
}
2903+
28932904
PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock;
28942905
ReadStatus status = partialBlock.FillBlock(*pblock, resp.txn);
28952906
if (status == READ_STATUS_INVALID) {
@@ -2936,13 +2947,8 @@ bool static ProcessMessage(CNode* pfrom, CPeerState* peerstate, const std::strin
29362947
// disk-space attacks), but this should be safe due to the
29372948
// protections in the compact block handler -- see related comment
29382949
// in compact block optimistic reconstruction handling.
2939-
bool fNewBlock = ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true).get();
2940-
if (fNewBlock) {
2941-
pfrom->nLastBlockTime = GetTime();
2942-
} else {
2943-
LOCK(cs_main);
2944-
mapBlockSource.erase(pblock->GetHash());
2945-
}
2950+
peerstate->pending_block_hash = pblock->GetHash();
2951+
peerstate->pending_block_processing = ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true);
29462952
}
29472953
return true;
29482954
}
@@ -2997,13 +3003,8 @@ bool static ProcessMessage(CNode* pfrom, CPeerState* peerstate, const std::strin
29973003
// so the race between here and cs_main in ProcessNewBlock is fine.
29983004
mapBlockSource.emplace(hash, std::make_pair(pfrom->GetId(), true));
29993005
}
3000-
bool fNewBlock = ProcessNewBlock(chainparams, pblock, forceProcessing).get();
3001-
if (fNewBlock) {
3002-
pfrom->nLastBlockTime = GetTime();
3003-
} else {
3004-
LOCK(cs_main);
3005-
mapBlockSource.erase(pblock->GetHash());
3006-
}
3006+
peerstate->pending_block_hash = hash;
3007+
peerstate->pending_block_processing = ProcessNewBlock(chainparams, pblock, forceProcessing);
30073008
return true;
30083009
}
30093010

@@ -3264,6 +3265,24 @@ bool PeerLogicValidation::SendRejectsAndCheckIfBanned(CNode* pnode, bool enable_
32643265
return false;
32653266
}
32663267

3268+
bool IsPendingBlockValidation(CNode* pfrom, CPeerState* peerstate) EXCLUSIVE_LOCKS_REQUIRED(cs_peerstate)
3269+
{
3270+
if (peerstate->pending_block_processing.valid()) {
3271+
if (peerstate->pending_block_processing.wait_for(std::chrono::duration<int>::zero()) == std::future_status::ready) {
3272+
bool fNewBlock = peerstate->pending_block_processing.get();
3273+
if (fNewBlock) {
3274+
pfrom->nLastBlockTime = GetTime();
3275+
} else {
3276+
LOCK(cs_main);
3277+
mapBlockSource.erase(peerstate->pending_block_hash);
3278+
}
3279+
peerstate->pending_block_processing = std::future<bool>();
3280+
peerstate->pending_block_hash = uint256();
3281+
return false;
3282+
} else { return true; }
3283+
} else { return false; }
3284+
}
3285+
32673286
bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
32683287
{
32693288
const CChainParams& chainparams = Params();
@@ -3291,6 +3310,17 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
32913310
}
32923311
}
32933312

3313+
if (IsPendingBlockValidation(pfrom, peerstate)) {
3314+
return false;
3315+
}
3316+
{
3317+
// Somewhat annoyingly, tests currently rely on any pending bans/disconnects
3318+
// being processed prior to any pong responses, thus if we were waiting on a
3319+
// block validation to complete, we need to recheck bans.
3320+
LOCK(cs_main);
3321+
SendRejectsAndCheckIfBanned(pfrom, m_enable_bip61);
3322+
}
3323+
32943324
if (pfrom->fDisconnect)
32953325
return false;
32963326

@@ -3563,6 +3593,10 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
35633593
LOCK(cs_peerstate);
35643594
CPeerState* peerstate = PeerState(pto->GetId());
35653595

3596+
if (IsPendingBlockValidation(pto, peerstate)) {
3597+
return true;
3598+
}
3599+
35663600
//
35673601
// Message: ping
35683602
//

0 commit comments

Comments
 (0)