diff --git a/src/cmd_hash.cc b/src/cmd_hash.cc index a90b3050e..60f2e3b01 100644 --- a/src/cmd_hash.cc +++ b/src/cmd_hash.cc @@ -151,7 +151,7 @@ void HGetAllCmd::DoCmd(PClient* client) { int64_t total_fv = 0; int64_t cursor = 0; int64_t next_cursor = 0; - size_t raw_limit = g_config.GetMaxClientResponseSize(); + size_t raw_limit = g_config.max_client_response_size.load(); std::string raw; std::vector fvs; storage::Status s; diff --git a/src/config.cc b/src/config.cc index d570c3a14..e2b6e62a9 100644 --- a/src/config.cc +++ b/src/config.cc @@ -81,9 +81,9 @@ Status StringValue::SetValue(const std::string& value) { Status BoolValue::SetValue(const std::string& value) { if (pstd::StringEqualCaseInsensitive(value, "yes")) { - *value_ = true; + value_->store(true); } else { - *value_ = false; + value_->store(false); } return Status::OK(); } @@ -101,7 +101,7 @@ Status NumberValue::SetValue(const std::string& value) { if (v > value_max_) { v = value_max_; } - *value_ = v; + value_->store(v); return Status::OK(); } @@ -109,24 +109,24 @@ PConfig::PConfig() { AddBool("daemonize", &CheckYesNo, false, &daemonize); AddString("ip", false, {&ip}); AddNumberWihLimit("port", false, &port, PORT_LIMIT_MIN, PORT_LIMIT_MAX); - AddNumber("timeout", true, &timeout_); - AddString("db-path", false, std::vector{&dbpath}); - AddStrinWithFunc("loglevel", &CheckLogLevel, true, {&loglevel}); - AddString("logfile", true, {&logdir}); + AddNumber("timeout", true, &timeout); + AddString("db-path", false, std::vector{&db_path}); + AddStrinWithFunc("loglevel", &CheckLogLevel, false, {&log_level}); + AddString("logfile", false, {&log_dir}); AddNumberWihLimit("databases", false, &databases, 1, DBNUMBER_MAX); AddString("requirepass", true, {&password_}); - AddNumber("maxclients", true, &maxclients_); + AddNumber("maxclients", true, &max_clients); AddNumberWihLimit("worker-threads", false, &worker_threads_num, 1, THREAD_MAX); AddNumberWihLimit("slave-threads", false, &worker_threads_num, 1, THREAD_MAX); - AddNumber("slowlog-log-slower-than", true, &slowlogtime_); - AddNumber("slowlog-max-len", true, &slowlogmaxlen_); + AddNumber("slowlog-log-slower-than", true, &slow_log_time); + AddNumber("slowlog-max-len", true, &slow_log_max_len); AddNumberWihLimit("db-instance-num", true, &db_instance_num, 1, ROCKSDB_INSTANCE_NUMBER_MAX); - AddNumberWihLimit("fast-cmd-threads-num", false, &fast_cmd_threads_num_, 1, THREAD_MAX); - AddNumberWihLimit("slow-cmd-threads-num", false, &slow_cmd_threads_num_, 1, THREAD_MAX); - AddNumber("max-client-response-size", true, &max_client_response_size_); - AddString("runid", false, {&runid}); - AddNumber("small-compaction-threshold", true, &small_compaction_threshold_); - AddNumber("small-compaction-duration-threshold", true, &small_compaction_duration_threshold_); + AddNumberWihLimit("fast-cmd-threads-num", false, &fast_cmd_threads_num, 1, THREAD_MAX); + AddNumberWihLimit("slow-cmd-threads-num", false, &slow_cmd_threads_num, 1, THREAD_MAX); + AddNumber("max-client-response-size", true, &max_client_response_size); + AddString("runid", false, {&run_id}); + AddNumber("small-compaction-threshold", true, &small_compaction_threshold); + AddNumber("small-compaction-duration-threshold", true, &small_compaction_duration_threshold); // rocksdb config AddNumber("rocksdb-max-subcompactions", false, &rocksdb_max_subcompactions); @@ -148,6 +148,7 @@ bool PConfig::LoadFromFile(const std::string& file_name) { return false; } + // During the initialization phase, so there is no need to hold a lock. for (auto& [key, value] : parser_.GetMap()) { if (auto iter = config_map_.find(key); iter != config_map_.end()) { auto& v = config_map_[key]; @@ -161,8 +162,8 @@ bool PConfig::LoadFromFile(const std::string& file_name) { // Handle separately std::vector master(SplitString(parser_.GetData("slaveof"), ' ')); if (master.size() == 2) { - masterIp_ = master[0]; - masterPort_ = static_cast(std::stoi(master[1])); + master_ip_ = master[0]; + master_port = static_cast(std::stoi(master[1])); } std::vector alias(SplitString(parser_.GetData("rename-command"), ' ')); @@ -170,7 +171,7 @@ bool PConfig::LoadFromFile(const std::string& file_name) { for (auto it(alias.begin()); it != alias.end();) { const PString& oldCmd = *(it++); const PString& newCmd = *(it++); - aliases_[oldCmd] = newCmd; + aliases[oldCmd] = newCmd; } } @@ -179,23 +180,31 @@ bool PConfig::LoadFromFile(const std::string& file_name) { void PConfig::Get(const std::string& key, std::vector* values) const { values->clear(); - std::shared_lock sharedLock(mutex_); for (const auto& [k, v] : config_map_) { if (key == "*" || pstd::StringMatch(key.c_str(), k.c_str(), 1)) { values->emplace_back(k); + if (v->NeedLock()) { + std::shared_lock l(mutex_); + values->emplace_back(v->Value()); + continue; + } values->emplace_back(v->Value()); } } } -Status PConfig::Set(std::string key, const std::string& value, bool force) { +Status PConfig::Set(std::string key, const std::string& value, bool init_stage) { std::transform(key.begin(), key.end(), key.begin(), ::tolower); auto iter = config_map_.find(key); if (iter == config_map_.end()) { return Status::NotFound("Non-existent configuration items."); } - std::lock_guard Lock(mutex_); - return iter->second->Set(value, force); + if (iter->second->NeedLock()) { + std::lock_guard l(mutex_); + auto s = iter->second->Set(value, init_stage); + return s; + } + return iter->second->Set(value, init_stage); } rocksdb::Options PConfig::GetRocksDBOptions() { diff --git a/src/config.h b/src/config.h index 3a7a9acdc..ef42cb605 100644 --- a/src/config.h +++ b/src/config.h @@ -25,6 +25,8 @@ namespace pikiwidb { using Status = rocksdb::Status; using CheckFunc = std::function; +class PConfig; +extern PConfig g_config; class BaseValue { public: @@ -39,7 +41,8 @@ class BaseValue { Status Set(const std::string& value, bool force); - bool ReWritable() { return rewritable_; } + // default false,Only rewritable string types require lock protection. + virtual bool NeedLock() { return false; } protected: virtual Status SetValue(const std::string&) = 0; @@ -67,45 +70,47 @@ class StringValue : public BaseValue { std::string Value() const override { return MergeString(values_, delimiter_); }; + bool NeedLock() override { return rewritable_; } + private: Status SetValue(const std::string& value) override; std::vector values_; - char delimiter_; + char delimiter_ = 0; }; template class NumberValue : public BaseValue { public: - NumberValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, T* value_ptr, + NumberValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, std::atomic* value_ptr, T min = std::numeric_limits::min(), T max = std::numeric_limits::max()) : BaseValue(key, check_func_ptr, rewritable), value_(value_ptr), value_min_(min), value_max_(max) { assert(value_ != nullptr); assert(value_min_ <= value_max_); }; - std::string Value() const override { return std::to_string(*value_); } + std::string Value() const override { return std::to_string(value_->load()); } private: Status SetValue(const std::string& value) override; - T* value_; + std::atomic* value_ = nullptr; T value_min_; T value_max_; }; class BoolValue : public BaseValue { public: - BoolValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, bool* value_ptr) + BoolValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, std::atomic* value_ptr) : BaseValue(key, check_func_ptr, rewritable), value_(value_ptr) { assert(value_ != nullptr); }; - std::string Value() const override { return *value_ ? "yes" : "no"; }; + std::string Value() const override { return value_->load() ? "yes" : "no"; }; private: Status SetValue(const std::string& value) override; - bool* value_; + std::atomic* value_ = nullptr; }; using ValuePrt = std::unique_ptr; @@ -120,71 +125,76 @@ class PConfig { void Get(const std::string&, std::vector*) const; Status Set(std::string, const std::string&, bool force = false); - public: - int GetTimeout() const { - std::shared_lock SharedLock(mutex_); - return timeout_; - } - std::string GetPassword() const { std::shared_lock SharedLock(mutex_); return password_; } - int GetSlowlogTime() const { - std::shared_lock SharedLock(mutex_); - return slowlogtime_; - } - - int GetSlowlogMaxLen() const { - std::shared_lock SharedLock(mutex_); - return slowlogmaxlen_; - } - - int GetFastCmdThreadsNumber() const { - std::shared_lock SharedLock(mutex_); - return fast_cmd_threads_num_; - } - - int GetSlowCmdThreadsNumber() const { - std::shared_lock SharedLock(mutex_); - return slow_cmd_threads_num_; - } - - int64_t GetMaxClientResponseSize() const { - std::shared_lock SharedLock(mutex_); - return max_client_response_size_; - } - - uint64_t GetSmallCompactionThreshold() const { - std::shared_lock SharedLock(mutex_); - return small_compaction_threshold_; - } - - uint64_t GetSmallCompactionDurationThreshold() const { - std::shared_lock SharedLock(mutex_); - return small_compaction_duration_threshold_; - } - std::string GetMasterAuth() const { std::shared_lock SharedLock(mutex_); - return masterauth_; + return master_auth_; } - std::string GetMasterIP() { + std::string GetMasterIP() const { std::shared_lock SharedLock(mutex_); - return masterIp_; + return master_ip_; } - uint16_t GetMasterPort() { - std::shared_lock SharedLock(mutex_); - return masterPort_; - } + public: + std::atomic timeout = 0; + // auth + + std::map aliases; + std::atomic max_clients = 10000; // 10000 + std::atomic slow_log_time = 1000; // 1000 microseconds + std::atomic slow_log_max_len = 128; // 128 + std::atomic master_port; // replication + std::string include_file; // the template config + std::vector modules; // modules + std::atomic fast_cmd_threads_num = 4; + std::atomic slow_cmd_threads_num = 4; + std::atomic max_client_response_size = 1073741824; + std::atomic small_compaction_threshold = 604800; + std::atomic small_compaction_duration_threshold = 259200; + + std::atomic daemonize = false; + std::string pid_file = "./pikiwidb.pid"; + std::string ip = "127.0.0.1"; + std::atomic port = 9221; + std::string db_path = "./db/"; + std::string log_dir = "stdout"; // the log directory, differ from redis + std::string log_level = "warning"; + std::string run_id; + std::atomic databases = 3; + std::atomic worker_threads_num = 2; + std::atomic slave_threads_num = 2; + std::atomic db_instance_num = 3; + + std::atomic rocksdb_max_subcompactions = 0; + // default 2 + std::atomic rocksdb_max_background_jobs = 4; + // default 2 + std::atomic rocksdb_max_write_buffer_number = 2; + // default 2 + std::atomic rocksdb_min_write_buffer_number_to_merge = 2; + // default 64M + std::atomic rocksdb_write_buffer_size = 64 << 20; + std::atomic rocksdb_level0_file_num_compaction_trigger = 4; + std::atomic rocksdb_num_levels = 7; + std::atomic rocksdb_enable_pipelined_write = false; + std::atomic rocksdb_level0_slowdown_writes_trigger = 20; + std::atomic rocksdb_level0_stop_writes_trigger = 36; rocksdb::Options GetRocksDBOptions(); rocksdb::BlockBasedTableOptions GetRocksDBBlockBasedTableOptions(); + private: + mutable std::shared_mutex mutex_; + std::string password_; + std::string master_auth_; + std::string master_ip_; + private: inline void AddString(const std::string& key, bool rewritable, std::vector values_ptr_vector) { config_map_.emplace(key, std::make_unique(key, nullptr, rewritable, values_ptr_vector)); @@ -193,73 +203,22 @@ class PConfig { std::vector values_ptr_vector) { config_map_.emplace(key, std::make_unique(key, checkfunc, rewritable, values_ptr_vector)); } - inline void AddBool(const std::string& key, const CheckFunc& checkfunc, bool rewritable, bool* value_ptr) { + inline void AddBool(const std::string& key, const CheckFunc& checkfunc, bool rewritable, + std::atomic* value_ptr) { config_map_.emplace(key, std::make_unique(key, checkfunc, rewritable, value_ptr)); } template - inline void AddNumber(const std::string& key, bool rewritable, T* value_ptr) { + inline void AddNumber(const std::string& key, bool rewritable, std::atomic* value_ptr) { config_map_.emplace(key, std::make_unique>(key, nullptr, rewritable, value_ptr)); } template - inline void AddNumberWihLimit(const std::string& key, bool rewritable, T* value_ptr, T min, T max) { + inline void AddNumberWihLimit(const std::string& key, bool rewritable, std::atomic* value_ptr, T min, T max) { config_map_.emplace(key, std::make_unique>(key, nullptr, rewritable, value_ptr, min, max)); } - public: - bool daemonize = false; - std::string pidfile = "./pikiwidb.pid"; - std::string ip = "127.0.0.1"; - uint16_t port = 9221; - std::string dbpath = "./db/"; - std::string logdir = "stdout"; // the log directory, differ from redis - std::string loglevel = "warning"; - std::string runid; - size_t databases = 3; - uint32_t worker_threads_num = 2; - uint32_t slave_threads_num = 2; - size_t db_instance_num = 3; - - uint32_t rocksdb_max_subcompactions = 0; - // default 2 - int rocksdb_max_background_jobs = 4; - // default 2 - size_t rocksdb_max_write_buffer_number = 2; - // default 2 - int rocksdb_min_write_buffer_number_to_merge = 2; - // default 64M - size_t rocksdb_write_buffer_size = 64 << 20; - int rocksdb_level0_file_num_compaction_trigger = 4; - int rocksdb_num_levels = 7; - bool rocksdb_enable_pipelined_write = false; - int rocksdb_level0_slowdown_writes_trigger = 20; - int rocksdb_level0_stop_writes_trigger = 36; - private: - mutable std::shared_mutex mutex_; - uint32_t timeout_ = 0; - // auth - std::string password_; - std::map aliases_; - uint32_t maxclients_ = 10000; // 10000 - uint32_t slowlogtime_ = 1000; // 1000 microseconds - uint32_t slowlogmaxlen_ = 128; // 128 - std::string masterIp_; - std::string test; - uint16_t masterPort_; // replication - std::string masterauth_; - std::string includefile_; // the template config - std::vector modules_; // modules - int32_t fast_cmd_threads_num_ = 4; - int32_t slow_cmd_threads_num_ = 4; - uint64_t max_client_response_size_ = 1073741824; - uint64_t small_compaction_threshold_ = 604800; - uint64_t small_compaction_duration_threshold_ = 259200; - ConfigParser parser_; ConfigMap config_map_; std::string config_file_name_; }; - -extern PConfig g_config; - } // namespace pikiwidb diff --git a/src/db.cc b/src/db.cc index 55f886b59..fe1a2b4f9 100644 --- a/src/db.cc +++ b/src/db.cc @@ -18,8 +18,8 @@ DB::DB(int db_id, const std::string &db_path) : db_id_(db_id), db_path_(db_path // some options obj for all RocksDB in one DB. storage_options.options.write_buffer_manager = std::make_shared(10000000); storage_options.table_options = g_config.GetRocksDBBlockBasedTableOptions(); - storage_options.small_compaction_threshold = g_config.GetSmallCompactionThreshold(); - storage_options.small_compaction_duration_threshold = g_config.GetSmallCompactionDurationThreshold(); + storage_options.small_compaction_threshold = g_config.small_compaction_threshold.load(); + storage_options.small_compaction_duration_threshold = g_config.small_compaction_duration_threshold.load(); storage_options.db_instance_num = g_config.db_instance_num; storage_options.db_id = db_id; diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index 760f13b36..8388f0470 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -28,6 +28,7 @@ #include "pstd_util.h" std::unique_ptr g_pikiwidb; +using namespace pikiwidb; static void IntSigHandle(const int sig) { INFO("Catch Signal {}, cleanup...", sig); @@ -119,8 +120,6 @@ void PikiwiDB::OnNewConnection(pikiwidb::TcpConnection* obj) { } bool PikiwiDB::Init() { - using namespace pikiwidb; - char runid[kRunidSize + 1] = ""; getRandomHexChars(runid, kRunidSize); g_config.Set("runid", {runid, kRunidSize}, true); @@ -134,22 +133,22 @@ bool PikiwiDB::Init() { } NewTcpConnectionCallback cb = std::bind(&PikiwiDB::OnNewConnection, this, std::placeholders::_1); - if (!worker_threads_.Init(g_config.ip.c_str(), g_config.port, cb)) { - ERROR("worker_threads Init failed. IP = {} Port = {}", g_config.ip, g_config.port); + if (!worker_threads_.Init(g_config.ip.c_str(), g_config.port.load(), cb)) { + ERROR("worker_threads Init failed. IP = {} Port = {}", g_config.ip, g_config.port.load()); return false; } - auto num = g_config.worker_threads_num + g_config.slave_threads_num; + auto num = g_config.worker_threads_num.load() + g_config.slave_threads_num.load(); auto kMaxWorkerNum = IOThreadPool::GetMaxWorkerNum(); if (num > kMaxWorkerNum) { ERROR("number of threads can't exceeds {}, now is {}", kMaxWorkerNum, num); return false; } - worker_threads_.SetWorkerNum(static_cast(g_config.worker_threads_num)); - slave_threads_.SetWorkerNum(static_cast(g_config.slave_threads_num)); + worker_threads_.SetWorkerNum(static_cast(g_config.worker_threads_num.load())); + slave_threads_.SetWorkerNum(static_cast(g_config.slave_threads_num.load())); // now we only use fast cmd thread pool - auto status = cmd_threads_.Init(g_config.GetFastCmdThreadsNumber(), 0, "pikiwidb-cmd"); + auto status = cmd_threads_.Init(g_config.fast_cmd_threads_num.load(), 0, "pikiwidb-cmd"); if (!status.ok()) { ERROR("init cmd thread pool failed: {}", status.ToString()); return false; @@ -157,16 +156,16 @@ bool PikiwiDB::Init() { PSTORE.Init(g_config.databases); - PSlowLog::Instance().SetThreshold(g_config.GetSlowlogTime()); - PSlowLog::Instance().SetLogLimit(static_cast(g_config.GetSlowlogMaxLen())); + PSlowLog::Instance().SetThreshold(g_config.slow_log_time.load()); + PSlowLog::Instance().SetLogLimit(static_cast(g_config.slow_log_max_len.load())); // init base loop auto loop = worker_threads_.BaseLoop(); loop->ScheduleRepeatedly(1000, &PReplication::Cron, &PREPL); // master ip - if (!g_config.GetMasterIP().empty()) { - PREPL.SetMasterAddr(g_config.GetMasterIP().c_str(), g_config.GetMasterPort()); + if (!g_config.ip.empty()) { + PREPL.SetMasterAddr(g_config.GetMasterIP().c_str(), g_config.master_port.load()); } // cmd_table_manager_.InitCmdTable(); @@ -239,7 +238,7 @@ int main(int ac, char* av[]) { } if (!g_pikiwidb->GetConfigName().empty()) { - if (!pikiwidb::g_config.LoadFromFile(g_pikiwidb->GetConfigName())) { + if (!g_config.LoadFromFile(g_pikiwidb->GetConfigName())) { std::cerr << "Load config file [" << g_pikiwidb->GetConfigName() << "] failed!\n"; return -1; } @@ -248,10 +247,10 @@ int main(int ac, char* av[]) { // output logo to console char logo[512] = ""; snprintf(logo, sizeof logo - 1, pikiwidbLogo, KPIKIWIDB_VERSION, static_cast(sizeof(void*)) * 8, - static_cast(pikiwidb::g_config.port)); + static_cast(g_config.port)); std::cout << logo; - if (pikiwidb::g_config.daemonize) { + if (g_config.daemonize.load()) { daemonize(); } @@ -259,7 +258,7 @@ int main(int ac, char* av[]) { SignalSetup(); InitLogs(); - if (pikiwidb::g_config.daemonize) { + if (g_config.daemonize.load()) { closeStd(); } diff --git a/src/replication.cc b/src/replication.cc index 7ba60b728..aab8b17c7 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -232,11 +232,11 @@ void PReplication::Cron() { } else if (master->GetAuth()) { // send replconf char req[128]; - auto len = snprintf(req, sizeof req - 1, "replconf listening-port %hu\r\n", g_config.port); + auto len = snprintf(req, sizeof req - 1, "replconf listening-port %hu\r\n", g_config.port.load()); master->SendPacket(req, len); masterInfo_.state = kPReplStateWaitReplconf; - INFO("Send replconf listening-port {}", g_config.port); + INFO("Send replconf listening-port {}", g_config.port.load()); } else { WARN("Haven't auth to master yet, or check masterauth password"); } diff --git a/src/store.cc b/src/store.cc index 5957ed000..22cf42001 100644 --- a/src/store.cc +++ b/src/store.cc @@ -22,7 +22,7 @@ PStore& PStore::Instance() { void PStore::Init(int dbNum) { backends_.reserve(dbNum); for (int i = 0; i < dbNum; i++) { - auto db = std::make_unique(i, g_config.dbpath); + auto db = std::make_unique(i, g_config.db_path); backends_.push_back(std::move(db)); INFO("Open DB_{} success!", i); }