diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 78cd705b68b743..659bee2bd48341 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -31,7 +31,7 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo qdkgsman{std::make_unique(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, peerman, unit_tests, wipe)}, qman{[&]() -> llmq::CQuorumManager* const { assert(llmq::quorumManager == nullptr); - llmq::quorumManager = std::make_unique(*bls_worker, chainstate, connman, *qdkgsman, evo_db, *quorum_block_processor, ::masternodeSync); + llmq::quorumManager = std::make_unique(*bls_worker, chainstate, connman, *qdkgsman, evo_db, *quorum_block_processor, ::masternodeSync, unit_tests, wipe); return llmq::quorumManager.get(); }()}, sigman{std::make_unique(connman, *llmq::quorumManager, peerman, unit_tests, wipe)}, diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index b9e83704bba22a..776e706c08997a 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -158,7 +158,7 @@ int CQuorum::GetMemberIndex(const uint256& proTxHash) const return -1; } -void CQuorum::WriteContributions(CEvoDB& evoDb) const +void CQuorum::WriteContributions(const std::unique_ptr& db) const { uint256 dbKey = MakeQuorumKey(*this); @@ -169,19 +169,19 @@ void CQuorum::WriteContributions(CEvoDB& evoDb) const for (auto& pubkey : *quorumVvec) { s << CBLSPublicKeyVersionWrapper(pubkey, false); } - evoDb.GetRawDB().Write(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s); + db->Write(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s); } if (skShare.IsValid()) { - evoDb.GetRawDB().Write(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare); + db->Write(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare); } } -bool CQuorum::ReadContributions(CEvoDB& evoDb) +bool CQuorum::ReadContributions(const std::unique_ptr& db) { uint256 dbKey = MakeQuorumKey(*this); CDataStream s(SER_DISK, CLIENT_VERSION); - if (!evoDb.GetRawDB().ReadDataStream(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s)) { + if (!db->ReadDataStream(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s)) { return false; } @@ -197,23 +197,25 @@ bool CQuorum::ReadContributions(CEvoDB& evoDb) quorumVvec = std::make_shared>(std::move(qv)); // We ignore the return value here as it is ok if this fails. If it fails, it usually means that we are not a // member of the quorum but observed the whole DKG process to have the quorum verification vector. - evoDb.GetRawDB().Read(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare); + db->Read(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare); return true; } CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGSessionManager& _dkgManager, - CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor, const std::unique_ptr& mn_sync) : + CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor, const std::unique_ptr& mn_sync, + bool unit_tests, bool wipe) : + db(std::make_unique(unit_tests ? "" : (GetDataDir() / "llmq/quorumdb"), 1 << 20, unit_tests, wipe)), blsWorker(_blsWorker), m_chainstate(chainstate), connman(_connman), dkgManager(_dkgManager), - m_evoDb(_evoDb), quorumBlockProcessor(_quorumBlockProcessor), m_mn_sync(mn_sync) { utils::InitQuorumsCache(mapQuorumsCache, false); quorumThreadInterrupt.reset(); + MigrateOldQuorumDB(_evoDb); } void CQuorumManager::Start() @@ -392,11 +394,11 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l quorum->Init(std::move(qc), pQuorumBaseBlockIndex, minedBlockHash, members); bool hasValidVvec = false; - if (quorum->ReadContributions(m_evoDb)) { + if (WITH_LOCK(cs_db, return quorum->ReadContributions(db))) { hasValidVvec = true; } else { if (BuildQuorumContributions(quorum->qc, quorum)) { - quorum->WriteContributions(m_evoDb); + WITH_LOCK(cs_db, quorum->WriteContributions(db)); hasValidVvec = true; } else { LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- llmqType[%d] quorumIndex[%d] quorum.ReadContributions and BuildQuorumContributions for quorumHash[%s] failed\n", __func__, ToUnderlying(llmqType), quorum->qc->quorumIndex, quorum->qc->quorumHash.ToString()); @@ -835,7 +837,7 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t return errorHandler("Invalid secret key share received"); } } - pQuorum->WriteContributions(m_evoDb); + WITH_LOCK(cs_db, pQuorum->WriteContributions(db)); return {}; } return {}; @@ -1087,11 +1089,73 @@ void CQuorumManager::StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex) } if (!quorumThreadInterrupt) { - DataCleanupHelper(m_evoDb.GetRawDB(), dbKeysToSkip); + WITH_LOCK(cs_db, DataCleanupHelper(*db, dbKeysToSkip)); } LogPrint(BCLog::LLMQ, "CQuorumManager::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count()); }); } +void CQuorumManager::MigrateOldQuorumDB(CEvoDB& evoDb) const +{ + LOCK(cs_db); + if (!db->IsEmpty()) return; + + const auto prefixes = {DB_QUORUM_QUORUM_VVEC, DB_QUORUM_SK_SHARE}; + + LogPrint(BCLog::LLMQ, "CQuorumManager::%d -- start\n", __func__); + + CDBBatch batch(*db); + std::unique_ptr pcursor(evoDb.GetRawDB().NewIterator()); + + for (const auto& prefix : prefixes) { + auto start = std::make_tuple(prefix, uint256()); + pcursor->Seek(start); + + int count{0}; + while (pcursor->Valid()) { + decltype(start) k; + CDataStream s(SER_DISK, CLIENT_VERSION); + CBLSSecretKey sk; + + if (!pcursor->GetKey(k) || std::get<0>(k) != prefix) { + break; + } + + if (prefix == DB_QUORUM_QUORUM_VVEC) { + if (!evoDb.GetRawDB().ReadDataStream(k, s)) { + break; + } + batch.Write(k, s); + } + if (prefix == DB_QUORUM_SK_SHARE) { + if (!pcursor->GetValue(sk)) { + break; + } + batch.Write(k, sk); + } + + if (batch.SizeEstimate() >= (1 << 24)) { + db->WriteBatch(batch); + batch.Clear(); + } + + ++count; + pcursor->Next(); + } + + db->WriteBatch(batch); + + LogPrint(BCLog::LLMQ, "CQuorumManager::%d -- %s moved %d\n", __func__, prefix, count); + } + + pcursor.reset(); + db->CompactFull(); + + DataCleanupHelper(evoDb.GetRawDB(), {}); + evoDb.CommitRootTransaction(); + + LogPrint(BCLog::LLMQ, "CQuorumManager::%d -- done\n", __func__); +} + } // namespace llmq diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 1605271b77a398..d090a98c409a43 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -201,8 +201,8 @@ class CQuorum CBLSSecretKey GetSkShare() const; private: - void WriteContributions(CEvoDB& evoDb) const; - bool ReadContributions(CEvoDB& evoDb); + void WriteContributions(const std::unique_ptr& db) const; + bool ReadContributions(const std::unique_ptr& db); }; /** @@ -214,11 +214,13 @@ class CQuorum class CQuorumManager { private: + mutable Mutex cs_db; + std::unique_ptr db GUARDED_BY(cs_db) {nullptr}; + CBLSWorker& blsWorker; CChainState& m_chainstate; CConnman& connman; CDKGSessionManager& dkgManager; - CEvoDB& m_evoDb; CQuorumBlockProcessor& quorumBlockProcessor; const std::unique_ptr& m_mn_sync; @@ -234,7 +236,8 @@ class CQuorumManager public: CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGSessionManager& _dkgManager, - CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor, const std::unique_ptr& mn_sync); + CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor, const std::unique_ptr& mn_sync, + bool unit_tests, bool wipe); ~CQuorumManager() { Stop(); }; void Start(); @@ -274,6 +277,7 @@ class CQuorumManager void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const; void StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex) const; + void MigrateOldQuorumDB(CEvoDB& evoDb) const; }; extern std::unique_ptr quorumManager;