Skip to content

Commit eaee1a8

Browse files
committed
refactor: disentangle watch-only and masternode mode thread triggers
1 parent e6d8e69 commit eaee1a8

File tree

3 files changed

+100
-50
lines changed

3 files changed

+100
-50
lines changed

src/active/quorums.cpp

Lines changed: 85 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,52 +47,14 @@ QuorumParticipant::~QuorumParticipant() = default;
4747

4848
void QuorumParticipant::TriggerQuorumDataRecoveryThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> pIndex) const
4949
{
50-
if ((m_mn_activeman == nullptr && !m_quorums_watch) || !m_quorums_recovery) {
50+
if (!m_quorums_recovery) {
5151
return;
5252
}
5353

54-
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- Process block %s\n", __func__, pIndex->GetBlockHash().ToString());
55-
56-
for (const auto& params : Params().GetConsensus().llmqs) {
57-
auto vecQuorums = m_qman.ScanQuorums(params.type, pIndex, params.keepOldConnections);
58-
59-
// First check if we are member of any quorum of this type
60-
const uint256 proTxHash = m_mn_activeman != nullptr ? m_mn_activeman->GetProTxHash() : uint256();
61-
62-
bool fWeAreQuorumTypeMember = ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) {
63-
return pQuorum->IsValidMember(proTxHash);
64-
});
65-
66-
for (auto& pQuorum : vecQuorums) {
67-
// If there is already a thread running for this specific quorum skip it
68-
if (pQuorum->fQuorumDataRecoveryThreadRunning) {
69-
continue;
70-
}
71-
72-
uint16_t nDataMask{0};
73-
const bool fWeAreQuorumMember = pQuorum->IsValidMember(proTxHash);
74-
const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0;
75-
const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType)
76-
: QvvecSyncMode::Invalid;
77-
const bool fSyncCurrent = syncMode == QvvecSyncMode::Always || (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember);
78-
79-
if ((fWeAreQuorumMember || (fSyncForTypeEnabled && fSyncCurrent)) && !pQuorum->HasVerificationVector()) {
80-
nDataMask |= llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
81-
}
82-
83-
if (fWeAreQuorumMember && !pQuorum->GetSkShare().IsValid()) {
84-
nDataMask |= llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
85-
}
86-
87-
if (nDataMask == 0) {
88-
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n",
89-
__func__, ToUnderlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), pIndex->nHeight);
90-
continue;
91-
}
92-
93-
// Finally start the thread which triggers the requests for this quorum
94-
StartDataRecoveryThread(connman, std::move(pQuorum), pIndex, nDataMask);
95-
}
54+
if (m_mn_activeman) {
55+
TriggerDataRecoveryThreads(connman, pIndex);
56+
} else if (m_quorums_watch) {
57+
TriggerVvecSyncThreads(connman, pIndex);
9658
}
9759
}
9860

@@ -397,8 +359,8 @@ void QuorumParticipant::DataRecoveryThread(CConnman& connman, gsl::not_null<cons
397359
printLog("Done");
398360
}
399361

400-
void QuorumParticipant::StartDataRecoveryThread(CConnman& connman, CQuorumCPtr pQuorum,
401-
gsl::not_null<const CBlockIndex*> pIndex, uint16_t nDataMaskIn) const
362+
void QuorumParticipant::StartDataRecoveryThread(CConnman& connman, gsl::not_null<const CBlockIndex*> pIndex,
363+
CQuorumCPtr pQuorum, uint16_t nDataMaskIn) const
402364
{
403365
assert(m_mn_activeman);
404366

@@ -414,6 +376,84 @@ void QuorumParticipant::StartDataRecoveryThread(CConnman& connman, CQuorumCPtr p
414376
});
415377
}
416378

