Skip to content

Commit ae2dc7a

Browse files
fix: separate mocked time from steady_clock in worker threads
Fixes time-mixing bugs where mocked time (controllable in tests) was being used to compute steady_clock deadlines. Since mocked time and system time move independently, this caused incorrect wait behavior in tests.

Changes:
- Use steady_clock::time_point for all wait deadlines (nextAttemptTime, lastCleanupTimeSteady)
- Keep mocked time (GetTime<>()) for business logic only (timeouts, session tracking)
- Remove redundant workEpoch increments (NotifyWorker already does this)
- Move NotifyWorker() calls to individual message handlers for better control

This ensures that:
1. In production: steady_clock provides monotonic, reliable timing
2. In tests: mocked time controls business logic while steady_clock handles waits
3. No double-incrementing of workEpoch that could cause busy-wait issues
1 parent dcd1d9a commit ae2dc7a

File tree

4 files changed

+38
-49
lines changed

4 files changed

+38
-49
lines changed

src/llmq/signing.cpp

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,6 @@ MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string
431431
}
432432

433433
pendingRecoveredSigs[from].emplace_back(recoveredSig);
434-
workEpoch.fetch_add(1, std::memory_order_acq_rel);
435434
NotifyWorker();
436435
return ret;
437436
}
@@ -523,13 +522,9 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman)
523522
const size_t nMaxBatchSize{32};
524523
CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums);
525524
if (recSigsByNode.empty()) {
526-
// Check if reconstructed queue has work pending so the caller can keep looping
527-
{
528-
LOCK(cs_pending);
529-
if (!pendingReconstructedRecoveredSigs.empty() || !pendingRecoveredSigs.empty()) {
530-
return true;
531-
}
532-
}
525+
// No work in this batch. Don't proactively check queues for work that may have been
526+
// added by listeners during processing, as this causes busy-wait when combined with
527+
// epoch changes. External threads will call NotifyWorker() to wake us if needed.
533528
return false;
534529
}
535530

@@ -582,13 +577,10 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman)
582577
}
583578
}
584579

585-
// If we still have pending items in queues, report more work to avoid sleeping
586-
bool more_in_queues = false;
587-
{
588-
LOCK(cs_pending);
589-
more_in_queues = !pendingReconstructedRecoveredSigs.empty() || !pendingRecoveredSigs.empty();
590-
}
591-
return more_in_queues || recSigsByNode.size() >= nMaxBatchSize;
580+
// Only report more work if we processed a full batch, indicating there's likely more
581+
// work from the original collection. Don't check queues for work added by listeners
582+
// during processing, as that would cause busy-wait with epoch-based wake conditions.
583+
return recSigsByNode.size() >= nMaxBatchSize;
592584
}
593585

594586
// signature must be verified already
@@ -640,15 +632,14 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
640632
}
641633

642634
GetMainSignals().NotifyRecoveredSig(recoveredSig, recoveredSig->GetHash().ToString());
643-
workEpoch.fetch_add(1, std::memory_order_acq_rel);
644-
NotifyWorker();
635+
// Note: Don't call NotifyWorker() here as this function is called by the worker thread itself
636+
// NotifyWorker() is only needed when external threads add work
645637
}
646638

647639
void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr<const llmq::CRecoveredSig>& recoveredSig)
648640
{
649641
LOCK(cs_pending);
650642
pendingReconstructedRecoveredSigs.emplace(std::piecewise_construct, std::forward_as_tuple(recoveredSig->GetHash()), std::forward_as_tuple(recoveredSig));
651-
workEpoch.fetch_add(1, std::memory_order_acq_rel);
652643
NotifyWorker();
653644
}
654645

@@ -670,6 +661,7 @@ void CSigningManager::Cleanup()
670661
db.CleanupOldVotes(maxAge);
671662

672663
lastCleanupTime = TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now());
664+
lastCleanupTimeSteady = std::chrono::steady_clock::now();
673665
}
674666

