From 1f912bd791ed71d76b406def8b8af5e814e3b4e6 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 14 Dec 2025 04:35:02 +0530 Subject: [PATCH 1/8] refactor: squash `cs_{map,scan}_quorums` to `m_cs_maps` Resolves a TODO --- src/llmq/quorumsman.cpp | 12 ++++++------ src/llmq/quorumsman.h | 21 ++++++++++----------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/llmq/quorumsman.cpp b/src/llmq/quorumsman.cpp index ffcb90878fbf..dcc4bf89fcaa 100644 --- a/src/llmq/quorumsman.cpp +++ b/src/llmq/quorumsman.cpp @@ -82,7 +82,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l quorum->Init(std::make_unique(std::move(qc)), pQuorumBaseBlockIndex, minedBlockHash, members); if (populate_cache && llmq_params_opt->size == 1) { - WITH_LOCK(cs_map_quorums, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); + WITH_LOCK(m_cs_maps, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); return quorum; } @@ -106,7 +106,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l QueueQuorumForWarming(quorum); } - WITH_LOCK(cs_map_quorums, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); + WITH_LOCK(m_cs_maps, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); return quorum; } @@ -230,7 +230,7 @@ std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp std::vector vecResultQuorums; { - LOCK(cs_scan_quorums); + LOCK(m_cs_maps); if (scanQuorumsCache.empty()) { for (const auto& llmq : Params().GetConsensus().llmqs) { // NOTE: We store it for each block hash in the DKG mining phase here @@ -293,7 +293,7 @@ std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp const size_t nCountResult{vecResultQuorums.size()}; if (nCountResult > 0) { - LOCK(cs_scan_quorums); + LOCK(m_cs_maps); // Don't cache more than keepOldConnections elements // because signing by old quorums requires the exact quorum hash // to be specified and quorum scanning isn't needed there. @@ -399,7 +399,7 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, gsl::not_nul } CQuorumPtr pQuorum; - if (LOCK(cs_map_quorums); mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) { + if (LOCK(m_cs_maps); mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) { return pQuorum; } @@ -530,7 +530,7 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c CQuorumPtr pQuorum; { - if (LOCK(cs_map_quorums); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { + if (LOCK(m_cs_maps); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { // Don't bump score because we asked for it LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId()); return {}; diff --git a/src/llmq/quorumsman.h b/src/llmq/quorumsman.h index 98fdaf5f6c65..5b24473dccaa 100644 --- a/src/llmq/quorumsman.h +++ b/src/llmq/quorumsman.h @@ -83,12 +83,11 @@ class CQuorumManager final : public QuorumObserverParent mutable std::unordered_map mapQuorumDataRequests GUARDED_BY(cs_data_requests); - mutable Mutex cs_map_quorums; - mutable std::map> mapQuorumsCache GUARDED_BY(cs_map_quorums); - - mutable Mutex cs_scan_quorums; // TODO: merge cs_map_quorums, cs_scan_quorums mutexes + mutable Mutex m_cs_maps; + mutable std::map> mapQuorumsCache + GUARDED_BY(m_cs_maps); mutable std::map>> scanQuorumsCache - GUARDED_BY(cs_scan_quorums); + GUARDED_BY(m_cs_maps); // On mainnet, we have around 62 quorums active at any point; let's cache a little more than double that to be safe. // it maps `quorum_hash` to `pindex` @@ -130,7 +129,7 @@ class CQuorumManager final : public QuorumObserverParent [[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, CConnman& connman, std::string_view msg_type, CDataStream& vRecv) - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !m_cs_maps, !m_cache_cs); static bool HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockProcessor& quorum_block_processor, const uint256& quorumHash); @@ -140,14 +139,14 @@ class CQuorumManager final : public QuorumObserverParent // all these methods will lock cs_main for a short period of time CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); std::vector ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !cs_scan_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); // this one is cs_main-free std::vector ScanQuorums(Consensus::LLMQType llmqType, gsl::not_null pindexStart, size_t nCountRequested) const override - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !cs_scan_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); bool IsMasternode() const; bool IsWatching() const; @@ -167,11 +166,11 @@ class CQuorumManager final : public QuorumObserverParent CQuorumPtr BuildQuorumFromCommitment(Consensus::LLMQType llmqType, gsl::not_null pQuorumBaseBlockIndex, bool populate_cache) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, gsl::not_null pindex, bool populate_cache = true) const - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_map_quorums, !m_cache_cs); + EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); void QueueQuorumForWarming(CQuorumCPtr pQuorum) const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); void CacheWarmingThreadMain() const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); From 5a0f361ba68e7d7231eba377c60aaee24eaa26bc Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 11 Jan 2026 22:11:37 +0530 Subject: [PATCH 2/8] refactor: use `unordered_set`s in deduplication/tracking-only paths Most usage is not ordering-sensitive, can reduce lookup complexity. --- src/active/dkgsession.cpp | 4 ++-- src/active/dkgsession.h | 2 +- src/active/dkgsessionhandler.cpp | 6 +++--- src/llmq/debug.h | 4 ++-- src/llmq/dkgsession.cpp | 2 +- src/llmq/dkgsession.h | 18 +++++++++--------- src/llmq/dkgsessionhandler.h | 2 +- src/llmq/ehf_signals.h | 4 +++- src/llmq/observer/quorums.cpp | 4 ++-- src/llmq/observer/quorums.h | 4 ++-- src/llmq/quorums.cpp | 2 +- src/llmq/quorums.h | 2 +- src/llmq/quorumsman.cpp | 2 +- src/llmq/quorumsman.h | 3 ++- src/llmq/utils.cpp | 9 +++++---- src/llmq/utils.h | 8 ++++---- src/net.cpp | 2 +- src/net.h | 11 ++++++----- 18 files changed, 47 insertions(+), 42 deletions(-) diff --git a/src/active/dkgsession.cpp b/src/active/dkgsession.cpp index 15eb449ece7f..bc0dde353903 100644 --- a/src/active/dkgsession.cpp +++ b/src/active/dkgsession.cpp @@ -304,7 +304,7 @@ void ActiveDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, Pe CDKGLogger logger(*this, __func__, __LINE__); - std::set justifyFor; + Uint256HashSet justifyFor; for (const auto& m : members) { if (m->bad) { @@ -338,7 +338,7 @@ void ActiveDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, Pe } void ActiveDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, - const std::set& forMembers) + const Uint256HashSet& forMembers) { CDKGLogger logger(*this, __func__, __LINE__); diff --git a/src/active/dkgsession.h b/src/active/dkgsession.h index 412d69e7d84b..3350a06c6b2d 100644 --- a/src/active/dkgsession.h +++ b/src/active/dkgsession.h @@ -43,7 +43,7 @@ class ActiveDKGSession final : public llmq::CDKGSession void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override EXCLUSIVE_LOCKS_REQUIRED(!invCs); void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, - const std::set& forMembers) override; + const Uint256HashSet& forMembers) override; // Phase 4: commit void VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; diff --git a/src/active/dkgsessionhandler.cpp b/src/active/dkgsessionhandler.cpp index c0ed65b75402..eb10b625a83d 100644 --- a/src/active/dkgsessionhandler.cpp +++ b/src/active/dkgsessionhandler.cpp @@ -251,19 +251,19 @@ void ActiveDKGSessionHandler::HandlePhase(QuorumPhase curPhase, QuorumPhase next // returns a set of NodeIds which sent invalid messages template -std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector>>& messages) +std::unordered_set BatchVerifyMessageSigs(CDKGSession& session, const std::vector>>& messages) { if (messages.empty()) { return {}; } - std::set ret; + std::unordered_set ret; bool revertToSingleVerification = false; CBLSSignature aggSig; std::vector pubKeys; std::vector messageHashes; - std::set messageHashesSet; + Uint256HashSet messageHashesSet; pubKeys.reserve(messages.size()); messageHashes.reserve(messages.size()); bool first = true; diff --git a/src/llmq/debug.h b/src/llmq/debug.h index f6923abbe9a4..7228be3e2246 100644 --- a/src/llmq/debug.h +++ b/src/llmq/debug.h @@ -10,7 +10,7 @@ #include #include -#include +#include class CDataStream; class CDeterministicMNManager; @@ -45,7 +45,7 @@ class CDKGDebugMemberStatus uint8_t statusBitset; }; - std::set complaintsFromMembers; + std::unordered_set complaintsFromMembers; public: CDKGDebugMemberStatus() : statusBitset(0) {} diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index fba7e8b38d39..904f5643f787 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -409,7 +409,7 @@ bool CDKGSession::PreVerifyMessage(const CDKGJustification& qj, bool& retBan) co return false; } - std::set contributionsSet; + std::unordered_set contributionsSet; for (const auto& p : qj.contributions) { if (p.index > members.size()) { logger.Batch("invalid contribution index"); diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index 16b2e1416653..c68fdaa9f4af 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -12,8 +12,8 @@ #include #include #include - #include + #include #include @@ -215,13 +215,13 @@ class CDKGMember size_t idx; CBLSId id; - std::set contributions; - std::set complaints; - std::set justifications; - std::set prematureCommitments; + Uint256HashSet contributions; + Uint256HashSet complaints; + Uint256HashSet justifications; + Uint256HashSet prematureCommitments; - std::set badMemberVotes; - std::set complaintsFromOthers; + Uint256HashSet badMemberVotes; + Uint256HashSet complaintsFromOthers; bool bad{false}; bool badConnection{false}; @@ -325,7 +325,7 @@ class CDKGSession std::vector pendingContributionVerifications GUARDED_BY(cs_pending); // filled by ReceivePrematureCommitment and used by FinalizeCommitments - std::set validCommitments GUARDED_BY(invCs); + Uint256HashSet validCommitments GUARDED_BY(invCs); public: CDKGSession(CBLSWorker& _blsWorker, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, @@ -366,7 +366,7 @@ class CDKGSession // Phase 3: justification virtual void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!invCs) {} - virtual void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const std::set& forMembers) {} + virtual void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const Uint256HashSet& forMembers) {} bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const; std::optional ReceiveMessage(const CDKGJustification& qj) EXCLUSIVE_LOCKS_REQUIRED(!invCs); diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 19ea22cdd6c7..04fa3781e947 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -67,7 +67,7 @@ class CDKGPendingMessages mutable Mutex cs_messages; std::list pendingMessages GUARDED_BY(cs_messages); std::map messagesPerNode GUARDED_BY(cs_messages); - std::set seenMessages GUARDED_BY(cs_messages); + Uint256HashSet seenMessages GUARDED_BY(cs_messages); public: explicit CDKGPendingMessages(size_t _maxMessagesPerNode, uint32_t _invType) : diff --git a/src/llmq/ehf_signals.h b/src/llmq/ehf_signals.h index cc9893061dca..33600f85475f 100644 --- a/src/llmq/ehf_signals.h +++ b/src/llmq/ehf_signals.h @@ -7,6 +7,7 @@ #include #include +#include #include @@ -32,7 +33,8 @@ class CEHFSignalsHandler : public CRecoveredSigsListener * keep freshly generated IDs for easier filter sigs in HandleNewRecoveredSig */ mutable Mutex cs; - std::set ids GUARDED_BY(cs); + Uint256HashSet ids GUARDED_BY(cs); + public: explicit CEHFSignalsHandler(ChainstateManager& chainman, CMNHFManager& mnhfman, CSigningManager& sigman, CSigSharesManager& shareman, const CQuorumManager& qman); diff --git a/src/llmq/observer/quorums.cpp b/src/llmq/observer/quorums.cpp index cd37b14f86d9..4a2f9b68165a 100644 --- a/src/llmq/observer/quorums.cpp +++ b/src/llmq/observer/quorums.cpp @@ -334,7 +334,7 @@ void QuorumObserver::StartCleanupOldQuorumDataThread(gsl::not_null dbKeysToSkip; + Uint256HashSet dbKeysToSkip; if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) { utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false); @@ -346,7 +346,7 @@ void QuorumObserver::StartCleanupOldQuorumDataThread(gsl::not_null quorum_keys; + Uint256HashSet quorum_keys; while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) { uint256 quorum_key; if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) { diff --git a/src/llmq/observer/quorums.h b/src/llmq/observer/quorums.h index 020dc62e7fa5..b7006cfae674 100644 --- a/src/llmq/observer/quorums.h +++ b/src/llmq/observer/quorums.h @@ -9,9 +9,9 @@ #include #include #include +#include #include -#include #include #include #include @@ -59,7 +59,7 @@ class QuorumObserverParent gsl::not_null pindexStart, size_t nCountRequested) const = 0; virtual void CleanupExpiredDataRequests() const = 0; - virtual void CleanupOldQuorumData(const std::set& dbKeysToSkip) const = 0; + virtual void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const = 0; }; class QuorumObserver diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 51108903cd70..060305c077db 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -26,7 +26,7 @@ uint256 MakeQuorumKey(const CQuorum& q) return hw.GetHash(); } -void DataCleanupHelper(CDBWrapper& db, const std::set& skip_list, bool compact) +void DataCleanupHelper(CDBWrapper& db, const Uint256HashSet& skip_list, bool compact) { const auto prefixes = {DB_QUORUM_QUORUM_VVEC, DB_QUORUM_SK_SHARE}; diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 6e3899723c8c..02ca9ed4d121 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -35,7 +35,7 @@ extern const std::string DB_QUORUM_SK_SHARE; extern const std::string DB_QUORUM_QUORUM_VVEC; uint256 MakeQuorumKey(const CQuorum& q); -void DataCleanupHelper(CDBWrapper& db, const std::set& skip_list, bool compact = false); +void DataCleanupHelper(CDBWrapper& db, const Uint256HashSet& skip_list, bool compact = false); /** * Object used as a key to store CQuorumDataRequest diff --git a/src/llmq/quorumsman.cpp b/src/llmq/quorumsman.cpp index dcc4bf89fcaa..36efc974e840 100644 --- a/src/llmq/quorumsman.cpp +++ b/src/llmq/quorumsman.cpp @@ -359,7 +359,7 @@ void CQuorumManager::CleanupExpiredDataRequests() const } } -void CQuorumManager::CleanupOldQuorumData(const std::set& dbKeysToSkip) const +void CQuorumManager::CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const { LOCK(cs_db); DataCleanupHelper(*db, dbKeysToSkip); diff --git a/src/llmq/quorumsman.h b/src/llmq/quorumsman.h index 5b24473dccaa..d4427bfaea6c 100644 --- a/src/llmq/quorumsman.h +++ b/src/llmq/quorumsman.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -157,7 +158,7 @@ class CQuorumManager final : public QuorumObserverParent Consensus::LLMQType llmqType) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); void CleanupExpiredDataRequests() const override EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); - void CleanupOldQuorumData(const std::set& dbKeysToSkip) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_db); + void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_db); private: // all private methods here are cs_main-free diff --git a/src/llmq/utils.cpp b/src/llmq/utils.cpp index 71720b6a3ba1..24f74988dbfb 100644 --- a/src/llmq/utils.cpp +++ b/src/llmq/utils.cpp @@ -754,8 +754,9 @@ Uint256HashSet GetQuorumRelayMembers(const Consensus::LLMQParams& llmqParams, co return result; } -std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, gsl::not_null pQuorumBaseBlockIndex, - size_t memberCount, size_t connectionCount) +std::unordered_set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, + gsl::not_null pQuorumBaseBlockIndex, + size_t memberCount, size_t connectionCount) { static uint256 qwatchConnectionSeed; static std::atomic qwatchConnectionSeedGenerated{false}; @@ -766,7 +767,7 @@ std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, qwatchConnectionSeedGenerated = true; } - std::set result; + std::unordered_set result; uint256 rnd = qwatchConnectionSeed; for ([[maybe_unused]] const auto _ : irange::range(connectionCount)) { rnd = ::SerializeHash(std::make_pair(rnd, std::make_pair(llmqType, pQuorumBaseBlockIndex->GetBlockHash()))); @@ -841,7 +842,7 @@ void AddQuorumProbeConnections(const Consensus::LLMQParams& llmqParams, CConnman auto members = GetAllQuorumMembers(llmqParams.type, util_params); auto curTime = GetTime().count(); - std::set probeConnections; + Uint256HashSet probeConnections; for (const auto& dmn : members) { if (dmn->proTxHash == myProTxHash) { continue; diff --git a/src/llmq/utils.h b/src/llmq/utils.h index 5279baf8cf4c..2f839dc16810 100644 --- a/src/llmq/utils.h +++ b/src/llmq/utils.h @@ -16,7 +16,7 @@ #include -#include +#include #include class CBlockIndex; @@ -62,9 +62,9 @@ struct BlsCheck { uint256 DeterministicOutboundConnection(const uint256& proTxHash1, const uint256& proTxHash2); -std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, - gsl::not_null pQuorumBaseBlockIndex, - size_t memberCount, size_t connectionCount); +std::unordered_set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, + gsl::not_null pQuorumBaseBlockIndex, + size_t memberCount, size_t connectionCount); // includes members which failed DKG std::vector GetAllQuorumMembers(Consensus::LLMQType llmqType, const UtilParameters& util_params, diff --git a/src/net.cpp b/src/net.cpp index f62ebb16820d..99c478f3013d 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -4455,7 +4455,7 @@ bool CConnman::IsMasternodeQuorumRelayMember(const uint256& protxHash) return false; } -void CConnman::AddPendingProbeConnections(const std::set &proTxHashes) +void CConnman::AddPendingProbeConnections(const Uint256HashSet& proTxHashes) { LOCK(cs_vPendingMasternodes); masternodePendingProbes.insert(proTxHashes.begin(), proTxHashes.end()); diff --git a/src/net.h b/src/net.h index 1da1093f187b..9e4e15ed2f7a 100644 --- a/src/net.h +++ b/src/net.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -24,18 +25,18 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include #include + +#include +#include #include -#include #include #include @@ -1465,7 +1466,7 @@ friend class CNode; void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash); bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const; bool IsMasternodeQuorumRelayMember(const uint256& protxHash); - void AddPendingProbeConnections(const std::set& proTxHashes); + void AddPendingProbeConnections(const Uint256HashSet& proTxHashes); size_t GetNodeCount(ConnectionDirection) const EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex); std::map getNetLocalAddresses() const; @@ -1782,7 +1783,7 @@ friend class CNode; mutable RecursiveMutex cs_vPendingMasternodes; std::map, Uint256HashSet> masternodeQuorumNodes GUARDED_BY(cs_vPendingMasternodes); std::map, Uint256HashSet> masternodeQuorumRelayMembers GUARDED_BY(cs_vPendingMasternodes); - std::set masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes); + Uint256HashSet masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes); mutable Mutex cs_mapSocketToNode; std::unordered_map mapSocketToNode GUARDED_BY(cs_mapSocketToNode); From 3dba7e7f3c9ddd29193f705eab016198c241d9c4 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 14 Dec 2025 18:13:29 +0530 Subject: [PATCH 3/8] refactor: simplify DKG debug reporting --- src/active/context.cpp | 2 +- src/llmq/core_write.cpp | 4 ++-- src/llmq/debug.cpp | 33 +++++++++++++++++++-------------- src/llmq/debug.h | 27 +++++++++++++-------------- src/llmq/observer/context.cpp | 2 +- src/rpc/quorums.cpp | 14 +++++--------- 6 files changed, 41 insertions(+), 41 deletions(-) diff --git a/src/active/context.cpp b/src/active/context.cpp index d025c61b6149..be578bd26c97 100644 --- a/src/active/context.cpp +++ b/src/active/context.cpp @@ -36,7 +36,7 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman m_isman{isman}, m_qman{qman}, nodeman{std::make_unique(connman, dmnman, operator_sk)}, - dkgdbgman{std::make_unique()}, + dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, quorums_watch)}, shareman{std::make_unique(connman, chainman.ActiveChainstate(), sigman, peerman, *nodeman, qman, sporkman)}, diff --git a/src/llmq/core_write.cpp b/src/llmq/core_write.cpp index 89add66af10b..a5846169a78a 100644 --- a/src/llmq/core_write.cpp +++ b/src/llmq/core_write.cpp @@ -77,8 +77,8 @@ RPCResult CDKGDebugSessionStatus::GetJsonHelp(const std::string& key, bool optio }}; } -// CDKGDebugStatus::ToJson() defined in llmq/debug.cpp -RPCResult CDKGDebugStatus::GetJsonHelp(const std::string& key, bool optional, bool inner_optional) +// CDKGDebugManager::ToJson() defined in llmq/debug.cpp +RPCResult CDKGDebugManager::GetJsonHelp(const std::string& key, bool optional, bool inner_optional) { return {RPCResult::Type::OBJ, key, optional, key.empty() ? "" : "The state of the node's DKG sessions", { diff --git a/src/llmq/debug.cpp b/src/llmq/debug.cpp index 5d48d36682d4..5bd4887d22d4 100644 --- a/src/llmq/debug.cpp +++ b/src/llmq/debug.cpp @@ -107,21 +107,32 @@ UniValue CDKGDebugSessionStatus::ToJson(CDeterministicMNManager& dmnman, CQuorum return ret; } -CDKGDebugManager::CDKGDebugManager() = default; +CDKGDebugManager::CDKGDebugManager(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, + const ChainstateManager& chainman) : + m_dmnman{dmnman}, + m_qsnapman{qsnapman}, + m_chainman{chainman} +{ +} CDKGDebugManager::~CDKGDebugManager() = default; -UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, - const ChainstateManager& chainman, int detailLevel) const +size_t CDKGDebugManager::GetSessionCount() const { - UniValue ret(UniValue::VOBJ); + return WITH_LOCK(cs_lockStatus, return localStatus.sessions.size()); +} + +UniValue CDKGDebugManager::ToJson(int detailLevel) const +{ + LOCK(cs_lockStatus); - ret.pushKV("time", nTime); - ret.pushKV("timeStr", FormatISO8601DateTime(nTime)); + UniValue ret(UniValue::VOBJ); + ret.pushKV("time", localStatus.nTime); + ret.pushKV("timeStr", FormatISO8601DateTime(localStatus.nTime)); // TODO Support array of sessions UniValue sessionsArrJson(UniValue::VARR); - for (const auto& p : sessions) { + for (const auto& p : localStatus.sessions) { const auto& llmq_params_opt = Params().GetLLMQ(p.first.first); if (!llmq_params_opt.has_value()) { continue; @@ -129,7 +140,7 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, CQuorumSnapsho UniValue s(UniValue::VOBJ); s.pushKV("llmqType", std::string(llmq_params_opt->name)); s.pushKV("quorumIndex", p.first.second); - s.pushKV("status", p.second.ToJson(dmnman, qsnapman, chainman, p.first.second, detailLevel)); + s.pushKV("status", p.second.ToJson(m_dmnman, m_qsnapman, m_chainman, p.first.second, detailLevel)); sessionsArrJson.push_back(s); } @@ -138,12 +149,6 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, CQuorumSnapsho return ret; } -void CDKGDebugManager::GetLocalDebugStatus(llmq::CDKGDebugStatus& ret) const -{ - LOCK(cs_lockStatus); - ret = localStatus; -} - void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex) { LOCK(cs_lockStatus); diff --git a/src/llmq/debug.h b/src/llmq/debug.h index 7228be3e2246..5fef407fa1b1 100644 --- a/src/llmq/debug.h +++ b/src/llmq/debug.h @@ -83,22 +83,18 @@ class CDKGDebugSessionStatus const ChainstateManager& chainman, int quorumIndex, int detailLevel) const; }; -class CDKGDebugStatus -{ -public: +struct CDKGDebugStatus { int64_t nTime{0}; - std::map, CDKGDebugSessionStatus> sessions; - //std::map sessions; - -public: - [[nodiscard]] static RPCResult GetJsonHelp(const std::string& key, bool optional, bool inner_optional = false); - [[nodiscard]] UniValue ToJson(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, - const ChainstateManager& chainman, int detailLevel) const; }; class CDKGDebugManager { +private: + CDeterministicMNManager& m_dmnman; + CQuorumSnapshotManager& m_qsnapman; + const ChainstateManager& m_chainman; + private: mutable Mutex cs_lockStatus; CDKGDebugStatus localStatus GUARDED_BY(cs_lockStatus); @@ -106,11 +102,9 @@ class CDKGDebugManager public: CDKGDebugManager(const CDKGDebugManager&) = delete; CDKGDebugManager& operator=(const CDKGDebugManager&) = delete; - CDKGDebugManager(); + CDKGDebugManager(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman); ~CDKGDebugManager(); - void GetLocalDebugStatus(CDKGDebugStatus& ret) const EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); - void ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); void InitLocalSessionStatus(const Consensus::LLMQParams& llmqParams, int quorumIndex, const uint256& quorumHash, int quorumHeight) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); @@ -121,8 +115,13 @@ class CDKGDebugManager void UpdateLocalMemberStatus(Consensus::LLMQType llmqType, int quorumIndex, size_t memberIdx, std::function&& func) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); -}; + size_t GetSessionCount() const + EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); + [[nodiscard]] static RPCResult GetJsonHelp(const std::string& key, bool optional, bool inner_optional = false); + [[nodiscard]] UniValue ToJson(int detailLevel) const + EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); +}; } // namespace llmq #endif // BITCOIN_LLMQ_DEBUG_H diff --git a/src/llmq/observer/context.cpp b/src/llmq/observer/context.cpp index 7fe664cee825..82312fb66960 100644 --- a/src/llmq/observer/context.cpp +++ b/src/llmq/observer/context.cpp @@ -19,7 +19,7 @@ ObserverContext::ObserverContext(CBLSWorker& bls_worker, CConnman& connman, CDet const CSporkManager& sporkman, const llmq::QvvecSyncModeMap& sync_map, const util::DbWrapperParams& db_params, bool quorums_recovery) : m_qman{qman}, - dkgdbgman{std::make_unique()}, + dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, /*quorums_watch=*/true)}, qman_handler{std::make_unique(connman, dmnman, qman, qsnapman, chainman, mn_sync, sporkman, diff --git a/src/rpc/quorums.cpp b/src/rpc/quorums.cpp index f82ec32def8d..c860b018feff 100644 --- a/src/rpc/quorums.cpp +++ b/src/rpc/quorums.cpp @@ -297,7 +297,7 @@ static RPCHelpMan quorum_info() static RPCResult quorum_dkgstatus_help() { - auto ret = llmq::CDKGDebugStatus::GetJsonHelp(/*key=*/"", /*optional=*/false, /*inner_optional=*/true); + auto ret = llmq::CDKGDebugManager::GetJsonHelp(/*key=*/"", /*optional=*/false, /*inner_optional=*/true); auto mod_inner = ret.m_inner; mod_inner.push_back({RPCResult::Type::ARR, "quorumConnections", "Array of objects containing quorum connection information", { {RPCResult::Type::OBJ, "", "", { @@ -346,18 +346,16 @@ static RPCHelpMan quorum_dkgstatus() UniValue quorumArrConnections(UniValue::VARR); const NodeContext& node = EnsureAnyNodeContext(request.context); - const ChainstateManager& chainman = EnsureChainman(node); - const LLMQContext& llmq_ctx = EnsureLLMQContext(node); if (const auto* debugman = node.active_ctx ? node.active_ctx->dkgdbgman.get() : node.observer_ctx ? node.observer_ctx->dkgdbgman.get() : nullptr; debugman) { - llmq::CDKGDebugStatus status; - debugman->GetLocalDebugStatus(status); - ret = status.ToJson(*CHECK_NONFATAL(node.dmnman), *llmq_ctx.qsnapman, chainman, detailLevel); + ret = debugman->ToJson(detailLevel); } const CConnman& connman = EnsureConnman(node); + const ChainstateManager& chainman = EnsureChainman(node); const CBlockIndex* const pindexTip = WITH_LOCK(cs_main, return chainman.ActiveChain().Tip()); + const LLMQContext& llmq_ctx = EnsureLLMQContext(node); const int tipHeight = pindexTip->nHeight; const uint256 proTxHash = node.active_ctx ? node.active_ctx->nodeman->GetProTxHash() : uint256{}; for (const auto& type : llmq::GetEnabledQuorumTypes(chainman, pindexTip)) { @@ -1004,10 +1002,8 @@ static RPCHelpMan quorum_dkginfo() } const auto& dkgdbgman = *(node.active_ctx ? node.active_ctx->dkgdbgman.get() : node.observer_ctx->dkgdbgman.get()); - llmq::CDKGDebugStatus status; - dkgdbgman.GetLocalDebugStatus(status); UniValue ret(UniValue::VOBJ); - ret.pushKV("active_dkgs", status.sessions.size()); + ret.pushKV("active_dkgs", dkgdbgman.GetSessionCount()); const ChainstateManager& chainman = EnsureChainman(node); const int nTipHeight{WITH_LOCK(cs_main, return chainman.ActiveChain().Height())}; From ae3ffb97f6b6bc37dae0cd31a221a59a771fcfa2 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 15 Dec 2025 03:33:41 +0530 Subject: [PATCH 4/8] trivial: drop condition for compatibility with ReadDataStream(k2, writeTimeDs) && writeTimeDs.size() == sizeof(uint32_t)) { + if (db->ReadDataStream(k2, writeTimeDs)) { uint32_t writeTime; writeTimeDs >> writeTime; auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t) htobe32_internal(writeTime), recSig.getLlmqType(), recSig.getId()); From 8262237c30bb376effcc8b51b04a9c441b53a611 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 15 Dec 2025 18:36:40 +0530 Subject: [PATCH 5/8] refactor: deduplicate common routines in most `ReceiveMessage` variants Also: - Don't start DKG logger if we expect to bail right after in ActiveSession --- src/active/dkgsession.cpp | 4 +- src/llmq/dkgsession.cpp | 173 ++++++++++++++++++-------------------- src/llmq/dkgsession.h | 22 ++++- 3 files changed, 106 insertions(+), 93 deletions(-) diff --git a/src/active/dkgsession.cpp b/src/active/dkgsession.cpp index bc0dde353903..9cdcc3981c81 100644 --- a/src/active/dkgsession.cpp +++ b/src/active/dkgsession.cpp @@ -37,14 +37,14 @@ ActiveDKGSession::~ActiveDKGSession() = default; void ActiveDKGSession::Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { - CDKGLogger logger(*this, __func__, __LINE__); - if (!AreWeMember()) { return; } assert(params.threshold > 1); // we should not get there with single-node-quorums + CDKGLogger logger(*this, __func__, __LINE__); + cxxtimer::Timer t1(true); logger.Batch("generating contributions"); if (!blsWorker.GenerateContributions(params.threshold, memberIds, vvecContribution, m_sk_contributions)) { diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index 904f5643f787..d4013645c977 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace llmq { CDKGLogger::CDKGLogger(const CDKGSession& _quorumDkg, std::string_view _func, int source_line) : @@ -197,42 +198,15 @@ bool CDKGSession::PreVerifyMessage(const CDKGContribution& qc, bool& retBan) con return true; } -// TODO: remove duplicated code between all ReceiveMessage: CDKGContribution, CDKGComplaint, CDKGJustification, CDKGPrematureCommitment std::optional CDKGSession::ReceiveMessage(const CDKGContribution& qc) { CDKGLogger logger(*this, __func__, __LINE__); - - auto* member = GetMember(qc.proTxHash); - cxxtimer::Timer t1(true); - logger.Batch("received contribution from %s", qc.proTxHash.ToString()); - // relay, no matter if further verification fails - // This ensures the whole quorum sees the bad behavior - - if (member->contributions.size() >= 2) { - // only relay up to 2 contributions, that's enough to let the other members know about his bad behavior - return std::nullopt; - } - - const uint256 hash = ::SerializeHash(qc); - WITH_LOCK(invCs, contributions.emplace(hash, qc)); - member->contributions.emplace(hash); - - CInv inv(MSG_QUORUM_CONTRIB, hash); - - dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { - status.statusBits.receivedContribution = true; - return true; - }); - - if (member->contributions.size() > 1) { - // don't do any further processing if we got more than 1 contribution. we already relayed it, - // so others know about his bad behavior - MarkBadMember(member->idx); - logger.Batch("%s did send multiple contributions", member->dmn->proTxHash.ToString()); - return inv; - } + auto state = WITH_LOCK(invCs, return ReceiveMessagePreamble(qc, MsgPhase::Contribution, logger)); + if (!state) return std::nullopt; + auto& [member, hash, inv, should_process] = *state; + if (!should_process) return inv; receivedVvecs[member->idx] = qc.vvec; @@ -240,13 +214,11 @@ std::optional CDKGSession::ReceiveMessage(const CDKGContribution& qc) logger.Batch("received and relayed contribution. received=%d/%d, time=%d", receivedCount, members.size(), t1.count()); - cxxtimer::Timer t2(true); - if (!AreWeMember()) { - // can't further validate return inv; } + cxxtimer::Timer t2(true); dkgManager.WriteVerifiedVvecContribution(params.type, m_quorum_base_block_index, qc.proTxHash, qc.vvec); bool complain = false; @@ -327,33 +299,10 @@ std::optional CDKGSession::ReceiveMessage(const CDKGComplaint& qc) { CDKGLogger logger(*this, __func__, __LINE__); - logger.Batch("received complaint from %s", qc.proTxHash.ToString()); - - auto* member = GetMember(qc.proTxHash); - - if (member->complaints.size() >= 2) { - // only relay up to 2 complaints, that's enough to let the other members know about his bad behavior - return std::nullopt; - } - - const uint256 hash = ::SerializeHash(qc); - WITH_LOCK(invCs, complaints.emplace(hash, qc)); - member->complaints.emplace(hash); - - CInv inv(MSG_QUORUM_COMPLAINT, hash); - - dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { - status.statusBits.receivedComplaint = true; - return true; - }); - - if (member->complaints.size() > 1) { - // don't do any further processing if we got more than 1 complaint. we already relayed it, - // so others know about his bad behavior - MarkBadMember(member->idx); - logger.Batch("%s did send multiple complaints", member->dmn->proTxHash.ToString()); - return inv; - } + auto state = WITH_LOCK(invCs, return ReceiveMessagePreamble(qc, MsgPhase::Complaint, logger)); + if (!state) return std::nullopt; + auto& [member, hash, inv, should_process] = *state; + if (!should_process) return inv; int receivedCount = 0; for (const auto i : irange::range(members.size())) { @@ -447,34 +396,10 @@ std::optional CDKGSession::ReceiveMessage(const CDKGJustification& qj) { CDKGLogger logger(*this, __func__, __LINE__); - logger.Batch("received justification from %s", qj.proTxHash.ToString()); - - auto* member = GetMember(qj.proTxHash); - - if (member->justifications.size() >= 2) { - // only relay up to 2 justifications, that's enough to let the other members know about his bad behavior - return std::nullopt; - } - - const uint256 hash = ::SerializeHash(qj); - WITH_LOCK(invCs, justifications.emplace(hash, qj)); - member->justifications.emplace(hash); - - // we always relay, even if further verification fails - CInv inv(MSG_QUORUM_JUSTIFICATION, hash); - - dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { - status.statusBits.receivedJustification = true; - return true; - }); - - if (member->justifications.size() > 1) { - // don't do any further processing if we got more than 1 justification. we already relayed it, - // so others know about his bad behavior - logger.Batch("%s did send multiple justifications", member->dmn->proTxHash.ToString()); - MarkBadMember(member->idx); - return inv; - } + auto state = WITH_LOCK(invCs, return ReceiveMessagePreamble(qj, MsgPhase::Justification, logger)); + if (!state) return std::nullopt; + auto& [member, hash, inv, should_process] = *state; + if (!should_process) return inv; if (member->bad) { // we locally determined him to be bad (sent none or more then one contributions) @@ -683,6 +608,76 @@ CDKGMember* CDKGSession::GetMember(const uint256& proTxHash) const return members[it->second].get(); } +template +std::optional CDKGSession::ReceiveMessagePreamble(const MsgType& msg, MsgPhase phase, CDKGLogger& logger) +{ + auto* member = GetMember(msg.proTxHash); + + GetDataMsg inv_type{0}; + std::string msg_name; + + // Select member set, inv type, and name based on phase + auto& member_set = [&]() -> Uint256HashSet& { + switch (phase) { + case MsgPhase::Contribution: + inv_type = MSG_QUORUM_CONTRIB; + msg_name = "contribution"; + return member->contributions; + case MsgPhase::Complaint: + inv_type = MSG_QUORUM_COMPLAINT; + msg_name = "complaint"; + return member->complaints; + case MsgPhase::Justification: + inv_type = MSG_QUORUM_JUSTIFICATION; + msg_name = "justification"; + return member->justifications; + } + assert(false); + }(); + + logger.Batch("received %s from %s", msg_name, msg.proTxHash.ToString()); + + if (member_set.size() >= 2) { + // only relay up to 2 justifications, that's enough to let the other members know about his bad behavior + return std::nullopt; + } + + const uint256 hash = ::SerializeHash(msg); + member_set.emplace(hash); + if constexpr (std::is_same_v) { + contributions.emplace(hash, msg); + } else if constexpr (std::is_same_v) { + complaints.emplace(hash, msg); + } else if constexpr (std::is_same_v) { + justifications.emplace(hash, msg); + } + + dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [phase](CDKGDebugMemberStatus& status) { + switch (phase) { + case MsgPhase::Contribution: status.statusBits.receivedContribution = true; break; + case MsgPhase::Complaint: status.statusBits.receivedComplaint = true; break; + case MsgPhase::Justification: status.statusBits.receivedJustification = true; break; + } + return true; + }); + + bool should_process{true}; + if (member_set.size() > 1) { + // don't do any further processing if we got more than 1 justification. we already relayed it, + // so others know about his bad behavior + MarkBadMember(member->idx); + logger.Batch("%s did send multiple %ss", member->dmn->proTxHash.ToString(), msg_name); + should_process = false; + } + + // we always relay, even if further verification fails + return ReceiveMessageState{member, hash, CInv{inv_type, hash}, should_process}; +} + +template std::optional CDKGSession::ReceiveMessagePreamble(const CDKGContribution&, MsgPhase, CDKGLogger&); +template std::optional CDKGSession::ReceiveMessagePreamble(const CDKGComplaint&, MsgPhase, CDKGLogger&); +template std::optional CDKGSession::ReceiveMessagePreamble(const CDKGJustification&, MsgPhase, CDKGLogger&); + void CDKGSession::MarkBadMember(size_t idx) { auto* member = members.at(idx).get(); diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index c68fdaa9f4af..90120b1dc018 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -6,6 +6,7 @@ #define BITCOIN_LLMQ_DKGSESSION_H #include +#include #include #include @@ -21,7 +22,6 @@ #include class CActiveMasternodeManager; -class CInv; class CConnman; class CDeterministicMN; class CMasternodeMetaMan; @@ -281,6 +281,20 @@ class CDKGSession friend class CDKGSessionManager; friend class CDKGLogger; +private: + enum class MsgPhase : uint8_t { + Contribution, + Complaint, + Justification + }; + + struct ReceiveMessageState { + CDKGMember* member; + uint256 hash; + CInv inv; + bool should_process; + }; + private: CBLSWorker& blsWorker; CBLSWorkerCache cache; @@ -400,12 +414,16 @@ class CDKGSession private: [[nodiscard]] bool ShouldSimulateError(DKGError::type type) const; + + template + [[nodiscard]] std::optional ReceiveMessagePreamble(const MsgType& msg, MsgPhase phase, CDKGLogger& logger) + EXCLUSIVE_LOCKS_REQUIRED(invCs); + void MarkBadMember(size_t idx); }; void SetSimulatedDKGErrorRate(DKGError::type type, double rate); double GetSimulatedErrorRate(DKGError::type type); - } // namespace llmq #endif // BITCOIN_LLMQ_DKGSESSION_H From 9a13ac0bad0bcc42354ab84643e8990a40c7fc66 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 11 Jan 2026 16:12:20 +0530 Subject: [PATCH 6/8] refactor: pass worker count by argument --- src/active/context.cpp | 6 +++--- src/active/context.h | 2 +- src/bench/bls.cpp | 5 ++++- src/bench/bls_dkg.cpp | 9 ++++++--- src/bls/bls_worker.cpp | 6 ++---- src/bls/bls_worker.h | 2 +- src/init.cpp | 5 +++-- src/llmq/context.cpp | 5 +++-- src/llmq/context.h | 3 ++- src/llmq/observer/context.cpp | 4 ++-- src/llmq/observer/context.h | 2 +- src/llmq/observer/quorums.cpp | 6 ++---- src/llmq/observer/quorums.h | 2 +- src/llmq/options.h | 11 +++++++---- src/llmq/signing_shares.cpp | 5 ++--- src/llmq/signing_shares.h | 2 +- src/node/chainstate.cpp | 7 +++++-- src/node/chainstate.h | 2 ++ src/test/util/setup_common.cpp | 4 +++- 19 files changed, 51 insertions(+), 37 deletions(-) diff --git a/src/active/context.cpp b/src/active/context.cpp index be578bd26c97..d46a991a7878 100644 --- a/src/active/context.cpp +++ b/src/active/context.cpp @@ -72,11 +72,11 @@ void ActiveContext::Interrupt() shareman->InterruptWorkerThread(); } -void ActiveContext::Start(CConnman& connman, PeerManager& peerman) +void ActiveContext::Start(CConnman& connman, PeerManager& peerman, int8_t worker_count) { - qman_handler->Start(); + qman_handler->Start(worker_count); qdkgsman->StartThreads(connman, peerman); - shareman->Start(); + shareman->Start(worker_count); cl_signer->RegisterRecoveryInterface(); is_signer->RegisterRecoveryInterface(); shareman->RegisterRecoveryInterface(); diff --git a/src/active/context.h b/src/active/context.h index c147ed7ea6ff..fc6364d73aa7 100644 --- a/src/active/context.h +++ b/src/active/context.h @@ -73,7 +73,7 @@ struct ActiveContext final : public CValidationInterface { ~ActiveContext(); void Interrupt(); - void Start(CConnman& connman, PeerManager& peerman); + void Start(CConnman& connman, PeerManager& peerman, int8_t worker_count); void Stop(); CCoinJoinServer& GetCJServer() const; diff --git a/src/bench/bls.cpp b/src/bench/bls.cpp index e6206d241d9f..eeb6652dc455 100644 --- a/src/bench/bls.cpp +++ b/src/bench/bls.cpp @@ -3,7 +3,10 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include + #include +#include + #include #include @@ -321,7 +324,7 @@ static void BLS_Verify_BatchedParallel(benchmark::Bench& bench) }; CBLSWorker blsWorker; - blsWorker.Start(); + blsWorker.Start(llmq::DEFAULT_WORKER_COUNT); // Benchmark. bench.minEpochIterations(bench.output() ? 1000 : 1).run([&] { diff --git a/src/bench/bls_dkg.cpp b/src/bench/bls_dkg.cpp index 14e83810a100..2b542dd339ea 100644 --- a/src/bench/bls_dkg.cpp +++ b/src/bench/bls_dkg.cpp @@ -3,10 +3,13 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include + #include -#include +#include #include +#include + struct Member { CBLSId id; @@ -67,7 +70,7 @@ class DKG ids.emplace_back(id); } - blsWorker.Start(); + blsWorker.Start(llmq::DEFAULT_WORKER_COUNT); for (auto& member : members) { blsWorker.GenerateContributions(quorumSize / 2 + 1, ids, member.vvec, member.skShares); } @@ -110,7 +113,7 @@ class DKG static void BLSDKG_GenerateContributions(benchmark::Bench& bench, uint32_t epoch_iters, int quorumSize) { CBLSWorker blsWorker; - blsWorker.Start(); + blsWorker.Start(llmq::DEFAULT_WORKER_COUNT); std::vector ids; std::vector members; if (!bench.output()) { diff --git a/src/bls/bls_worker.cpp b/src/bls/bls_worker.cpp index 44f15e46c22b..d807d6710b38 100644 --- a/src/bls/bls_worker.cpp +++ b/src/bls/bls_worker.cpp @@ -56,11 +56,9 @@ CBLSWorker::~CBLSWorker() Stop(); } -void CBLSWorker::Start() +void CBLSWorker::Start(int8_t worker_count) { - int workerCount = std::thread::hardware_concurrency() / 2; - workerCount = std::clamp(workerCount, 1, 4); - workerPool.resize(workerCount); + workerPool.resize(worker_count); RenameThreadPool(workerPool, "bls-work"); } diff --git a/src/bls/bls_worker.h b/src/bls/bls_worker.h index 9fbf812aaa88..abed3e4fcc10 100644 --- a/src/bls/bls_worker.h +++ b/src/bls/bls_worker.h @@ -54,7 +54,7 @@ class CBLSWorker CBLSWorker(); ~CBLSWorker(); - void Start(); + void Start(int8_t worker_count); void Stop(); bool GenerateContributions(int threshold, Span ids, BLSVerificationVectorPtr& vvecRet, std::vector& skSharesRet); diff --git a/src/init.cpp b/src/init.cpp index 6e79ed5b0d21..20744ee7ab6a 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2028,6 +2028,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Subtract 1 because the main thread counts towards the par threads return std::clamp(threads - 1, 0, llmq::MAX_BLSCHECK_THREADS); }(), + llmq::DEFAULT_WORKER_COUNT, args.GetIntArg("-maxrecsigsage", llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE), /*shutdown_requested=*/ShutdownRequested, /*coins_error_cb=*/[]() { @@ -2328,7 +2329,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.llmq_ctx->Start(); node.peerman->StartHandlers(); - if (node.observer_ctx) node.observer_ctx->Start(); + if (node.observer_ctx) node.observer_ctx->Start(llmq::DEFAULT_WORKER_COUNT); node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), node.cj_walletman.get()), std::chrono::minutes{1}); @@ -2336,7 +2337,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->ScheduleHandlers(*node.scheduler); if (node.active_ctx) { - node.active_ctx->Start(*node.connman, *node.peerman); + node.active_ctx->Start(*node.connman, *node.peerman, llmq::DEFAULT_WORKER_COUNT); node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.active_ctx->qdkgsman)), std::chrono::hours{1}); } diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index d22a8de1156a..04e7dcecf2d5 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -15,7 +15,8 @@ LLMQContext::LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSporkManager& sporkman, CTxMemPool& mempool, const ChainstateManager& chainman, const CMasternodeSync& mn_sync, - const util::DbWrapperParams& db_params, int8_t bls_threads, int64_t max_recsigs_age) : + const util::DbWrapperParams& db_params, int8_t bls_threads, int8_t worker_count, + int64_t max_recsigs_age) : bls_worker{std::make_shared()}, qsnapman{std::make_unique(evo_db)}, quorum_block_processor{std::make_unique(chainman.ActiveChainstate(), dmnman, evo_db, @@ -28,7 +29,7 @@ LLMQContext::LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSpork mempool, mn_sync, db_params)} { // Have to start it early to let VerifyDB check ChainLock signatures in coinbase - bls_worker->Start(); + bls_worker->Start(worker_count); } LLMQContext::~LLMQContext() diff --git a/src/llmq/context.h b/src/llmq/context.h index 1e2e84e25a0e..61b4eb61421d 100644 --- a/src/llmq/context.h +++ b/src/llmq/context.h @@ -38,7 +38,8 @@ struct LLMQContext { LLMQContext& operator=(const LLMQContext&) = delete; explicit LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSporkManager& sporkman, CTxMemPool& mempool, const ChainstateManager& chainman, const CMasternodeSync& mn_sync, - const util::DbWrapperParams& db_params, int8_t bls_threads, int64_t max_recsigs_age); + const util::DbWrapperParams& db_params, int8_t bls_threads, int8_t worker_count, + int64_t max_recsigs_age); ~LLMQContext(); void Start(); diff --git a/src/llmq/observer/context.cpp b/src/llmq/observer/context.cpp index 82312fb66960..76055c16da0c 100644 --- a/src/llmq/observer/context.cpp +++ b/src/llmq/observer/context.cpp @@ -37,9 +37,9 @@ ObserverContext::~ObserverContext() m_qman.DisconnectManagers(); } -void ObserverContext::Start() +void ObserverContext::Start(int8_t worker_count) { - qman_handler->Start(); + qman_handler->Start(worker_count); } void ObserverContext::Stop() diff --git a/src/llmq/observer/context.h b/src/llmq/observer/context.h index 97ff248bbf97..ac8bd416d049 100644 --- a/src/llmq/observer/context.h +++ b/src/llmq/observer/context.h @@ -47,7 +47,7 @@ struct ObserverContext final : public CValidationInterface { const util::DbWrapperParams& db_params, bool quorums_recovery); ~ObserverContext(); - void Start(); + void Start(int8_t worker_count); void Stop(); protected: diff --git a/src/llmq/observer/quorums.cpp b/src/llmq/observer/quorums.cpp index 4a2f9b68165a..6c1c3fd718db 100644 --- a/src/llmq/observer/quorums.cpp +++ b/src/llmq/observer/quorums.cpp @@ -44,11 +44,9 @@ QuorumObserver::~QuorumObserver() Stop(); } -void QuorumObserver::Start() +void QuorumObserver::Start(int8_t worker_count) { - int workerCount = std::thread::hardware_concurrency() / 2; - workerCount = std::clamp(workerCount, 1, 4); - workerPool.resize(workerCount); + workerPool.resize(worker_count); RenameThreadPool(workerPool, "q-mngr"); } diff --git a/src/llmq/observer/quorums.h b/src/llmq/observer/quorums.h index b7006cfae674..1811bc090e8b 100644 --- a/src/llmq/observer/quorums.h +++ b/src/llmq/observer/quorums.h @@ -92,7 +92,7 @@ class QuorumObserver const llmq::QvvecSyncModeMap& sync_map, bool quorums_recovery); virtual ~QuorumObserver(); - void Start(); + void Start(int8_t worker_count); void Stop(); void UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const; diff --git a/src/llmq/options.h b/src/llmq/options.h index 7c644be5f7ef..6245315d8232 100644 --- a/src/llmq/options.h +++ b/src/llmq/options.h @@ -9,6 +9,7 @@ #include #include +#include #include class ArgsManager; @@ -27,14 +28,16 @@ enum class QvvecSyncMode : int8_t { OnlyIfTypeMember = 1, }; -/** Maximum number of dedicated script-checking threads allowed */ -static const int8_t MAX_BLSCHECK_THREADS{33}; -/** -parbls default (number of bls-checking threads, 0 = auto) */ -static const int8_t DEFAULT_BLSCHECK_THREADS{0}; /** -llmq-data-recovery default */ static constexpr bool DEFAULT_ENABLE_QUORUM_DATA_RECOVERY{true}; /** -watchquorums default, if true, we will connect to all new quorums and watch their communication */ static constexpr bool DEFAULT_WATCH_QUORUMS{false}; +/** -parbls default (number of bls-checking threads, 0 = auto) */ +static constexpr int8_t DEFAULT_BLSCHECK_THREADS{0}; +/** Number of workers allocated per worker pool */ +static int8_t DEFAULT_WORKER_COUNT{std::clamp(std::thread::hardware_concurrency() / 2, 1, 4)}; +/** Maximum number of dedicated script-checking threads allowed */ +static constexpr int8_t MAX_BLSCHECK_THREADS{33}; bool IsAllMembersConnectedEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman); bool IsQuorumPoseEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman); diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 5892b6e51bc5..b2c82ba11418 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -196,7 +196,7 @@ CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigSharesManager::~CSigSharesManager() = default; -void CSigSharesManager::Start() +void CSigSharesManager::Start(int8_t worker_count) { // can't start if threads are already running if (housekeepingThread.joinable() || dispatcherThread.joinable()) { @@ -204,8 +204,7 @@ void CSigSharesManager::Start() } // Initialize worker pool - int workerCount = std::clamp(static_cast(std::thread::hardware_concurrency() / 2), 1, 4); - workerPool.resize(workerCount); + workerPool.resize(worker_count); RenameThreadPool(workerPool, "sigsh-work"); // Start housekeeping thread diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index e6d91f7b25a2..9907d80326ce 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -429,7 +429,7 @@ class CSigSharesManager : public llmq::CRecoveredSigsListener const CQuorumManager& _qman, const CSporkManager& sporkman); ~CSigSharesManager() override; - void Start() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void Start(int8_t worker_count) EXCLUSIVE_LOCKS_REQUIRED(!cs); void Stop() EXCLUSIVE_LOCKS_REQUIRED(!cs); void RegisterRecoveryInterface() EXCLUSIVE_LOCKS_REQUIRED(!cs); void UnregisterRecoveryInterface() EXCLUSIVE_LOCKS_REQUIRED(!cs); diff --git a/src/node/chainstate.cpp b/src/node/chainstate.cpp index e60eaace5630..bc41e210565e 100644 --- a/src/node/chainstate.cpp +++ b/src/node/chainstate.cpp @@ -61,6 +61,7 @@ std::optional LoadChainstate(bool fReset, bool coins_db_in_memory, bool dash_dbs_in_memory, int8_t bls_threads, + int8_t worker_count, int64_t max_recsigs_age, std::function shutdown_requested, std::function coins_error_cb) @@ -89,7 +90,8 @@ std::optional LoadChainstate(bool fReset, DashChainstateSetup(chainman, govman, mn_metaman, mn_sync, sporkman, chain_helper, cpoolman, dmnman, evodb, mnhf_manager, llmq_ctx, mempool, data_dir, dash_dbs_in_memory, - /*llmq_dbs_wipe=*/fReset || fReindexChainState, bls_threads, max_recsigs_age, consensus_params); + /*llmq_dbs_wipe=*/fReset || fReindexChainState, bls_threads, worker_count, + max_recsigs_age, consensus_params); if (fReset) { pblocktree->WriteReindexing(true); @@ -225,6 +227,7 @@ void DashChainstateSetup(ChainstateManager& chainman, bool llmq_dbs_in_memory, bool llmq_dbs_wipe, int8_t bls_threads, + int8_t worker_count, int64_t max_recsigs_age, const Consensus::Params& consensus_params) { @@ -241,7 +244,7 @@ void DashChainstateSetup(ChainstateManager& chainman, llmq_ctx.reset(); llmq_ctx = std::make_unique(*dmnman, *evodb, sporkman, *mempool, chainman, mn_sync, util::DbWrapperParams{.path = data_dir, .memory = llmq_dbs_in_memory, .wipe = llmq_dbs_wipe}, - bls_threads, max_recsigs_age); + bls_threads, worker_count, max_recsigs_age); mempool->ConnectManagers(dmnman.get(), llmq_ctx->isman.get()); // Enable CMNHFManager::{Process, Undo}Block mnhf_manager->ConnectManagers(llmq_ctx->qman.get()); diff --git a/src/node/chainstate.h b/src/node/chainstate.h index 0022b7637d87..0e3ead18e715 100644 --- a/src/node/chainstate.h +++ b/src/node/chainstate.h @@ -105,6 +105,7 @@ std::optional LoadChainstate(bool fReset, bool coins_db_in_memory, bool dash_dbs_in_memory, int8_t bls_threads, + int8_t worker_count, int64_t max_recsigs_age, std::function shutdown_requested = nullptr, std::function coins_error_cb = nullptr); @@ -126,6 +127,7 @@ void DashChainstateSetup(ChainstateManager& chainman, bool llmq_dbs_in_memory, bool llmq_dbs_wipe, int8_t bls_threads, + int8_t worker_count, int64_t max_recsigs_age, const Consensus::Params& consensus_params); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 5c1aa4543124..08a619f7d2c9 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -146,7 +146,8 @@ void DashChainstateSetup(ChainstateManager& chainman, DashChainstateSetup(chainman, *Assert(node.govman.get()), *Assert(node.mn_metaman.get()), *Assert(node.mn_sync.get()), *Assert(node.sporkman.get()), node.chain_helper, node.cpoolman, node.dmnman, node.evodb, node.mnhf_manager, node.llmq_ctx, Assert(node.mempool.get()), node.args->GetDataDirNet(), llmq_dbs_in_memory, llmq_dbs_wipe, - llmq::DEFAULT_BLSCHECK_THREADS, llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, consensus_params); + llmq::DEFAULT_BLSCHECK_THREADS, llmq::DEFAULT_WORKER_COUNT, llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, + consensus_params); } void DashChainstateSetupClose(NodeContext& node) @@ -344,6 +345,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector Date: Mon, 12 Jan 2026 00:28:52 +0530 Subject: [PATCH 7/8] refactor: consolidate Dash-specific chainstate params with `DashParams` --- src/init.cpp | 33 ++++++++++-------- src/llmq/context.cpp | 15 ++++---- src/llmq/context.h | 8 ++--- src/llmq/options.h | 9 +++++ src/node/chainstate.cpp | 23 +++---------- src/node/chainstate.h | 20 +++-------- src/test/util/setup_common.cpp | 34 +++++++++++++------ src/test/util/setup_common.h | 2 -- .../validation_chainstatemanager_tests.cpp | 4 +-- 9 files changed, 72 insertions(+), 76 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 20744ee7ab6a..b86e88be5c1d 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2005,7 +2005,6 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.mnhf_manager, node.llmq_ctx, Assert(node.mempool.get()), - args.GetDataDirNet(), fPruneMode, args.GetBoolArg("-addressindex", DEFAULT_ADDRESSINDEX), args.GetBoolArg("-spentindex", DEFAULT_SPENTINDEX), @@ -2017,19 +2016,25 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) cache_sizes.coins, /*block_tree_db_in_memory=*/false, /*coins_db_in_memory=*/false, - /*dash_dbs_in_memory=*/false, - /*bls_threads=*/[&args]() -> int8_t { - int8_t threads = args.GetIntArg("-parbls", llmq::DEFAULT_BLSCHECK_THREADS); - if (threads <= 0) { - // -parbls=0 means autodetect (number of cores - 1 validator threads) - // -parbls=-n means "leave n cores free" (number of cores - n - 1 validator threads) - threads += GetNumCores(); - } - // Subtract 1 because the main thread counts towards the par threads - return std::clamp(threads - 1, 0, llmq::MAX_BLSCHECK_THREADS); - }(), - llmq::DEFAULT_WORKER_COUNT, - args.GetIntArg("-maxrecsigsage", llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE), + llmq::DashParams{ + .bls_threads = [&args]() -> int8_t { + int8_t threads = args.GetIntArg("-parbls", llmq::DEFAULT_BLSCHECK_THREADS); + if (threads <= 0) { + // -parbls=0 means autodetect (number of cores - 1 validator threads) + // -parbls=-n means "leave n cores free" (number of cores - n - 1 validator threads) + threads += GetNumCores(); + } + // Subtract 1 because the main thread counts towards the par threads + return std::clamp(threads - 1, 0, llmq::MAX_BLSCHECK_THREADS); + }(), + .worker_count = llmq::DEFAULT_WORKER_COUNT, + .max_recsigs_age = args.GetIntArg("-maxrecsigsage", llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE), + .db_params = util::DbWrapperParams{ + .path = args.GetDataDirNet(), + .memory = false, + .wipe = fReset || fReindexChainState, + }, + }, /*shutdown_requested=*/ShutdownRequested, /*coins_error_cb=*/[]() { uiInterface.ThreadSafeMessageBox( diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 04e7dcecf2d5..1bc4167b76fc 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -8,28 +8,27 @@ #include #include #include +#include #include #include #include #include LLMQContext::LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSporkManager& sporkman, CTxMemPool& mempool, - const ChainstateManager& chainman, const CMasternodeSync& mn_sync, - const util::DbWrapperParams& db_params, int8_t bls_threads, int8_t worker_count, - int64_t max_recsigs_age) : + const ChainstateManager& chainman, const CMasternodeSync& mn_sync, const llmq::DashParams& params) : bls_worker{std::make_shared()}, qsnapman{std::make_unique(evo_db)}, quorum_block_processor{std::make_unique(chainman.ActiveChainstate(), dmnman, evo_db, - *qsnapman, bls_threads)}, + *qsnapman, params.bls_threads)}, qman{std::make_unique(*bls_worker, dmnman, evo_db, *quorum_block_processor, *qsnapman, - chainman, db_params)}, - sigman{std::make_unique(*qman, db_params, max_recsigs_age)}, + chainman, params.db_params)}, + sigman{std::make_unique(*qman, params.db_params, params.max_recsigs_age)}, clhandler{std::make_unique(chainman.ActiveChainstate(), *qman, sporkman, mempool, mn_sync)}, isman{std::make_unique(*clhandler, chainman.ActiveChainstate(), *sigman, sporkman, - mempool, mn_sync, db_params)} + mempool, mn_sync, params.db_params)} { // Have to start it early to let VerifyDB check ChainLock signatures in coinbase - bls_worker->Start(worker_count); + bls_worker->Start(params.worker_count); } LLMQContext::~LLMQContext() diff --git a/src/llmq/context.h b/src/llmq/context.h index 61b4eb61421d..cf6a1f5eff5b 100644 --- a/src/llmq/context.h +++ b/src/llmq/context.h @@ -26,10 +26,8 @@ class CQuorumBlockProcessor; class CQuorumManager; class CQuorumSnapshotManager; class CSigningManager; +struct DashParams; } // namespace llmq -namespace util { -struct DbWrapperParams; -} // namespace util struct LLMQContext { public: @@ -37,9 +35,7 @@ struct LLMQContext { LLMQContext(const LLMQContext&) = delete; LLMQContext& operator=(const LLMQContext&) = delete; explicit LLMQContext(CDeterministicMNManager& dmnman, CEvoDB& evo_db, CSporkManager& sporkman, CTxMemPool& mempool, - const ChainstateManager& chainman, const CMasternodeSync& mn_sync, - const util::DbWrapperParams& db_params, int8_t bls_threads, int8_t worker_count, - int64_t max_recsigs_age); + const ChainstateManager& chainman, const CMasternodeSync& mn_sync, const llmq::DashParams& params); ~LLMQContext(); void Start(); diff --git a/src/llmq/options.h b/src/llmq/options.h index 6245315d8232..bf7f7835a4f4 100644 --- a/src/llmq/options.h +++ b/src/llmq/options.h @@ -5,6 +5,8 @@ #ifndef BITCOIN_LLMQ_OPTIONS_H #define BITCOIN_LLMQ_OPTIONS_H +#include + #include #include @@ -28,6 +30,13 @@ enum class QvvecSyncMode : int8_t { OnlyIfTypeMember = 1, }; +struct DashParams { + int8_t bls_threads; + int8_t worker_count; + int64_t max_recsigs_age; + util::DbWrapperParams db_params; +}; + /** -llmq-data-recovery default */ static constexpr bool DEFAULT_ENABLE_QUORUM_DATA_RECOVERY{true}; /** -watchquorums default, if true, we will connect to all new quorums and watch their communication */ diff --git a/src/node/chainstate.cpp b/src/node/chainstate.cpp index bc41e210565e..3c8db9275d45 100644 --- a/src/node/chainstate.cpp +++ b/src/node/chainstate.cpp @@ -47,7 +47,6 @@ std::optional LoadChainstate(bool fReset, std::unique_ptr& mnhf_manager, std::unique_ptr& llmq_ctx, CTxMemPool* mempool, - const fs::path& data_dir, bool fPruneMode, bool is_addrindex_enabled, bool is_spentindex_enabled, @@ -59,10 +58,7 @@ std::optional LoadChainstate(bool fReset, int64_t nCoinCacheUsage, bool block_tree_db_in_memory, bool coins_db_in_memory, - bool dash_dbs_in_memory, - int8_t bls_threads, - int8_t worker_count, - int64_t max_recsigs_age, + const llmq::DashParams& dash_params, std::function shutdown_requested, std::function coins_error_cb) { @@ -73,7 +69,7 @@ std::optional LoadChainstate(bool fReset, LOCK(cs_main); evodb.reset(); - evodb = std::make_unique(util::DbWrapperParams{.path = data_dir, .memory = dash_dbs_in_memory, .wipe = fReset || fReindexChainState}); + evodb = std::make_unique(dash_params.db_params); mnhf_manager.reset(); mnhf_manager = std::make_unique(*evodb, chainman); @@ -89,9 +85,7 @@ std::optional LoadChainstate(bool fReset, pblocktree.reset(new CBlockTreeDB(nBlockTreeDBCache, block_tree_db_in_memory, fReset)); DashChainstateSetup(chainman, govman, mn_metaman, mn_sync, sporkman, chain_helper, cpoolman, - dmnman, evodb, mnhf_manager, llmq_ctx, mempool, data_dir, dash_dbs_in_memory, - /*llmq_dbs_wipe=*/fReset || fReindexChainState, bls_threads, worker_count, - max_recsigs_age, consensus_params); + dmnman, evodb, mnhf_manager, llmq_ctx, mempool, dash_params, consensus_params); if (fReset) { pblocktree->WriteReindexing(true); @@ -223,12 +217,7 @@ void DashChainstateSetup(ChainstateManager& chainman, std::unique_ptr& mnhf_manager, std::unique_ptr& llmq_ctx, CTxMemPool* mempool, - const fs::path& data_dir, - bool llmq_dbs_in_memory, - bool llmq_dbs_wipe, - int8_t bls_threads, - int8_t worker_count, - int64_t max_recsigs_age, + const llmq::DashParams& dash_params, const Consensus::Params& consensus_params) { // Same logic as pblocktree @@ -242,9 +231,7 @@ void DashChainstateSetup(ChainstateManager& chainman, llmq_ctx->Stop(); } llmq_ctx.reset(); - llmq_ctx = std::make_unique(*dmnman, *evodb, sporkman, *mempool, chainman, mn_sync, - util::DbWrapperParams{.path = data_dir, .memory = llmq_dbs_in_memory, .wipe = llmq_dbs_wipe}, - bls_threads, worker_count, max_recsigs_age); + llmq_ctx = std::make_unique(*dmnman, *evodb, sporkman, *mempool, chainman, mn_sync, dash_params); mempool->ConnectManagers(dmnman.get(), llmq_ctx->isman.get()); // Enable CMNHFManager::{Process, Undo}Block mnhf_manager->ConnectManagers(llmq_ctx->qman.get()); diff --git a/src/node/chainstate.h b/src/node/chainstate.h index 0e3ead18e715..099d5b6f8285 100644 --- a/src/node/chainstate.h +++ b/src/node/chainstate.h @@ -24,13 +24,12 @@ class CMNHFManager; class CSporkManager; class CTxMemPool; struct LLMQContext; - namespace Consensus { struct Params; } // namespace Consensus -namespace fs { -class path; -} // namespace fs +namespace llmq { +struct DashParams; +} // namespace util namespace node { enum class ChainstateLoadingError { @@ -91,7 +90,6 @@ std::optional LoadChainstate(bool fReset, std::unique_ptr& mnhf_manager, std::unique_ptr& llmq_ctx, CTxMemPool* mempool, - const fs::path& data_dir, bool fPruneMode, bool is_addrindex_enabled, bool is_spentindex_enabled, @@ -103,10 +101,7 @@ std::optional LoadChainstate(bool fReset, int64_t nCoinCacheUsage, bool block_tree_db_in_memory, bool coins_db_in_memory, - bool dash_dbs_in_memory, - int8_t bls_threads, - int8_t worker_count, - int64_t max_recsigs_age, + const llmq::DashParams& dash_params, std::function shutdown_requested = nullptr, std::function coins_error_cb = nullptr); @@ -123,12 +118,7 @@ void DashChainstateSetup(ChainstateManager& chainman, std::unique_ptr& mnhf_manager, std::unique_ptr& llmq_ctx, CTxMemPool* mempool, - const fs::path& data_dir, - bool llmq_dbs_in_memory, - bool llmq_dbs_wipe, - int8_t bls_threads, - int8_t worker_count, - int64_t max_recsigs_age, + const llmq::DashParams& dash_params, const Consensus::Params& consensus_params); void DashChainstateSetupClose(std::unique_ptr& chain_helper, diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 08a619f7d2c9..09726ae1239b 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -139,15 +139,21 @@ std::unique_ptr MakePeerManager(CConnman& connman, void DashChainstateSetup(ChainstateManager& chainman, NodeContext& node, - bool llmq_dbs_in_memory, - bool llmq_dbs_wipe, const Consensus::Params& consensus_params) { DashChainstateSetup(chainman, *Assert(node.govman.get()), *Assert(node.mn_metaman.get()), *Assert(node.mn_sync.get()), *Assert(node.sporkman.get()), node.chain_helper, node.cpoolman, node.dmnman, node.evodb, node.mnhf_manager, - node.llmq_ctx, Assert(node.mempool.get()), node.args->GetDataDirNet(), llmq_dbs_in_memory, llmq_dbs_wipe, - llmq::DEFAULT_BLSCHECK_THREADS, llmq::DEFAULT_WORKER_COUNT, llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, - consensus_params); + node.llmq_ctx, Assert(node.mempool.get()), + llmq::DashParams{ + .bls_threads = llmq::DEFAULT_BLSCHECK_THREADS, + .worker_count = llmq::DEFAULT_WORKER_COUNT, + .max_recsigs_age = llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, + .db_params = util::DbWrapperParams{ + .path = node.args->GetDataDirNet(), + .memory = true, + .wipe = false, + }, + }, consensus_params); } void DashChainstateSetupClose(NodeContext& node) @@ -318,6 +324,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vectorGetDataDirNet(), fPruneMode, m_args.GetBoolArg("-addressindex", DEFAULT_ADDRESSINDEX), m_args.GetBoolArg("-spentindex", DEFAULT_SPENTINDEX), m_args.GetBoolArg("-timestampindex", DEFAULT_TIMESTAMPINDEX), chainparams.GetConsensus(), - m_args.GetBoolArg("-reindex-chainstate", false), + fReindexChainState, m_cache_sizes.block_tree_db, m_cache_sizes.coins_db, m_cache_sizes.coins, /*block_tree_db_in_memory=*/true, /*coins_db_in_memory=*/true, - /*dash_dbs_in_memory=*/true, - llmq::DEFAULT_BLSCHECK_THREADS, - llmq::DEFAULT_WORKER_COUNT, - llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE); + llmq::DashParams{ + .bls_threads = llmq::DEFAULT_BLSCHECK_THREADS, + .worker_count = llmq::DEFAULT_WORKER_COUNT, + .max_recsigs_age = llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE, + .db_params = util::DbWrapperParams{ + .path = Assert(m_node.args)->GetDataDirNet(), + .memory = true, + .wipe = fReindex.load() || fReindexChainState, + }, + }); assert(!maybe_load_error.has_value()); auto maybe_verify_error = VerifyLoadedChainstate( diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index 74f45605078a..95a7f3e6831c 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -93,8 +93,6 @@ std::unique_ptr MakePeerManager(CConnman& connman, bool ignore_incoming_txs); void DashChainstateSetup(ChainstateManager& chainman, node::NodeContext& node, - bool llmq_dbs_in_memory, - bool llmq_dbs_wipe, const Consensus::Params& consensus_params); void DashChainstateSetupClose(node::NodeContext& node); diff --git a/src/test/validation_chainstatemanager_tests.cpp b/src/test/validation_chainstatemanager_tests.cpp index 5352527c1a35..a1889ec3d32f 100644 --- a/src/test/validation_chainstatemanager_tests.cpp +++ b/src/test/validation_chainstatemanager_tests.cpp @@ -53,7 +53,7 @@ BOOST_AUTO_TEST_CASE(chainstatemanager) /*cache_size_bytes=*/1 << 23, /*in_memory=*/true, /*should_wipe=*/false); WITH_LOCK(::cs_main, c1.InitCoinsCache(1 << 23)); - DashChainstateSetup(manager, m_node, /*llmq_dbs_in_memory=*/true, /*llmq_dbs_wipe=*/false, consensus_params); + DashChainstateSetup(manager, m_node, consensus_params); BOOST_CHECK(!manager.IsSnapshotActive()); BOOST_CHECK(WITH_LOCK(::cs_main, return !manager.IsSnapshotValidated())); @@ -85,7 +85,7 @@ BOOST_AUTO_TEST_CASE(chainstatemanager) ); chainstates.push_back(&c2); - DashChainstateSetup(manager, m_node, /*llmq_dbs_in_memory=*/true, /*llmq_dbs_wipe=*/false, consensus_params); + DashChainstateSetup(manager, m_node, consensus_params); BOOST_CHECK_EQUAL(manager.SnapshotBlockhash().value(), snapshot_blockhash); From 176f929759f94863911d8cb14bee89a5f7f9edf7 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 11 Jan 2026 22:07:32 +0530 Subject: [PATCH 8/8] refactor: isolate DKG session networking code in multiple entities --- src/net_processing.cpp | 53 ++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e5b83dc0f88f..a0e916d2a209 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -964,6 +964,9 @@ class PeerManagerImpl final : public PeerManager bool AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex); + bool DKGSessionAlreadyHave(const CInv& inv); + MessageProcessingResult DKGSessionProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, CDataStream& vRecv); + /** * Filter for transactions that were recently rejected by the mempool. * These are not rerequested until the chain tip changes, at which point @@ -2351,8 +2354,7 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) case MSG_QUORUM_COMPLAINT: case MSG_QUORUM_JUSTIFICATION: case MSG_QUORUM_PREMATURE_COMMITMENT: - return (m_observer_ctx && m_observer_ctx->qdkgsman->AlreadyHave(inv)) - || (m_active_ctx && m_active_ctx->qdkgsman->AlreadyHave(inv)); + return DKGSessionAlreadyHave(inv); case MSG_QUORUM_RECOVERED_SIG: // TODO: move it to NetSigning return m_llmq_ctx->sigman->AlreadyHave(inv); @@ -2377,6 +2379,16 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) return true; } +bool PeerManagerImpl::DKGSessionAlreadyHave(const CInv& inv) +{ + if (m_observer_ctx) { + return m_observer_ctx->qdkgsman->AlreadyHave(inv); + } else if (m_active_ctx) { + return m_active_ctx->qdkgsman->AlreadyHave(inv); + } + return false; +} + bool PeerManagerImpl::AlreadyHaveBlock(const uint256& block_hash) { return m_chainman.m_blockman.LookupBlockIndex(block_hash) != nullptr; @@ -5454,24 +5466,9 @@ void PeerManagerImpl::ProcessMessage( PostProcessMessage(m_cj_walletman->processMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv), pfrom.GetId()); } if (m_active_ctx) { - assert(is_masternode); m_active_ctx->shareman->ProcessMessage(pfrom, msg_type, vRecv); - PostProcessMessage(m_active_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); - } - if (m_observer_ctx) { - assert(!is_masternode); - PostProcessMessage(m_observer_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); - } - if (!m_active_ctx && !m_observer_ctx) { - assert(!is_masternode); - if (msg_type == NetMsgType::QCONTRIB - || msg_type == NetMsgType::QCOMPLAINT - || msg_type == NetMsgType::QJUSTIFICATION - || msg_type == NetMsgType::QPCOMMITMENT - || msg_type == NetMsgType::QWATCH) { - Misbehaving(pfrom.GetId(), /*howmuch=*/10); - } } + PostProcessMessage(DKGSessionProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_sporkman.ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, (m_active_ctx ? m_active_ctx->nodeman.get() : nullptr), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); @@ -5501,6 +5498,26 @@ void PeerManagerImpl::ProcessMessage( return; } +MessageProcessingResult PeerManagerImpl::DKGSessionProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, CDataStream& vRecv) +{ + if (m_active_ctx) { + assert(is_masternode); + return m_active_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv); + } else if (m_observer_ctx) { + assert(!is_masternode); + return m_observer_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv); + } + assert(!is_masternode); + if (msg_type == NetMsgType::QCONTRIB + || msg_type == NetMsgType::QCOMPLAINT + || msg_type == NetMsgType::QJUSTIFICATION + || msg_type == NetMsgType::QPCOMMITMENT + || msg_type == NetMsgType::QWATCH) { + return MisbehavingError{10}; + } + return {}; +} + bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) { {