Skip to content

Commit 82d1aed

Browse files
kwvgknst
andcommitted
refactor: move CConnman, PeerManager out of CSigSharesManager ctor
Co-authored-by: Konstantin Akimov <[email protected]>
1 parent 7498a38 commit 82d1aed

File tree

7 files changed

+94
-83
lines changed

7 files changed

+94
-83
lines changed

src/llmq/context.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
3434
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
3535
wipe)},
3636
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)},
37-
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
37+
shareman{std::make_unique<llmq::CSigSharesManager>(*sigman, mn_activeman, *qman, sporkman)},
3838
clhandler{[&]() -> llmq::CChainLocksHandler* const {
3939
assert(llmq::chainLocksHandler == nullptr);
4040
llmq::chainLocksHandler = std::make_unique<llmq::CChainLocksHandler>(chainman.ActiveChainstate(), *qman,
@@ -83,7 +83,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
8383
}
8484
qman->Start();
8585
shareman->RegisterAsRecoveredSigsListener();
86-
shareman->StartWorkerThread();
86+
shareman->StartWorkerThread(connman, peerman);
8787
sigman->StartWorkerThread(peerman);
8888

8989
llmq::chainLocksHandler->Start();

src/llmq/dkgsessionhandler.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,8 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
433433
return ret;
434434
}
435435

436-
static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman, PeerManager& peerman, const CInv& inv)
436+
static void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManager& peerman,
437+
const CInv& inv)
437438
{
438439
CDKGLogger logger(session, __func__, __LINE__);
439440
std::stringstream ss;
@@ -466,7 +467,7 @@ static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman
466467
}
467468

468469
template <typename Message, int MessageType>
469-
bool ProcessPendingMessageBatch(CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages,
470+
bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages,
470471
PeerManager& peerman, size_t maxCount)
471472
{
472473
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);

src/llmq/signing_shares.cpp

Lines changed: 54 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,15 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash)
178178

179179
//////////////////////
180180

181-
void CSigSharesManager::StartWorkerThread()
181+
void CSigSharesManager::StartWorkerThread(CConnman& connman, PeerManager& peerman)
182182
{
183183
// can't start new thread if we have one running already
184184
if (workThread.joinable()) {
185185
assert(false);
186186
}
187187

188-
workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
188+
workThread = std::thread(&util::TraceThread, "sigshares",
189+
[this, &connman, &peerman] { WorkThreadMain(connman, peerman); });
189190
}
190191

191192
void CSigSharesManager::StopWorkerThread()
@@ -215,7 +216,8 @@ void CSigSharesManager::InterruptWorkerThread()
215216
workInterrupt();
216217
}
217218

218-
void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv)
219+
void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const CSporkManager& sporkman,
220+
const std::string& msg_type, CDataStream& vRecv)
219221
{
220222
// non-masternodes are not interested in sigshares
221223
if (m_mn_activeman == nullptr) return;
@@ -227,12 +229,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
227229

228230
if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) {
229231
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId());
230-
BanNode(pfrom.GetId());
232+
BanNode(pfrom.GetId(), peerman);
231233
return;
232234
}
233235

234236
for (const auto& sigShare : receivedSigShares) {
235-
ProcessMessageSigShare(pfrom.GetId(), sigShare);
237+
ProcessMessageSigShare(pfrom.GetId(), peerman, sigShare);
236238
}
237239
}
238240

@@ -241,38 +243,38 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
241243
vRecv >> msgs;
242244
if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
243245
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
244-
BanNode(pfrom.GetId());
246+
BanNode(pfrom.GetId(), peerman);
245247
return;
246248
}
247249
if (!ranges::all_of(msgs,
248250
[this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) {
249-
BanNode(pfrom.GetId());
251+
BanNode(pfrom.GetId(), peerman);
250252
return;
251253
}
252254
} else if (msg_type == NetMsgType::QSIGSHARESINV) {
253255
std::vector<CSigSharesInv> msgs;
254256
vRecv >> msgs;
255257
if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
256258
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId());
257-
BanNode(pfrom.GetId());
259+
BanNode(pfrom.GetId(), peerman);
258260
return;
259261
}
260262
if (!ranges::all_of(msgs,
261263
[this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) {
262-
BanNode(pfrom.GetId());
264+
BanNode(pfrom.GetId(), peerman);
263265
return;
264266
}
265267
} else if (msg_type == NetMsgType::QGETSIGSHARES) {
266268
std::vector<CSigSharesInv> msgs;
267269
vRecv >> msgs;
268270
if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
269271
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId());
270-
BanNode(pfrom.GetId());
272+
BanNode(pfrom.GetId(), peerman);
271273
return;
272274
}
273275
if (!ranges::all_of(msgs,
274276
[this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) {
275-
BanNode(pfrom.GetId());
277+
BanNode(pfrom.GetId(), peerman);
276278
return;
277279
}
278280
} else if (msg_type == NetMsgType::QBSIGSHARES) {
@@ -284,12 +286,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
284286
}
285287
if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
286288
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
287-
BanNode(pfrom.GetId());
289+
BanNode(pfrom.GetId(), peerman);
288290
return;
289291
}
290292
if (!ranges::all_of(msgs,
291293
[this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) {
292-
BanNode(pfrom.GetId());
294+
BanNode(pfrom.GetId(), peerman);
293295
return;
294296
}
295297
}
@@ -454,7 +456,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const
454456
return true;
455457
}
456458

