Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(skymp5-server): add local cache for mongodb #2026

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions skymp5-server/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ find_package(OpenSSL REQUIRED)

find_package(bsa CONFIG REQUIRED)

find_package(libzippp REQUIRED)

foreach(target ${VCPKG_DEPENDENT})
target_include_directories(${target} PUBLIC ${CATCH_INCLUDE_DIR})
target_include_directories(${target} PUBLIC ${SPARSEPP_INCLUDE_DIR})
Expand All @@ -122,6 +124,7 @@ foreach(target ${VCPKG_DEPENDENT})
target_link_libraries(${target} PUBLIC mongo::mongocxx_static mongo::bsoncxx_static mongo::mongoc_static mongo::bson_static)
target_link_libraries(${target} PUBLIC OpenSSL::SSL OpenSSL::Crypto)
target_link_libraries(${target} PUBLIC bsa::bsa)
target_link_libraries(${target} PUBLIC libzippp::libzippp)

if(UNIX)
target_link_libraries(${target} PUBLIC pthread resolv rt m)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
#include "ArchiveDatabase.h"

#include "FileUtils.h"
#include "MappedBuffer.h"
#include "libzippp/libzippp.h"
#include <filesystem>
#include <fstream>
#include <nlohmann/json.hpp>
#include <zlib.h>

namespace {
inline uint32_t ZlibGetCRC32Checksum(const void* readBuffer, z_size_t length)
{
uLong hash = crc32_z(0L, Z_NULL, 0);
hash = crc32_z(hash, static_cast<const Bytef*>(readBuffer), length);
return static_cast<uint32_t>(hash);
}
}

constexpr auto kDbHashFileName = "_dbhash";
constexpr auto kChecksumExtension = ".checksum";
constexpr auto kCrc32Key = "crc32_checksum";

struct ArchiveDatabase::Impl
{
Impl(std::string filePath_, std::shared_ptr<spdlog::logger> logger_)
: filePath(std::move(filePath_))
, logger(std::move(logger_))
{
}

std::string filePath;
std::shared_ptr<spdlog::logger> logger;
};

ArchiveDatabase::ArchiveDatabase(std::string filePath_,
std::shared_ptr<spdlog::logger> logger_)
: pImpl(std::make_shared<Impl>(std::move(filePath_), std::move(logger_)))
{
}

ArchiveDatabase::~ArchiveDatabase() = default;

UpsertResult ArchiveDatabase::Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms)
{
auto filePathAbsolute = std::filesystem::absolute(pImpl->filePath).string();

libzippp::ZipArchive archive(filePathAbsolute.data());

archive.open(libzippp::ZipArchive::Write);

std::vector<std::unique_ptr<std::string>> buffers;

for (auto& changeForm : changeForms) {
if (changeForm == std::nullopt) {
continue;
}

// Data to be added or updated
std::string data = MpChangeForm::ToJson(*changeForm).dump(2);
std::string fileName = changeForm->formDesc.ToString('_') + ".json";
std::string filePath = fileName;

buffers.push_back(std::make_unique<std::string>(data));

// Add new file or replace existing one
archive.addData(filePath, buffers.back()->data(), buffers.back()->size());
}

archive.close();

return { changeForms.size(), std::nullopt };
}

void ArchiveDatabase::Iterate(const IterateCallback& iterateCallback)
{
auto filePathAbsolute = std::filesystem::absolute(pImpl->filePath).string();

libzippp::ZipArchive archive(filePathAbsolute.data());

archive.open(libzippp::ZipArchive::ReadOnly);

std::vector<libzippp::ZipEntry> entries = archive.getEntries();
std::vector<libzippp::ZipEntry>::iterator it;

simdjson::dom::parser p;

for (it = entries.begin(); it != entries.end(); ++it) {
libzippp::ZipEntry entry = *it;
std::string name = entry.getName();
int size = entry.getSize();

if (name == kDbHashFileName) {
continue;
}

std::string textData = entry.readAsText();

try {
auto result = p.parse(textData).value();
iterateCallback(MpChangeForm::JsonToChangeForm(result));
} catch (std::exception& e) {
pImpl->logger->error("Parsing or loading of {} failed with {}", name,
e.what());
}
}
}