675667
void CSigningManager::RegisterRecoveredSigsListener(CRecoveredSigsListener* l)
@@ -845,11 +837,10 @@ void CSigningManager::WorkThreadMain(PeerManager& peerman)
845837
// new work arrives or the deadline is reached.
846838
auto next_deadline = std::chrono::steady_clock::time_point::max();
847839
{
848-
int64_t now_ms = TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now());
849-
int64_t target_ms = lastCleanupTime + 5000;
850-
if (target_ms > now_ms) {
851-
auto delta_ms = target_ms - now_ms;
852-
next_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(delta_ms);
840+
auto now_steady = std::chrono::steady_clock::now();
841+
auto next_cleanup = lastCleanupTimeSteady + std::chrono::milliseconds(5000);
842+
if (next_cleanup > now_steady) {
843+
next_deadline = next_cleanup;
853844
}
854845
}
855846
if (next_deadline == std::chrono::steady_clock::time_point::max()) {

src/llmq/signing.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ class CSigningManager
177177
FastRandomContext rnd GUARDED_BY(cs_pending);
178178

179179
int64_t lastCleanupTime{0};
180+
std::chrono::steady_clock::time_point lastCleanupTimeSteady{};
180181

181182
mutable Mutex cs_listeners;
182183
std::vector<CRecoveredSigsListener*> recoveredSigsListeners GUARDED_BY(cs_listeners);

src/llmq/signing_shares.cpp

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
239239
for (const auto& sigShare : receivedSigShares) {
240240
ProcessMessageSigShare(pfrom.GetId(), peerman, sigShare);
241241
}
242+
NotifyWorker();
242243
}
243244

244245
if (msg_type == NetMsgType::QSIGSESANN) {
@@ -254,6 +255,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
254255
BanNode(pfrom.GetId(), peerman);
255256
return;
256257
}
258+
NotifyWorker();
257259
} else if (msg_type == NetMsgType::QSIGSHARESINV) {
258260
std::vector<CSigSharesInv> msgs;
259261
vRecv >> msgs;
@@ -267,6 +269,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
267269
BanNode(pfrom.GetId(), peerman);
268270
return;
269271
}
272+
NotifyWorker();
270273
} else if (msg_type == NetMsgType::QGETSIGSHARES) {
271274
std::vector<CSigSharesInv> msgs;
272275
vRecv >> msgs;
@@ -280,6 +283,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
280283
BanNode(pfrom.GetId(), peerman);
281284
return;
282285
}
286+
NotifyWorker();
283287
} else if (msg_type == NetMsgType::QBSIGSHARES) {
284288
std::vector<CBatchedSigShares> msgs;
285289
vRecv >> msgs;
@@ -297,10 +301,8 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman,
297301
BanNode(pfrom.GetId(), peerman);
298302
return;
299303
}
304+
NotifyWorker();
300305
}
301-
302-
// New inbound messages can create work (requests, shares, announcements)
303-
NotifyWorker();
304306
}
305307

306308
bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann)
@@ -732,7 +734,6 @@ void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& s
732734
return;
733735
}
734736

735-
bool queued_announce = false;
736737
{
737738
LOCK(cs);
738739

@@ -741,7 +742,6 @@ void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& s
741742
}
742743
if (!IsAllMembersConnectedEnabled(llmqType, m_sporkman)) {
743744
sigSharesQueuedToAnnounce.Add(sigShare.GetKey(), true);
744-
queued_announce = true;
745745
}
746746

747747
// Update the time we've seen the last sigShare
@@ -763,9 +763,9 @@ void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& s
763763
}
764764
}
765765

766-
if (queued_announce) {
767-
NotifyWorker();
768-
}
766+
// Note: Don't call NotifyWorker() here even when the sig share was added to sigSharesQueuedToAnnounce
767+
// When called from worker thread: SendMessages() will handle announcements in same iteration
768+
// When called from external thread: ProcessMessage() already calls NotifyWorker() after handling each message type
769769

770770
if (canTryRecovery) {
771771
TryRecoverSig(peerman, quorum, sigShare.getId(), sigShare.getMsgHash());
@@ -1029,7 +1029,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
10291029
proTxToNode.try_emplace(verifiedProRegTxHash, pnode);
10301030
}
10311031

1032-
auto curTime = GetTime<std::chrono::milliseconds>().count();
1032+
auto curTime = std::chrono::steady_clock::now();
10331033

