@@ -239,6 +239,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
239239 for (const auto & sigShare : receivedSigShares) {
240240 ProcessMessageSigShare (pfrom.GetId (), peerman, sigShare);
241241 }
242+ NotifyWorker ();
242243 }
243244
244245 if (msg_type == NetMsgType::QSIGSESANN) {
@@ -254,6 +255,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
254255 BanNode (pfrom.GetId (), peerman);
255256 return ;
256257 }
258+ NotifyWorker ();
257259 } else if (msg_type == NetMsgType::QSIGSHARESINV) {
258260 std::vector<CSigSharesInv> msgs;
259261 vRecv >> msgs;
@@ -267,6 +269,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
267269 BanNode (pfrom.GetId (), peerman);
268270 return ;
269271 }
272+ NotifyWorker ();
270273 } else if (msg_type == NetMsgType::QGETSIGSHARES) {
271274 std::vector<CSigSharesInv> msgs;
272275 vRecv >> msgs;
@@ -280,6 +283,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
280283 BanNode (pfrom.GetId (), peerman);
281284 return ;
282285 }
286+ NotifyWorker ();
283287 } else if (msg_type == NetMsgType::QBSIGSHARES) {
284288 std::vector<CBatchedSigShares> msgs;
285289 vRecv >> msgs;
@@ -297,10 +301,8 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
297301 BanNode (pfrom.GetId (), peerman);
298302 return ;
299303 }
304+ NotifyWorker ();
300305 }
301-
302- // New inbound messages can create work (requests, shares, announcements)
303- NotifyWorker ();
304306}
305307
306308bool CSigSharesManager::ProcessMessageSigSesAnn (const CNode& pfrom, const CSigSesAnn& ann)
@@ -732,7 +734,6 @@ void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& s
732734 return ;
733735 }
734736
735- bool queued_announce = false ;
736737 {
737738 LOCK (cs);
738739
@@ -741,7 +742,6 @@ void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& s
741742 }
742743 if (!IsAllMembersConnectedEnabled (llmqType, m_sporkman)) {
743744 sigSharesQueuedToAnnounce.Add (sigShare.GetKey (), true );
744- queued_announce = true ;
745745 }
746746
747747 // Update the time we've seen the last sigShare
@@ -763,9 +763,9 @@ void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& s
763763 }
764764 }
765765
766- if (queued_announce) {
767- NotifyWorker ();
768- }
766+ // Note: Don't call NotifyWorker() here even when queued_announce is true
767+ // When called from worker thread: SendMessages() will handle announcements in same iteration
768+ // When called from external thread: ProcessMessage() already calls NotifyWorker() (line 303)
769769
770770 if (canTryRecovery) {
771771 TryRecoverSig (peerman, quorum, sigShare.getId (), sigShare.getMsgHash ());
@@ -1029,7 +1029,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
10291029 proTxToNode.try_emplace (verifiedProRegTxHash, pnode);
10301030 }
10311031
1032- auto curTime = GetTime< std::chrono::milliseconds>(). count ();
1032+ auto curTime = std::chrono::steady_clock::now ();
10331033
10341034 for (auto & [_, signedSession] : signedSessions) {
10351035 if (!IsAllMembersConnectedEnabled (signedSession.quorum ->params .type , m_sporkman)) {
@@ -1043,7 +1043,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
10431043 if (curTime >= signedSession.nextAttemptTime ) {
10441044 int64_t waitTime = exp2 (signedSession.attempt ) * EXP_SEND_FOR_RECOVERY_TIMEOUT;
10451045 waitTime = std::min (MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime);
1046- signedSession.nextAttemptTime = curTime + waitTime;
1046+ signedSession.nextAttemptTime = curTime + std::chrono::milliseconds ( waitTime) ;
10471047 auto dmn = SelectMemberForRecovery (signedSession.quorum , signedSession.sigShare .getId (), signedSession.attempt );
10481048 signedSession.attempt ++;
10491049
@@ -1430,6 +1430,7 @@ void CSigSharesManager::Cleanup(const CConnman& connman)
14301430 }
14311431
14321432 lastCleanupTime = GetTime<std::chrono::seconds>().count ();
1433+ lastCleanupTimeSteady = std::chrono::steady_clock::now ();
14331434}
14341435
14351436void CSigSharesManager::RemoveSigSharesForSession (const uint256& signHash)
@@ -1519,24 +1520,19 @@ void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman)
15191520 auto next_deadline = std::chrono::steady_clock::time_point::max ();
15201521 // Respect cleanup cadence (~5s) even when idle
15211522 {
1522- auto now_tp = std::chrono::steady_clock::now ();
1523- int64_t now_s = GetTime<std::chrono::seconds>().count ();
1524- int64_t target_s = lastCleanupTime + 5 ;
1525- if (target_s > now_s) {
1526- auto delta_ms = (target_s - now_s) * 1000 ;
1527- auto cand = now_tp + std::chrono::milliseconds (delta_ms);
1528- if (cand < next_deadline) next_deadline = cand;
1523+ auto now_steady = std::chrono::steady_clock::now ();
1524+ auto next_cleanup = lastCleanupTimeSteady + std::chrono::seconds (5 );
1525+ if (next_cleanup > now_steady) {
1526+ if (next_cleanup < next_deadline) next_deadline = next_cleanup;
15291527 }
15301528 }
15311529 {
15321530 // Consider next recovery attempt times for signed sessions to avoid polling
15331531 LOCK (cs);
1534- int64_t cur_ms = TicksSinceEpoch< std::chrono::milliseconds>( SystemClock ::now () );
1532+ auto now_steady = std::chrono::steady_clock ::now ();
15351533 for (const auto & [_, s] : signedSessions) {
1536- if (s.nextAttemptTime > cur_ms) {
1537- auto d = s.nextAttemptTime - cur_ms;
1538- auto cand = std::chrono::steady_clock::now () + std::chrono::milliseconds (d);
1539- if (cand < next_deadline) next_deadline = cand;
1534+ if (s.nextAttemptTime > now_steady) {
1535+ if (s.nextAttemptTime < next_deadline) next_deadline = s.nextAttemptTime ;
15401536 }
15411537 }
15421538 }
@@ -1581,13 +1577,13 @@ void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManage
15811577 auto & session = signedSessions[sigShare.GetSignHash ()];
15821578 session.sigShare = sigShare;
15831579 session.quorum = pQuorum;
1584- session.nextAttemptTime = 0 ;
1580+ session.nextAttemptTime = std::chrono::steady_clock::time_point{} ;
15851581 session.attempt = 0 ;
15861582 }
15871583 }
15881584 }
1589- // New sig shares or recovery attempts may be available
1590- NotifyWorker ();
1585+ // Note: Don't call NotifyWorker() here as this function is called by the worker thread itself
1586+ // NotifyWorker() is only needed when external threads add work
15911587}
15921588
15931589std::optional<CSigShare> CSigSharesManager::CreateSigShare (const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const
0 commit comments