void ArchiveDatabase::SetDbHash(const std::string& data)
{
auto filePathAbsolute = std::filesystem::absolute(pImpl->filePath).string();

libzippp::ZipArchive archive(filePathAbsolute.data());

archive.open(libzippp::ZipArchive::Write);

// Add new file or replace existing one
archive.addData(kDbHashFileName, data.data(), data.size());

// needs to be done before buffer invalidation
archive.close();
}

std::optional<std::string> ArchiveDatabase::GetDbHash() const
{
auto filePathAbsolute = std::filesystem::absolute(pImpl->filePath).string();

libzippp::ZipArchive archive(filePathAbsolute.data());

archive.open(libzippp::ZipArchive::ReadOnly);

std::vector<libzippp::ZipEntry> entries = archive.getEntries();
std::vector<libzippp::ZipEntry>::iterator it;

for (it = entries.begin(); it != entries.end(); ++it) {
libzippp::ZipEntry entry = *it;
std::string name = entry.getName();

if (name == kDbHashFileName) {
return entry.readAsText();
}
}

return std::nullopt;
}

uint32_t ArchiveDatabase::GetArchiveChecksum() const
{
if (std::filesystem::exists(pImpl->filePath)) {
Viet::MappedBuffer mappedBuffer(pImpl->filePath);
return ZlibGetCRC32Checksum(mappedBuffer.GetData(),
mappedBuffer.GetLength());
}
return 0;
}

std::optional<uint32_t> ArchiveDatabase::ReadArchiveChecksumExpected() const
{
auto checksumFilePath = pImpl->filePath + kChecksumExtension;
std::string fileContents;

try {
fileContents = Viet::ReadFileIntoString(checksumFilePath);
} catch (std::exception& e) {
pImpl->logger->info("Unable to read archive checksum: {}", e.what());
return std::nullopt;
}

nlohmann::json document;

try {
document = nlohmann::json::parse(fileContents);
} catch (nlohmann::json::parse_error& e) {
pImpl->logger->warn("Failed to parse archive checksum: {}", e.what());
return std::nullopt;
}

uint32_t checksum = 0;

try {
checksum = document[kCrc32Key].get<uint32_t>();
} catch (nlohmann::json::exception& e) {
pImpl->logger->warn("Failed to read archive checksum: {}", e.what());
return std::nullopt;
}

return checksum;
}

void ArchiveDatabase::WriteArchiveChecksumExpected(uint32_t checksum)
{
nlohmann::json document{ { kCrc32Key, checksum } };

auto filePathAbsolute =
std::filesystem::absolute(pImpl->filePath + kChecksumExtension).string();

std::fstream file(filePathAbsolute, std::ios::out | std::ios::binary);
if (!file.is_open()) {
pImpl->logger->error("Failed to open file {}", filePathAbsolute);
return;
}

file << document.dump(2);
}

void ArchiveDatabase::Unlink()
{
auto filePathAbsolute = std::filesystem::absolute(pImpl->filePath).string();

libzippp::ZipArchive archive(filePathAbsolute.data());

archive.open(libzippp::ZipArchive::Write);

archive.unlink();

archive.close();
}