10341034
for (auto& [_, signedSession] : signedSessions) {
10351035
if (!IsAllMembersConnectedEnabled(signedSession.quorum->params.type, m_sporkman)) {
@@ -1043,7 +1043,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
10431043
if (curTime >= signedSession.nextAttemptTime) {
10441044
int64_t waitTime = exp2(signedSession.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT;
10451045
waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime);
1046-
signedSession.nextAttemptTime = curTime + waitTime;
1046+
signedSession.nextAttemptTime = curTime + std::chrono::milliseconds(waitTime);
10471047
auto dmn = SelectMemberForRecovery(signedSession.quorum, signedSession.sigShare.getId(), signedSession.attempt);
10481048
signedSession.attempt++;
10491049

@@ -1430,6 +1430,7 @@ void CSigSharesManager::Cleanup(const CConnman& connman)
14301430
}
14311431

14321432
lastCleanupTime = GetTime<std::chrono::seconds>().count();
1433+
lastCleanupTimeSteady = std::chrono::steady_clock::now();
14331434
}
14341435

14351436
void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
@@ -1519,24 +1520,19 @@ void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman)
15191520
auto next_deadline = std::chrono::steady_clock::time_point::max();
15201521
// Respect cleanup cadence (~5s) even when idle
15211522
{
1522-
auto now_tp = std::chrono::steady_clock::now();
1523-
int64_t now_s = GetTime<std::chrono::seconds>().count();
1524-
int64_t target_s = lastCleanupTime + 5;
1525-
if (target_s > now_s) {
1526-
auto delta_ms = (target_s - now_s) * 1000;
1527-
auto cand = now_tp + std::chrono::milliseconds(delta_ms);
1528-
if (cand < next_deadline) next_deadline = cand;
1523+
auto now_steady = std::chrono::steady_clock::now();
1524+
auto next_cleanup = lastCleanupTimeSteady + std::chrono::seconds(5);
1525+
if (next_cleanup > now_steady) {
1526+
if (next_cleanup < next_deadline) next_deadline = next_cleanup;
15291527
}
15301528
}
15311529
{
15321530
// Consider next recovery attempt times for signed sessions to avoid polling
15331531
LOCK(cs);
1534-
int64_t cur_ms = TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now());
1532+
auto now_steady = std::chrono::steady_clock::now();
15351533
for (const auto& [_, s] : signedSessions) {
1536-
if (s.nextAttemptTime > cur_ms) {
1537-
auto d = s.nextAttemptTime - cur_ms;
1538-
auto cand = std::chrono::steady_clock::now() + std::chrono::milliseconds(d);
1539-
if (cand < next_deadline) next_deadline = cand;
1534+
if (s.nextAttemptTime > now_steady) {
1535+
if (s.nextAttemptTime < next_deadline) next_deadline = s.nextAttemptTime;
15401536
}
15411537
}
15421538
}
@@ -1581,13 +1577,13 @@ void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManage
15811577
auto& session = signedSessions[sigShare.GetSignHash()];
15821578
session.sigShare = sigShare;
15831579
session.quorum = pQuorum;
1584-
session.nextAttemptTime = 0;
1580+
session.nextAttemptTime = std::chrono::steady_clock::time_point{};
15851581
session.attempt = 0;
15861582
}
15871583
}
15881584
}
1589-
// New sig shares or recovery attempts may be available
1590-
NotifyWorker();
1585+
// Note: Don't call NotifyWorker() here as this function is called by the worker thread itself
1586+
// NotifyWorker() is only needed when external threads add work
15911587
}
15921588

15931589
std::optional<CSigShare> CSigSharesManager::CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const

src/llmq/signing_shares.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ class CSignedSession
355355
CSigShare sigShare;
356356
CQuorumCPtr quorum;
357357

358-
int64_t nextAttemptTime{0};
358+
std::chrono::steady_clock::time_point nextAttemptTime{};
359359
int attempt{0};
360360
};
361361

@@ -414,6 +414,7 @@ class CSigSharesManager : public CRecoveredSigsListener
414414
const CSporkManager& m_sporkman;
415415

416416
int64_t lastCleanupTime{0};
417+
std::chrono::steady_clock::time_point lastCleanupTimeSteady{};
417418
std::atomic<uint32_t> recoveredSigsCounter{0};
418419

419420
public:

0 commit comments

Comments
 (0)