457-
void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare)
459+
void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare)
458460
{
459461
assert(m_mn_activeman);
460462

@@ -479,12 +481,12 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s
479481

480482
if (sigShare.getQuorumMember() >= quorum->members.size()) {
481483
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
482-
BanNode(fromId);
484+
BanNode(fromId, peerman);
483485
return;
484486
}
485487
if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) {
486488
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__);
487-
BanNode(fromId);
489+
BanNode(fromId, peerman);
488490
return;
489491
}
490492

@@ -620,7 +622,7 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify(
620622
return true;
621623
}
622624

623-
bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
625+
bool CSigSharesManager::ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman)
624626
{
625627
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
626628
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
@@ -646,7 +648,7 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
646648
// we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
647649
// deserialization in the message thread
648650
if (!sigShare.sigShare.Get().IsValid()) {
649-
BanNode(nodeId);
651+
BanNode(nodeId, peerman);
650652
// don't process any additional shares from this node
651653
break;
652654
}
@@ -678,25 +680,26 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
678680
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n",
679681
__func__, nodeId);
680682
// this will also cause re-requesting of the shares that were sent by this node
681-
BanNode(nodeId);
683+
BanNode(nodeId, peerman);
682684
continue;
683685
}
684686

685-
ProcessPendingSigShares(v, quorums, connman);
687+
ProcessPendingSigShares(v, quorums, peerman, connman);
686688
}
687689

688690
return sigSharesByNodes.size() >= nMaxBatchSize;
689691
}
690692

691693
// It's ensured that no duplicates are passed to this method
692-
void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess,
693-
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
694-
const CConnman& connman)
694+
void CSigSharesManager::ProcessPendingSigShares(
695+
const std::vector<CSigShare>& sigSharesToProcess,
696+
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
697+
PeerManager& peerman, const CConnman& connman)
695698
{
696699
cxxtimer::Timer t(true);
697700
for (const auto& sigShare : sigSharesToProcess) {
698701
auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash());
699-
ProcessSigShare(sigShare, connman, quorums.at(quorumKey));
702+
ProcessSigShare(peerman, sigShare, connman, quorums.at(quorumKey));
700703
}
701704
t.stop();
702705

@@ -705,7 +708,8 @@ void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& si
705708
}
706709

707710
// sig shares are already verified when entering this method
708-
void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum)
711+
void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman,
712+
const CQuorumCPtr& quorum)
709713
{
710714
assert(m_mn_activeman);
711715

@@ -754,11 +758,12 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma
754758
}
755759

756760
if (canTryRecovery) {
757-
TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash());
761+
TryRecoverSig(peerman, quorum, sigShare.getId(), sigShare.getMsgHash());
758762
}
759763
}
760764

761-
void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
765+
void CSigSharesManager::TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id,
766+
const uint256& msgHash)
762767
{
763768
if (sigman.HasRecoveredSigForId(quorum->params.type, id)) {
764769
return;
@@ -817,7 +822,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
817822
}
818823
}
819824

820-
sigman.ProcessRecoveredSig(rs, *m_peerman);
825+
sigman.ProcessRecoveredSig(rs, peerman);
821826
}
822827

823828
CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)
@@ -1027,16 +1032,18 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
10271032
}
10281033
}
10291034