bool ArchiveDatabase::Exists() const
{
return std::filesystem::exists(pImpl->filePath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once
#include "IDatabase.h"
#include <spdlog/spdlog.h>

// This is for internal use only and not referenced in the server settings at
// this time.
class ArchiveDatabase : public IDatabase
{
public:
ArchiveDatabase(std::string filePath_,
std::shared_ptr<spdlog::logger> logger_);
~ArchiveDatabase();

UpsertResult Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms) override;
void Iterate(const IterateCallback& iterateCallback) override;

// See MongoWithCacheDatabase.cpp
void SetDbHash(const std::string& data);
std::optional<std::string> GetDbHash() const;

uint32_t GetArchiveChecksum() const;

std::optional<uint32_t> ReadArchiveChecksumExpected() const;
void WriteArchiveChecksumExpected(uint32_t checksum);

void Unlink();

bool Exists() const;

private:
struct Impl;
std::shared_ptr<Impl> pImpl;
};
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nlohmann/json.hpp>
#include <spdlog/spdlog.h>

#include "ArchiveDatabase.h"
#include "FileDatabase.h"
#include "MigrationDatabase.h"
#include "MongoDatabase.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ FileDatabase::FileDatabase(std::string directory_,
std::filesystem::create_directories(p);
}

size_t FileDatabase::Upsert(
UpsertResult FileDatabase::Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms)
{
try {
Expand Down Expand Up @@ -63,7 +63,7 @@ size_t FileDatabase::Upsert(
}
}

return nUpserted;
return { nUpserted, std::nullopt };
} catch (std::exception& e) {
throw UpsertFailedException(std::move(changeForms), e.what());
}
Expand Down Expand Up @@ -92,7 +92,7 @@ void FileDatabase::Iterate(const IterateCallback& iterateCallback)
auto result = parser.parse(jsonDump).value();
iterateCallback(MpChangeForm::JsonToChangeForm(result));
} catch (std::exception& e) {
pImpl->logger->error("Parsing of {} failed with {}",
pImpl->logger->error("Parsing or loading of {} failed with {}",
entry.path().string(), e.what());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class FileDatabase : public IDatabase
FileDatabase(std::string directory_,
std::shared_ptr<spdlog::logger> logger_);

size_t Upsert(
UpsertResult Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms) override;
void Iterate(const IterateCallback& iterateCallback) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class UpsertFailedException : public std::runtime_error
const std::vector<std::optional<MpChangeForm>> affectedForms;
};

struct UpsertResult
{
size_t numUpserted = 0;
std::optional<std::string> changeFormsCollectionHash;
};

class IDatabase
{
public:
Expand All @@ -36,7 +42,7 @@ class IDatabase
// Returns numbers of change forms inserted or updated successfully (Suitable
// for logging). In practice, it should be equal to `changeForms.size()` when
// saving succeed.
virtual size_t Upsert(
virtual UpsertResult Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms) = 0;

virtual void Iterate(const IterateCallback& iterateCallback) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ MigrationDatabase::MigrationDatabase(std::shared_ptr<IDatabase> newDatabase,
std::back_inserter(chunk));

// Upsert the current chunk
size_t numUpserted = newDatabase->Upsert(std::move(chunk));
size_t numUpserted = newDatabase->Upsert(std::move(chunk)).numUpserted;
totalUpserted += numUpserted;

spdlog::info("MigrationDatabase: upserted chunk {}/{} ({} changeForms)",
Expand All @@ -110,12 +110,12 @@ MigrationDatabase::MigrationDatabase(std::shared_ptr<IDatabase> newDatabase,
pImpl->exit();
}

size_t MigrationDatabase::Upsert(
UpsertResult MigrationDatabase::Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms)
{
spdlog::error("MigrationDatabase::Upsert - should never be reached");
pImpl->terminate();
return 0;
return { 0, std::nullopt };
}

void MigrationDatabase::Iterate(const IterateCallback& iterateCallback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class MigrationDatabase : public IDatabase
std::shared_ptr<IDatabase> oldDatabase,
std::function<void()> exit = [] { std::exit(0); },
std::function<void()> terminate = [] { std::terminate(); });
size_t Upsert(
UpsertResult Upsert(
std::vector<std::optional<MpChangeForm>>&& changeForms) override;
void Iterate(const IterateCallback& iterateCallback) override;

Expand Down
Loading
Loading