379+
void QuorumParticipant::TriggerDataRecoveryThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index) const
380+
{
381+
assert(m_mn_activeman);
382+
383+
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- Process block %s\n", __func__, block_index->GetBlockHash().ToString());
384+
385+
const uint256 proTxHash = m_mn_activeman->GetProTxHash();
386+
387+
for (const auto& params : Params().GetConsensus().llmqs) {
388+
auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections);
389+
const bool fWeAreQuorumTypeMember = ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) { return pQuorum->IsValidMember(proTxHash); });
390+
391+
for (auto& pQuorum : vecQuorums) {
392+
if (pQuorum->IsValidMember(proTxHash)) {
393+
uint16_t nDataMask{0};
394+
if (!pQuorum->HasVerificationVector()) {
395+
nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
396+
}
397+
if (!pQuorum->GetSkShare().IsValid()) {
398+
nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
399+
}
400+
if (nDataMask != 0) {
401+
StartDataRecoveryThread(connman, block_index, std::move(pQuorum), nDataMask);
402+
} else {
403+
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n", __func__,
404+
ToUnderlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight);
405+
}
406+
} else {
407+
TryStartVvecSyncThread(connman, block_index, std::move(pQuorum), fWeAreQuorumTypeMember);
408+
}
409+
}
410+
}
411+
}
412+
413+
void QuorumParticipant::StartVvecSyncThread(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index,
414+
CQuorumCPtr pQuorum) const
415+
{
416+
bool expected = false;
417+
if (!pQuorum->fQuorumDataRecoveryThreadRunning.compare_exchange_strong(expected, true)) {
418+
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- Already running\n", __func__);
419+
return;
420+
}
421+
422+
m_qman.workerPool.push([&connman, pQuorum = std::move(pQuorum), block_index, this](int threadId) mutable {
423+
DataRecoveryThread(connman, block_index, std::move(pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR,
424+
/*protx_hash=*/uint256(), /*start_offset=*/0);
425+
});
426+
}
427+
428+
void QuorumParticipant::TriggerVvecSyncThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index) const
429+
{
430+
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- Process block %s\n", __func__, block_index->GetBlockHash().ToString());
431+
for (const auto& params : Params().GetConsensus().llmqs) {
432+
auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections);
433+
for (auto& pQuorum : vecQuorums) {
434+
TryStartVvecSyncThread(connman, block_index, std::move(pQuorum), /*fWeAreQuorumTypeMember=*/false);
435+
}
436+
}
437+
}
438+
439+
void QuorumParticipant::TryStartVvecSyncThread(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index,
440+
CQuorumCPtr pQuorum, bool fWeAreQuorumTypeMember) const
441+
{
442+
if (pQuorum->fQuorumDataRecoveryThreadRunning) return;
443+
444+
const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0;
445+
const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType) : QvvecSyncMode::Invalid;
446+
const bool fSyncCurrent = syncMode == QvvecSyncMode::Always ||
447+
(syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember);
448+
449+
if ((fSyncForTypeEnabled && fSyncCurrent) && !pQuorum->HasVerificationVector()) {
450+
StartVvecSyncThread(connman, block_index, std::move(pQuorum));
451+
} else {
452+
LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n", __func__,
453+
ToUnderlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight);
454+
}
455+
}
456+
417457
void QuorumParticipant::StartCleanupOldQuorumDataThread(gsl::not_null<const CBlockIndex*> pIndex) const
418458
{
419459
// Note: this function is CPU heavy and we don't want it to be running during DKGs.

src/active/quorums.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class CQuorum;
3232
class CQuorumDataRequest;
3333
class CQuorumManager;
3434
class CQuorumSnapshotManager;
35+
enum class QvvecSyncMode : int8_t;
3536

3637
class QuorumParticipant
3738
{
@@ -82,12 +83,21 @@ class QuorumParticipant
8283

8384
void CheckQuorumConnections(CConnman& connman, const Consensus::LLMQParams& llmqParams,
8485
gsl::not_null<const CBlockIndex*> pindexNew) const;
85-
void DataRecoveryThread(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr quorum,
86-
uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const;
8786
void StartCleanupOldQuorumDataThread(gsl::not_null<const CBlockIndex*> pIndex) const;
88-
void StartDataRecoveryThread(CConnman& connman, CQuorumCPtr pQuorum, gsl::not_null<const CBlockIndex*> pIndex,
89-
uint16_t nDataMask) const;
9087
void TriggerQuorumDataRecoveryThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> pIndex) const;
88+
89+
//! Data recovery
90+
void DataRecoveryThread(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr quorum,
91+
uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const;
92+
93+
void StartDataRecoveryThread(CConnman& connman, gsl::not_null<const CBlockIndex*> pIndex, CQuorumCPtr pQuorum,
94+
uint16_t nDataMaskIn) const;
95+
void TriggerDataRecoveryThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index) const;
96+
97+
void StartVvecSyncThread(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum) const;
98+
void TriggerVvecSyncThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index) const;
99+
void TryStartVvecSyncThread(CConnman& connman, gsl::not_null<const CBlockIndex*> block_index, CQuorumCPtr pQuorum,
100+
bool fWeAreQuorumTypeMember) const;
91101
};
92102
} // namespace llmq
93103

src/llmq/options.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ enum class LLMQType : uint8_t;
2121
} // namespace Consensus
2222

2323
namespace llmq {
24-
enum class QvvecSyncMode {
24+
enum class QvvecSyncMode : int8_t {
2525
Invalid = -1,
2626
Always = 0,
2727
OnlyIfTypeMember = 1,

0 commit comments

Comments
 (0)