@@ -47,52 +47,14 @@ QuorumParticipant::~QuorumParticipant() = default;
4747
4848void QuorumParticipant::TriggerQuorumDataRecoveryThreads (CConnman& connman, gsl::not_null<const CBlockIndex*> pIndex) const
4949{
50- if ((m_mn_activeman == nullptr && !m_quorums_watch) || !m_quorums_recovery) {
50+ if (!m_quorums_recovery) {
5151 return ;
5252 }
5353
54- LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- Process block %s\n " , __func__, pIndex->GetBlockHash ().ToString ());
55-
56- for (const auto & params : Params ().GetConsensus ().llmqs ) {
57- auto vecQuorums = m_qman.ScanQuorums (params.type , pIndex, params.keepOldConnections );
58-
59- // First check if we are member of any quorum of this type
60- const uint256 proTxHash = m_mn_activeman != nullptr ? m_mn_activeman->GetProTxHash () : uint256 ();
61-
62- bool fWeAreQuorumTypeMember = ranges::any_of (vecQuorums, [&proTxHash](const auto & pQuorum) {
63- return pQuorum->IsValidMember (proTxHash);
64- });
65-
66- for (auto & pQuorum : vecQuorums) {
67- // If there is already a thread running for this specific quorum skip it
68- if (pQuorum->fQuorumDataRecoveryThreadRunning ) {
69- continue ;
70- }
71-
72- uint16_t nDataMask{0 };
73- const bool fWeAreQuorumMember = pQuorum->IsValidMember (proTxHash);
74- const bool fSyncForTypeEnabled = m_sync_map.count (pQuorum->qc ->llmqType ) > 0 ;
75- const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at (pQuorum->qc ->llmqType )
76- : QvvecSyncMode::Invalid;
77- const bool fSyncCurrent = syncMode == QvvecSyncMode::Always || (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember );
78-
79- if ((fWeAreQuorumMember || (fSyncForTypeEnabled && fSyncCurrent )) && !pQuorum->HasVerificationVector ()) {
80- nDataMask |= llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
81- }
82-
83- if (fWeAreQuorumMember && !pQuorum->GetSkShare ().IsValid ()) {
84- nDataMask |= llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
85- }
86-
87- if (nDataMask == 0 ) {
88- LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n " ,
89- __func__, ToUnderlying (pQuorum->qc ->llmqType ), pQuorum->qc ->quorumHash .ToString (), pIndex->nHeight );
90- continue ;
91- }
92-
93- // Finally start the thread which triggers the requests for this quorum
94- StartDataRecoveryThread (connman, std::move (pQuorum), pIndex, nDataMask);
95- }
54+ if (m_mn_activeman) {
55+ TriggerDataRecoveryThreads (connman, pIndex);
56+ } else if (m_quorums_watch) {
57+ TriggerVvecSyncThreads (connman, pIndex);
9658 }
9759}
9860
@@ -397,8 +359,8 @@ void QuorumParticipant::DataRecoveryThread(CConnman& connman, gsl::not_null<cons
397359 printLog (" Done" );
398360}
399361
400- void QuorumParticipant::StartDataRecoveryThread (CConnman& connman, CQuorumCPtr pQuorum ,
401- gsl::not_null< const CBlockIndex*> pIndex , uint16_t nDataMaskIn) const
362+ void QuorumParticipant::StartDataRecoveryThread (CConnman& connman, gsl::not_null< const CBlockIndex*> pIndex ,
363+ CQuorumCPtr pQuorum , uint16_t nDataMaskIn) const
402364{
403365 assert (m_mn_activeman);
404366
@@ -414,6 +376,84 @@ void QuorumParticipant::StartDataRecoveryThread(CConnman& connman, CQuorumCPtr p
414376 });
415377}
416378
379+ void QuorumParticipant::TriggerDataRecoveryThreads (CConnman& connman, gsl::not_null<const CBlockIndex*> block_index) const
380+ {
381+ assert (m_mn_activeman);
382+
383+ LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- Process block %s\n " , __func__, block_index->GetBlockHash ().ToString ());
384+
385+ const uint256 proTxHash = m_mn_activeman->GetProTxHash ();
386+
387+ for (const auto & params : Params ().GetConsensus ().llmqs ) {
388+ auto vecQuorums = m_qman.ScanQuorums (params.type , block_index, params.keepOldConnections );
389+ const bool fWeAreQuorumTypeMember = ranges::any_of (vecQuorums, [&proTxHash](const auto & pQuorum) { return pQuorum->IsValidMember (proTxHash); });
390+
391+ for (auto & pQuorum : vecQuorums) {
392+ if (pQuorum->IsValidMember (proTxHash)) {
393+ uint16_t nDataMask{0 };
394+ if (!pQuorum->HasVerificationVector ()) {
395+ nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
396+ }
397+ if (!pQuorum->GetSkShare ().IsValid ()) {
398+ nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
399+ }
400+ if (nDataMask != 0 ) {
401+ StartDataRecoveryThread (connman, block_index, std::move (pQuorum), nDataMask);
402+ } else {
403+ LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n " , __func__,
404+ ToUnderlying (pQuorum->qc ->llmqType ), pQuorum->qc ->quorumHash .ToString (), block_index->nHeight );
405+ }
406+ } else {
407+ TryStartVvecSyncThread (connman, block_index, std::move (pQuorum), fWeAreQuorumTypeMember );
408+ }
409+ }
410+ }
411+ }
412+
413+ void QuorumParticipant::StartVvecSyncThread (CConnman& connman, gsl::not_null<const CBlockIndex*> block_index,
414+ CQuorumCPtr pQuorum) const
415+ {
416+ bool expected = false ;
417+ if (!pQuorum->fQuorumDataRecoveryThreadRunning .compare_exchange_strong (expected, true )) {
418+ LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- Already running\n " , __func__);
419+ return ;
420+ }
421+
422+ m_qman.workerPool .push ([&connman, pQuorum = std::move (pQuorum), block_index, this ](int threadId) mutable {
423+ DataRecoveryThread (connman, block_index, std::move (pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR,
424+ /* protx_hash=*/ uint256 (), /* start_offset=*/ 0 );
425+ });
426+ }
427+
428+ void QuorumParticipant::TriggerVvecSyncThreads (CConnman& connman, gsl::not_null<const CBlockIndex*> block_index) const
429+ {
430+ LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- Process block %s\n " , __func__, block_index->GetBlockHash ().ToString ());
431+ for (const auto & params : Params ().GetConsensus ().llmqs ) {
432+ auto vecQuorums = m_qman.ScanQuorums (params.type , block_index, params.keepOldConnections );
433+ for (auto & pQuorum : vecQuorums) {
434+ TryStartVvecSyncThread (connman, block_index, std::move (pQuorum), /* fWeAreQuorumTypeMember=*/ false );
435+ }
436+ }
437+ }
438+
439+ void QuorumParticipant::TryStartVvecSyncThread (CConnman& connman, gsl::not_null<const CBlockIndex*> block_index,
440+ CQuorumCPtr pQuorum, bool fWeAreQuorumTypeMember ) const
441+ {
442+ if (pQuorum->fQuorumDataRecoveryThreadRunning ) return ;
443+
444+ const bool fSyncForTypeEnabled = m_sync_map.count (pQuorum->qc ->llmqType ) > 0 ;
445+ const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at (pQuorum->qc ->llmqType ) : QvvecSyncMode::Invalid;
446+ const bool fSyncCurrent = syncMode == QvvecSyncMode::Always ||
447+ (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember );
448+
449+ if ((fSyncForTypeEnabled && fSyncCurrent ) && !pQuorum->HasVerificationVector ()) {
450+ StartVvecSyncThread (connman, block_index, std::move (pQuorum));
451+ } else {
452+ LogPrint (BCLog::LLMQ, " QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n " , __func__,
453+ ToUnderlying (pQuorum->qc ->llmqType ), pQuorum->qc ->quorumHash .ToString (), block_index->nHeight );
454+ }
455+ }
456+
417457void QuorumParticipant::StartCleanupOldQuorumDataThread (gsl::not_null<const CBlockIndex*> pIndex) const
418458{
419459 // Note: this function is CPU heavy and we don't want it to be running during DKGs.
0 commit comments