1030-
void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
1035+
void CSigSharesManager::CollectSigSharesToAnnounce(
1036+
const CConnman& connman,
1037+
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
10311038
{
10321039
AssertLockHeld(cs);
10331040

10341041
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::unordered_set<NodeId>, StaticSaltedHasher> quorumNodesMap;
10351042

10361043
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
10371044
// using here template ForEach makes impossible to use lock annotation
1038-
sigSharesQueuedToAnnounce.ForEach([this, &quorumNodesMap, &sigSharesToAnnounce](const SigShareKey& sigShareKey,
1039-
bool) NO_THREAD_SAFETY_ANALYSIS {
1045+
sigSharesQueuedToAnnounce.ForEach([this, &connman, &quorumNodesMap,
1046+
&sigSharesToAnnounce](const SigShareKey& sigShareKey, bool) NO_THREAD_SAFETY_ANALYSIS {
10401047
AssertLockHeld(cs);
10411048
const auto& signHash = sigShareKey.first;
10421049
auto quorumMember = sigShareKey.second;
@@ -1084,7 +1091,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
10841091
sigSharesQueuedToAnnounce.Clear();
10851092
}
10861093

1087-
bool CSigSharesManager::SendMessages()
1094+
bool CSigSharesManager::SendMessages(CConnman& connman)
10881095
{
10891096
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
10901097
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend;
@@ -1113,7 +1120,7 @@ bool CSigSharesManager::SendMessages()
11131120
LOCK(cs);
11141121
CollectSigSharesToRequest(sigSharesToRequest);
11151122
CollectSigSharesToSend(sigShareBatchesToSend);
1116-
CollectSigSharesToAnnounce(sigSharesToAnnounce);
1123+
CollectSigSharesToAnnounce(connman, sigSharesToAnnounce);
11171124
CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes());
11181125

11191126
for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
@@ -1254,7 +1261,7 @@ CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionI
12541261
return sigShare;
12551262
}
12561263

1257-
void CSigSharesManager::Cleanup()
1264+
void CSigSharesManager::Cleanup(const CConnman& connman)
12581265
{
12591266
int64_t now = GetTime<std::chrono::seconds>().count();
12601267
if (now - lastCleanupTime < 5) {
@@ -1407,13 +1414,13 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
14071414
timeSeenForSessions.erase(signHash);
14081415
}
14091416

1410-
void CSigSharesManager::RemoveBannedNodeStates()
1417+
void CSigSharesManager::RemoveBannedNodeStates(PeerManager& peerman)
14111418
{
14121419
// Called regularly to cleanup local node states for banned nodes
14131420

14141421
LOCK(cs);
14151422
for (auto it = nodeStates.begin(); it != nodeStates.end();) {
1416-
if (Assert(m_peerman)->IsBanned(it->first)) {
1423+
if (peerman.IsBanned(it->first)) {
14171424
// re-request sigshares from other nodes
14181425
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
14191426
// using here template ForEach makes impossible to use lock annotation
@@ -1428,23 +1435,21 @@ void CSigSharesManager::RemoveBannedNodeStates()
14281435
}
14291436
}
14301437

1431-
void CSigSharesManager::BanNode(NodeId nodeId)
1438+
void CSigSharesManager::BanNode(NodeId nodeId, PeerManager& peerman)
14321439
{
14331440
if (nodeId == -1) {
14341441
return;
14351442
}
14361443

1437-
{
1438-
Assert(m_peerman)->Misbehaving(nodeId, 100);
1439-
}
1444+
peerman.Misbehaving(nodeId, 100);
14401445

14411446
LOCK(cs);
14421447
auto it = nodeStates.find(nodeId);
14431448
if (it == nodeStates.end()) {
14441449
return;
14451450
}
1446-
auto& nodeState = it->second;
14471451

1452+
auto& nodeState = it->second;
14481453
// Whatever we requested from him, let's request it from someone else now
14491454
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
14501455
// using here template ForEach makes impossible to use lock annotation
@@ -1453,26 +1458,25 @@ void CSigSharesManager::BanNode(NodeId nodeId)
14531458
sigSharesRequested.Erase(k);
14541459
});
14551460
nodeState.requestedSigShares.Clear();
1456-
14571461
nodeState.banned = true;
14581462
}
14591463

1460-
void CSigSharesManager::WorkThreadMain()
1464+
void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman)
14611465
{
14621466
int64_t lastSendTime = 0;
14631467

14641468
while (!workInterrupt) {
1465-
RemoveBannedNodeStates();
1469+
RemoveBannedNodeStates(peerman);
14661470

1467-
bool fMoreWork = ProcessPendingSigShares(connman);
1468-
SignPendingSigShares();
1471+
bool fMoreWork = ProcessPendingSigShares(peerman, connman);
1472+
SignPendingSigShares(connman, peerman);
14691473

14701474
if (GetTimeMillis() - lastSendTime > 100) {
1471-
SendMessages();
1475+
SendMessages(connman);
14721476
lastSendTime = GetTimeMillis();
14731477
}
14741478

1475-
Cleanup();
1479+
Cleanup(connman);
14761480

14771481
// TODO Wakeup when pending signing is needed?
14781482
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
@@ -1487,7 +1491,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
14871491
pendingSigns.emplace_back(quorum, id, msgHash);
14881492
}
14891493

1490-
void CSigSharesManager::SignPendingSigShares()
1494+
void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManager& peerman)
14911495
{
14921496
std::vector<PendingSignatureData> v;
14931497
WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns));
@@ -1497,7 +1501,7 @@ void CSigSharesManager::SignPendingSigShares()
14971501

14981502
if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
14991503
auto sigShare = *opt_sigShare;
1500-
ProcessSigShare(sigShare, connman, pQuorum);
1504+
ProcessSigShare(peerman, sigShare, connman, pQuorum);
15011505

15021506
if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) {
15031507
LOCK(cs);

0 commit comments

Comments
 (0)