diff --git a/pikiwidb.conf b/pikiwidb.conf index f80c86d5c..9be37fe90 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -10,7 +10,7 @@ port 9221 # If you want you can bind a single interface, if the bind option is not # specified all the interfaces will listen for incoming connections. # -# bind 127.0.0.1 +ip 127.0.0.1 # Close the connection after a client is idle for N seconds (0 to disable) @@ -35,7 +35,7 @@ logfile stdout # Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT where # dbid is a number between 0 and 'databases'-1 -databases 3 +databases 16 ################################ SNAPSHOTTING ################################# # @@ -315,37 +315,27 @@ slowlog-log-slower-than 10000 # You can reclaim memory used by the slow log with SLOWLOG RESET. slowlog-max-len 128 -############################### ADVANCED CONFIG ############################### - -# Redis calls an internal function to perform many background tasks, like -# closing connections of clients in timeot, purging expired keys that are -# never requested, and so forth. -# -# Not all tasks are perforemd with the same frequency, but Redis checks for -# tasks to perform accordingly to the specified "hz" value. -# -# By default "hz" is set to 10. Raising the value will use more CPU when -# Redis is idle, but at the same time will make Redis more responsive when -# there are many keys expiring at the same time, and timeouts may be -# handled with more precision. -# -# The range is between 1 and 500, however a value over 100 is usually not -# a good idea. Most users should use the default of 10 and raise this up to -# 100 only in environments where very low latency is required. -hz 10 ############################### BACKENDS CONFIG ############################### -# PikiwiDB is a in memory database, though it has aof and rdb for dump data to disk, it -# is very limited. Try use leveldb for real storage, pikiwidb as cache. The cache algorithm -# is like linux page cache, please google or read your favorite linux book -# 0 is default, no backend -# 1 is RocksDB, currently only support RocksDB -backend 1 -backendpath dump -# the frequency of dump to backend per second -backendhz 10 -# the rocksdb number per db -db-instance-num 5 -# default 86400 * 7 -rocksdb-ttl-second 604800 -# default 86400 * 3 -rocksdb-periodic-second 259200 +# PikiwiDB uses RocksDB as the underlying storage engine, and the data belonging +# to the same DB is distributed among several RocksDB instances. 
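The comment above describes the new storage layout: each logical DB now spreads its keys over `db-instance-num` RocksDB instances. The routing itself lives in the storage layer and is not part of this diff; the sketch below is only a simplified illustration of the idea, assuming a plain hash-modulo dispatch (the `PickInstance` helper is hypothetical, not a real PikiwiDB function).

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical illustration: route a key to one of `db_instance_num`
// RocksDB instances that together back the same logical DB.
// PikiwiDB's real dispatch is implemented inside the storage layer;
// this only models the "one DB, several RocksDB instances" idea.
static std::size_t PickInstance(const std::string& key, std::size_t db_instance_num) {
  return std::hash<std::string>{}(key) % db_instance_num;
}

int main() {
  const std::size_t db_instance_num = 3;  // matches the default in pikiwidb.conf
  const std::vector<std::string> keys = {"user:1", "user:2", "session:42"};
  for (const auto& k : keys) {
    // A given key always maps to the same instance, so reads and writes agree.
    (void)PickInstance(k, db_instance_num);
  }
  return 0;
}
```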
+ +# RocksDB instances number per DB +db-instance-num 3 +# default is 86400 * 7 +small-compaction-threshold 604800 +# default is 86400 * 3 +small-compaction-duration-threshold 259200 + +############################### ROCKSDB CONFIG ############################### +rocksdb-max-subcompactions 2 +rocksdb-max-background-jobs 4 +rocksdb-max-write-buffer-number 2 +rocksdb-min-write-buffer-number-to-merge 2 +# default is 64M +rocksdb-write-buffer-size 67108864 +rocksdb-level0-file-num-compaction-trigger 4 +rocksdb-number-levels 7 +rocksdb-enable-pipelined-write no +rocksdb-level0-slowdown-writes-trigger 20 +rocksdb-level0-stop-writes-trigger 36 + diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 5cd855f56..679c7c34b 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -30,7 +30,7 @@ std::vector BaseCmd::CurrentKey(PClient* client) const { return std void BaseCmd::Execute(PClient* client) { auto dbIndex = client->GetCurrentDB(); - if (!isExclusive()) { + if (!HasFlag(kCmdFlagsExclusive)) { PSTORE.GetBackend(dbIndex)->LockShared(); } @@ -39,7 +39,7 @@ void BaseCmd::Execute(PClient* client) { } DoCmd(client); - if (!isExclusive()) { + if (!HasFlag(kCmdFlagsExclusive)) { PSTORE.GetBackend(dbIndex)->UnLockShared(); } } diff --git a/src/base_cmd.h b/src/base_cmd.h index 356603992..07d3cb6d7 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -293,8 +293,6 @@ class BaseCmd : public std::enable_shared_from_this { uint32_t GetCmdID() const; - bool isExclusive() { return static_cast(flag_ & kCmdFlagsExclusive); } - protected: // Execute a specific command virtual void DoCmd(PClient* client) = 0; diff --git a/src/client.cc b/src/client.cc index e85f7f058..749cc2380 100644 --- a/src/client.cc +++ b/src/client.cc @@ -437,7 +437,7 @@ void PClient::OnConnect() { SetName("MasterConnection"); SetFlag(kClientFlagMaster); - if (g_config.masterauth.empty()) { + if (g_config.master_auth.empty()) { SetAuth(); } } else { diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 197372ac5..906f82e2d 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -20,14 +20,27 @@ CmdConfigGet::CmdConfigGet(const std::string& name, int16_t arity) bool CmdConfigGet::DoInitial(PClient* client) { return true; } -void CmdConfigGet::DoCmd(PClient* client) { client->AppendString("config cmd in development"); } +void CmdConfigGet::DoCmd(PClient* client) { + std::vector results; + for (int i = 0; i < client->argv_.size() - 2; i++) { + g_config.Get(client->argv_[i + 2], &results); + } + client->AppendStringVector(results); +} CmdConfigSet::CmdConfigSet(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} bool CmdConfigSet::DoInitial(PClient* client) { return true; } -void CmdConfigSet::DoCmd(PClient* client) { client->AppendString("config cmd in development"); } +void CmdConfigSet::DoCmd(PClient* client) { + auto s = g_config.Set(client->argv_[2], client->argv_[3]); + if (!s.ok()) { + client->SetRes(CmdRes::kInvalidParameter); + } else { + client->SetRes(CmdRes::kOK); + } +} FlushdbCmd::FlushdbCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryAdmin) {} @@ -85,7 +98,7 @@ ShutdownCmd::ShutdownCmd(const std::string& name, int16_t arity) bool ShutdownCmd::DoInitial(PClient* client) { // For now, only shutdown need check local if (client->PeerIP().find("127.0.0.1") == std::string::npos && - client->PeerIP().find(g_config.ip.c_str()) == std::string::npos) { + client->PeerIP().find(g_config.ip.ToString()) == 
std::string::npos) { client->SetRes(CmdRes::kErrOther, kCmdNameShutdown + " should be localhost"); return false; } diff --git a/src/cmd_hash.cc b/src/cmd_hash.cc index 694907e6c..60f2e3b01 100644 --- a/src/cmd_hash.cc +++ b/src/cmd_hash.cc @@ -151,7 +151,7 @@ void HGetAllCmd::DoCmd(PClient* client) { int64_t total_fv = 0; int64_t cursor = 0; int64_t next_cursor = 0; - size_t raw_limit = g_config.max_client_response_size; + size_t raw_limit = g_config.max_client_response_size.load(); std::string raw; std::vector fvs; storage::Status s; diff --git a/src/common.cc b/src/common.cc index 8fd483c80..e8f72bb0e 100644 --- a/src/common.cc +++ b/src/common.cc @@ -199,4 +199,23 @@ std::vector SplitString(const PString& str, char seperator) { return results; } +std::string MergeString(const std::vector& values, char delimiter) { + std::string result(*values.at(0)); + for (int i = 0; i < values.size() - 1; i++) { + result += delimiter; + result += *values.at(i + 1); + } + return result; +} + +std::string MergeString(const std::vector& values, char delimiter) { + std::string result(*values.at(0)); + for (int i = 0; i < values.size() - 1; i++) { + result += delimiter; + std::string s(*values.at(i + 1)); + result += s; + } + return result; +} + } // namespace pikiwidb diff --git a/src/common.h b/src/common.h index 29b0a7249..bb093975c 100644 --- a/src/common.h +++ b/src/common.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -76,8 +77,67 @@ enum class PParseResult : int8_t { PParseResult GetIntUntilCRLF(const char*& ptr, std::size_t nBytes, int& val); +class AtomicString { + public: + AtomicString() = default; + ~AtomicString() = default; + AtomicString(std::string str) { + std::lock_guard lock(mutex_); + str_ = std::move(str); + } + AtomicString(std::string&& str) { + std::lock_guard lock(mutex_); + str_ = std::move(str); + } + AtomicString(const std::string& str) { + std::lock_guard lock(mutex_); + str_ = str; + } + AtomicString(const char* c) { + std::lock_guard lock(mutex_); + str_ = std::string(c); + }; + AtomicString& operator=(const std::string& str) { + std::lock_guard lock(mutex_); + str_ = str; + return *this; + } + AtomicString& operator=(std::string&& str) { + std::lock_guard lock(mutex_); + str_ = std::move(str); + return *this; + } + operator std::string() { + std::shared_lock lock(mutex_); + return str_; + } + + operator std::string() const { + std::shared_lock lock(mutex_); + return str_; + } + + bool empty() const { + std::shared_lock lock(mutex_); + return str_.empty(); + } + + std::string ToString() const { + std::shared_lock lock(mutex_); + return str_; + } + + private: + mutable std::shared_mutex mutex_; + std::string str_; +}; + std::vector SplitString(const PString& str, char seperator); +std::string MergeString(const std::vector& values, char delimiter); + +std::string MergeString(const std::vector& values, char delimiter); + // The defer class for C++11 class ExecuteOnScopeExit { public: diff --git a/src/config.cc b/src/config.cc index 6a656817e..e8f1e03ff 100644 --- a/src/config.cc +++ b/src/config.cc @@ -6,14 +6,26 @@ */ #include +#include +#include #include #include "config.h" -#include "config_parser.h" +#include "pstd/pstd_string.h" +#include "store.h" namespace pikiwidb { -static void EraseQuotes(PString& str) { +constexpr uint16_t PORT_LIMIT_MAX = 65535; +constexpr uint16_t PORT_LIMIT_MIN = 1; +constexpr int DBNUMBER_MAX = 16; +constexpr int THREAD_MAX = 129; +constexpr int ROCKSDB_INSTANCE_NUMBER_MAX = 10; + +PConfig g_config; + +// preprocess 
func +static void EraseQuotes(std::string& str) { // convert "hello" to hello if (str.size() < 2) { return; @@ -24,200 +36,184 @@ static void EraseQuotes(PString& str) { } } -extern std::vector SplitString(const PString& str, char seperator); - -PConfig g_config; - -PConfig::PConfig() { - daemonize = false; - pidfile = "/var/run/pikiwidb.pid"; - - ip = "127.0.0.1"; - port = 9221; - timeout = 0; - dbpath = "./db"; - - loglevel = "notice"; - logdir = "stdout"; - - databases = 16; - - // rdb - saveseconds = 999999999; - savechanges = 999999999; - rdbcompression = true; - rdbchecksum = true; - rdbfullname = "./dump.rdb"; - - maxclients = 10000; - - // slow log - slowlogtime = 0; - slowlogmaxlen = 128; - - hz = 10; - - includefile = ""; - - maxmemory = 2 * 1024 * 1024 * 1024UL; - maxmemorySamples = 5; - noeviction = true; - - backend = kBackEndRocksDB; - backendPath = "dump"; - backendHz = 10; - - max_client_response_size = 1073741824; +static Status CheckYesNo(const std::string& value) { + if (!pstd::StringEqualCaseInsensitive(value, "yes") && !pstd::StringEqualCaseInsensitive(value, "no")) { + return Status::InvalidArgument("The value must be yes or no."); + } + return Status::OK(); +} - db_instance_num = 3; +static Status CheckLogLevel(const std::string& value) { + if (!pstd::StringEqualCaseInsensitive(value, "debug") && !pstd::StringEqualCaseInsensitive(value, "verbose") && + !pstd::StringEqualCaseInsensitive(value, "notice") && !pstd::StringEqualCaseInsensitive(value, "warning")) { + return Status::InvalidArgument("The value must be debug / verbose / notice / warning."); + } + return Status::OK(); +} - rocksdb_ttl_second = 0; - rocksdb_periodic_second = 0; +Status BaseValue::Set(const std::string& value, bool init_stage) { + if (!init_stage && !rewritable_) { + return Status::NotSupported("Dynamic modification is not supported."); + } + auto value_copy = value; + EraseQuotes(value_copy); + auto s = check(value_copy); + if (!s.ok()) { + return s; + } + // TODO(dingxiaoshuai) Support RocksDB config change Dynamically + return SetValue(value_copy); } -bool LoadPikiwiDBConfig(const char* cfgFile, PConfig& cfg) { - ConfigParser parser; - if (!parser.Load(cfgFile)) { - return false; +Status StringValue::SetValue(const std::string& value) { + auto values = SplitString(value, delimiter_); + if (values.size() != values_.size()) { + return Status::InvalidArgument("The number of parameters does not match."); + } + for (int i = 0; i < values_.size(); i++) { + *values_[i] = std::move(values[i]); } + return Status::OK(); +} - if (parser.GetData("daemonize") == "yes") { - cfg.daemonize = true; +Status BoolValue::SetValue(const std::string& value) { + if (pstd::StringEqualCaseInsensitive(value, "yes")) { + value_->store(true); } else { - cfg.daemonize = false; + value_->store(false); } + return Status::OK(); +} - cfg.pidfile = parser.GetData("pidfile", cfg.pidfile); +template +Status NumberValue::SetValue(const std::string& value) { + T v; + auto [ptr, ec] = std::from_chars(value.data(), value.data() + value.length(), v); + if (ec != std::errc()) { + return Status::InvalidArgument("Failed to convert to a number."); + } + if (v < value_min_) { + v = value_min_; + } + if (v > value_max_) { + v = value_max_; + } + value_->store(v); + return Status::OK(); +} - cfg.ip = parser.GetData("bind", cfg.ip); - cfg.port = parser.GetData("port"); - cfg.timeout = parser.GetData("timeout"); - cfg.dbpath = parser.GetData("db-path"); +PConfig::PConfig() { + AddBool("daemonize", &CheckYesNo, false, &daemonize); + 
AddString("ip", false, {&ip}); + AddNumberWihLimit("port", false, &port, PORT_LIMIT_MIN, PORT_LIMIT_MAX); + AddNumber("timeout", true, &timeout); + AddString("db-path", false, {&db_path}); + AddStrinWithFunc("loglevel", &CheckLogLevel, false, {&log_level}); + AddString("logfile", false, {&log_dir}); + AddNumberWihLimit("databases", false, &databases, 1, DBNUMBER_MAX); + AddString("requirepass", true, {&password}); + AddNumber("maxclients", true, &max_clients); + AddNumberWihLimit("worker-threads", false, &worker_threads_num, 1, THREAD_MAX); + AddNumberWihLimit("slave-threads", false, &worker_threads_num, 1, THREAD_MAX); + AddNumber("slowlog-log-slower-than", true, &slow_log_time); + AddNumber("slowlog-max-len", true, &slow_log_max_len); + AddNumberWihLimit("db-instance-num", true, &db_instance_num, 1, ROCKSDB_INSTANCE_NUMBER_MAX); + AddNumberWihLimit("fast-cmd-threads-num", false, &fast_cmd_threads_num, 1, THREAD_MAX); + AddNumberWihLimit("slow-cmd-threads-num", false, &slow_cmd_threads_num, 1, THREAD_MAX); + AddNumber("max-client-response-size", true, &max_client_response_size); + AddString("runid", false, {&run_id}); + AddNumber("small-compaction-threshold", true, &small_compaction_threshold); + AddNumber("small-compaction-duration-threshold", true, &small_compaction_duration_threshold); + + // rocksdb config + AddNumber("rocksdb-max-subcompactions", false, &rocksdb_max_subcompactions); + AddNumber("rocksdb-max-background-jobs", false, &rocksdb_max_background_jobs); + AddNumber("rocksdb-max-write-buffer-number", false, &rocksdb_max_write_buffer_number); + AddNumber("rocksdb-min-write-buffer-number-to-merge", false, &rocksdb_min_write_buffer_number_to_merge); + AddNumber("rocksdb-write-buffer-size", false, &rocksdb_write_buffer_size); + AddNumber("rocksdb-level0-file-num-compaction-trigger", false, &rocksdb_level0_file_num_compaction_trigger); + AddNumber("rocksdb-number-levels", true, &rocksdb_num_levels); + AddBool("rocksdb-enable-pipelined-write", CheckYesNo, false, &rocksdb_enable_pipelined_write); + AddNumber("rocksdb-level0-slowdown-writes-trigger", false, &rocksdb_level0_slowdown_writes_trigger); + AddNumber("rocksdb-level0-stop-writes-trigger", false, &rocksdb_level0_stop_writes_trigger); + AddNumber("rocksdb-level0-slowdown-writes-trigger", false, &rocksdb_level0_slowdown_writes_trigger); +} - cfg.loglevel = parser.GetData("loglevel", cfg.loglevel); - cfg.logdir = parser.GetData("logfile", cfg.logdir); - EraseQuotes(cfg.logdir); - if (cfg.logdir.empty()) { - cfg.logdir = "stdout"; +bool PConfig::LoadFromFile(const std::string& file_name) { + config_file_name_ = file_name; + if (!parser_.Load(file_name.c_str())) { + return false; } - cfg.databases = parser.GetData("databases", cfg.databases); - cfg.password = parser.GetData("requirepass"); - EraseQuotes(cfg.password); - - // alias command - { - std::vector alias(SplitString(parser.GetData("rename-command"), ' ')); - if (alias.size() % 2 == 0) { - for (auto it(alias.begin()); it != alias.end();) { - const PString& oldCmd = *(it++); - const PString& newCmd = *(it++); - cfg.aliases[oldCmd] = newCmd; + // During the initialization phase, so there is no need to hold a lock. 
+ for (auto& [key, value] : parser_.GetMap()) { + if (auto iter = config_map_.find(key); iter != config_map_.end()) { + auto& v = config_map_[key]; + auto s = v->Set(value.at(0), true); + if (!s.ok()) { + return false; } } } - // load rdb config - std::vector saveInfo(SplitString(parser.GetData("save"), ' ')); - if (!saveInfo.empty() && saveInfo.size() != 2) { - EraseQuotes(saveInfo[0]); - if (!(saveInfo.size() == 1 && saveInfo[0].empty())) { - std::cerr << "bad format save rdb interval, bad string " << parser.GetData("save") << std::endl; - return false; - } - } else if (!saveInfo.empty()) { - cfg.saveseconds = std::stoi(saveInfo[0]); - cfg.savechanges = std::stoi(saveInfo[1]); + // Handle separately + std::vector master(SplitString(parser_.GetData("slaveof"), ' ')); + if (master.size() == 2) { + master_ip = std::move(master[0]); + master_port = static_cast(std::stoi(master[1])); } - if (cfg.saveseconds == 0) { - cfg.saveseconds = 999999999; - } - if (cfg.savechanges == 0) { - cfg.savechanges = 999999999; + std::vector alias(SplitString(parser_.GetData("rename-command"), ' ')); + if (alias.size() % 2 == 0) { + for (auto it(alias.begin()); it != alias.end();) { + const PString& oldCmd = *(it++); + const PString& newCmd = *(it++); + aliases[oldCmd] = newCmd; + } } - cfg.rdbcompression = (parser.GetData("rdbcompression") == "yes"); - cfg.rdbchecksum = (parser.GetData("rdbchecksum") == "yes"); - - cfg.rdbfullname = parser.GetData("dir", "./") + parser.GetData("dbfilename", "dump.rdb"); - - cfg.maxclients = parser.GetData("maxclients", 10000); - - cfg.slowlogtime = parser.GetData("slowlog-log-slower-than", 0); - cfg.slowlogmaxlen = parser.GetData("slowlog-max-len", cfg.slowlogmaxlen); - - cfg.hz = parser.GetData("hz", 10); + return true; +} - // load master ip port - std::vector master(SplitString(parser.GetData("slaveof"), ' ')); - if (master.size() == 2) { - cfg.masterIp = std::move(master[0]); - cfg.masterPort = static_cast(std::stoi(master[1])); +void PConfig::Get(const std::string& key, std::vector* values) const { + values->clear(); + for (const auto& [k, v] : config_map_) { + if (key == "*" || pstd::StringMatch(key.c_str(), k.c_str(), 1)) { + values->emplace_back(k); + values->emplace_back(v->Value()); + } } - cfg.masterauth = parser.GetData("masterauth"); - - // load modules' names - cfg.modules = parser.GetDataVector("loadmodule"); - - cfg.includefile = parser.GetData("include"); // TODO multi files include - - // lru cache - cfg.maxmemory = parser.GetData("maxmemory", 2 * 1024 * 1024 * 1024UL); - cfg.maxmemorySamples = parser.GetData("maxmemory-samples", 5); - cfg.noeviction = (parser.GetData("maxmemory-policy", "noeviction") == "noeviction"); - - // worker threads - cfg.worker_threads_num = parser.GetData("worker-threads", 1); - - // slave threads - cfg.slave_threads_num = parser.GetData("slave-threads", 1); - cfg.fast_cmd_threads_num = parser.GetData("fast-threads", 1); - cfg.slow_cmd_threads_num = parser.GetData("slow-threads", 1); - - // backend - cfg.backend = parser.GetData("backend", kBackEndNone); - cfg.backendPath = parser.GetData("backendpath", cfg.backendPath); - EraseQuotes(cfg.backendPath); - cfg.backendHz = parser.GetData("backendhz", 10); - - cfg.max_client_response_size = parser.GetData("max-client-response-size", 1073741824); - - cfg.db_instance_num = parser.GetData("db-instance-num", 3); - cfg.rocksdb_ttl_second = parser.GetData("rocksdb-ttl-second"); - cfg.rocksdb_periodic_second = parser.GetData("rocksdb-periodic-second"); - - return cfg.CheckArgs(); } -bool 
PConfig::CheckArgs() const { -#define RETURN_IF_FAIL(cond) \ - if (!(cond)) { \ - std::cerr << #cond " failed\n"; \ - return false; \ - } - - RETURN_IF_FAIL(port > 0); - RETURN_IF_FAIL(databases > 0); - RETURN_IF_FAIL(maxclients > 0); - RETURN_IF_FAIL(hz > 0 && hz < 500); - RETURN_IF_FAIL(maxmemory >= 512 * 1024 * 1024UL); - RETURN_IF_FAIL(maxmemorySamples > 0 && maxmemorySamples < 10); - RETURN_IF_FAIL(worker_threads_num > 0 && worker_threads_num < 129); // as redis - RETURN_IF_FAIL(fast_cmd_threads_num > 0 && fast_cmd_threads_num < 16); - RETURN_IF_FAIL(slow_cmd_threads_num > 0 && slow_cmd_threads_num < 16); - RETURN_IF_FAIL(backend >= kBackEndNone && backend < kBackEndMax); - RETURN_IF_FAIL(backendHz >= 1 && backendHz <= 50); - RETURN_IF_FAIL(db_instance_num >= 1); - RETURN_IF_FAIL(rocksdb_ttl_second > 0); - RETURN_IF_FAIL(rocksdb_periodic_second > 0); - RETURN_IF_FAIL(max_client_response_size > 0); - -#undef RETURN_IF_FAIL +Status PConfig::Set(std::string key, const std::string& value, bool init_stage) { + std::transform(key.begin(), key.end(), key.begin(), ::tolower); + auto iter = config_map_.find(key); + if (iter == config_map_.end()) { + return Status::NotFound("Non-existent configuration items."); + } + return iter->second->Set(value, init_stage); +} - return true; +rocksdb::Options PConfig::GetRocksDBOptions() { + rocksdb::Options options; + options.create_if_missing = true; + options.create_missing_column_families = true; + options.max_subcompactions = rocksdb_max_subcompactions; + options.max_background_jobs = rocksdb_max_background_jobs; + options.max_write_buffer_number = rocksdb_max_write_buffer_number; + options.min_write_buffer_number_to_merge = rocksdb_min_write_buffer_number_to_merge; + options.write_buffer_size = rocksdb_write_buffer_size; + options.level0_file_num_compaction_trigger = rocksdb_level0_file_num_compaction_trigger; + options.num_levels = rocksdb_num_levels; + options.enable_pipelined_write = rocksdb_enable_pipelined_write; + options.level0_slowdown_writes_trigger = rocksdb_level0_slowdown_writes_trigger; + options.level0_stop_writes_trigger = rocksdb_level0_stop_writes_trigger; + return options; } -bool PConfig::CheckPassword(const PString& pwd) const { return password.empty() || password == pwd; } +rocksdb::BlockBasedTableOptions PConfig::GetRocksDBBlockBasedTableOptions() { + rocksdb::BlockBasedTableOptions options; + return options; +} } // namespace pikiwidb diff --git a/src/config.h b/src/config.h index 718c47423..d0dd2b041 100644 --- a/src/config.h +++ b/src/config.h @@ -7,97 +7,195 @@ #pragma once -#include +#include +#include +#include +#include #include +#include +#include #include +#include "rocksdb/options.h" +#include "rocksdb/table.h" + #include "common.h" +#include "net/config_parser.h" namespace pikiwidb { -enum BackEndType { - kBackEndNone = 0, - kBackEndRocksDB = 1, - kBackEndMax = 2, -}; - -struct PConfig { - bool daemonize; - PString pidfile; - - PString ip; - unsigned short port; - - int timeout; +using Status = rocksdb::Status; +using CheckFunc = std::function; +class PConfig; +extern PConfig g_config; - PString dbpath; +class BaseValue { + public: + BaseValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable = false) + : key_(key), custom_check_func_ptr_(check_func_ptr), rewritable_(rewritable) {} - PString loglevel; - PString logdir; // the log directory, differ from redis + virtual ~BaseValue() = default; - int databases; + const std::string& Key() const { return key_; } - // auth - PString password; + virtual 
std::string Value() const = 0; - std::map aliases; + Status Set(const std::string& value, bool force); - // @ rdb - // save seconds changes - int saveseconds; - int savechanges; - bool rdbcompression; // yes - bool rdbchecksum; // yes - PString rdbfullname; // ./dump.rdb + protected: + virtual Status SetValue(const std::string&) = 0; + Status check(const std::string& value) { + if (!custom_check_func_ptr_) { + return Status::OK(); + } + return custom_check_func_ptr_(value); + } - int maxclients; // 10000 + protected: + std::string key_; + CheckFunc custom_check_func_ptr_ = nullptr; + bool rewritable_ = false; +}; - int slowlogtime; // 1000 microseconds - int slowlogmaxlen; // 128 +class StringValue : public BaseValue { + public: + StringValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, + const std::vector& value_ptr_vec, char delimiter = ' ') + : BaseValue(key, check_func_ptr, rewritable), values_(value_ptr_vec), delimiter_(delimiter) { + assert(!values_.empty()); + } + ~StringValue() override = default; - int hz; // 10 [1,500] + std::string Value() const override { return MergeString(values_, delimiter_); }; - PString masterIp; - unsigned short masterPort; // replication - PString masterauth; + private: + Status SetValue(const std::string& value) override; - PString runid; + std::vector values_; + char delimiter_ = 0; +}; - PString includefile; // the template config +template +class NumberValue : public BaseValue { + public: + NumberValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, std::atomic* value_ptr, + T min = std::numeric_limits::min(), T max = std::numeric_limits::max()) + : BaseValue(key, check_func_ptr, rewritable), value_(value_ptr), value_min_(min), value_max_(max) { + assert(value_ != nullptr); + assert(value_min_ <= value_max_); + }; - std::vector modules; // modules + std::string Value() const override { return std::to_string(value_->load()); } - // use redis as cache, level db as backup - uint64_t maxmemory; // default 2GB - int maxmemorySamples; // default 5 - bool noeviction; // default true + private: + Status SetValue(const std::string& value) override; - // THREADED I/O - int worker_threads_num; + std::atomic* value_ = nullptr; + T value_min_; + T value_max_; +}; - // THREADED SLAVE - int slave_threads_num; +class BoolValue : public BaseValue { + public: + BoolValue(const std::string& key, CheckFunc check_func_ptr, bool rewritable, std::atomic* value_ptr) + : BaseValue(key, check_func_ptr, rewritable), value_(value_ptr) { + assert(value_ != nullptr); + }; - int fast_cmd_threads_num; - int slow_cmd_threads_num; + std::string Value() const override { return value_->load() ? 
"yes" : "no"; }; - int backend; // enum BackEndType - PString backendPath; - int backendHz; // the frequency of dump to backend + private: + Status SetValue(const std::string& value) override; + std::atomic* value_ = nullptr; +}; - int64_t max_client_response_size; +using ValuePrt = std::unique_ptr; +using ConfigMap = std::unordered_map; - int db_instance_num; - uint64_t rocksdb_ttl_second; - uint64_t rocksdb_periodic_second; +class PConfig { + public: PConfig(); - - bool CheckArgs() const; - bool CheckPassword(const PString& pwd) const; + ~PConfig() = default; + bool LoadFromFile(const std::string& file_name); + const std::string& ConfigFileName() const { return config_file_name_; } + void Get(const std::string&, std::vector*) const; + Status Set(std::string, const std::string&, bool force = false); + + public: + std::atomic_uint32_t timeout = 0; + // auth + AtomicString password; + AtomicString master_auth; + AtomicString master_ip; + std::map aliases; + std::atomic_uint32_t max_clients = 10000; // 10000 + std::atomic_uint32_t slow_log_time = 1000; // 1000 microseconds + std::atomic_uint32_t slow_log_max_len = 128; // 128 + std::atomic_uint32_t master_port; // replication + AtomicString include_file; // the template config + std::vector modules; // modules + std::atomic_int32_t fast_cmd_threads_num = 4; + std::atomic_int32_t slow_cmd_threads_num = 4; + std::atomic_uint64_t max_client_response_size = 1073741824; + std::atomic_uint64_t small_compaction_threshold = 604800; + std::atomic_uint64_t small_compaction_duration_threshold = 259200; + + std::atomic_bool daemonize = false; + AtomicString pid_file = "./pikiwidb.pid"; + AtomicString ip = "127.0.0.1"; + std::atomic_uint16_t port = 9221; + AtomicString db_path = "./db/"; + AtomicString log_dir = "stdout"; // the log directory, differ from redis + AtomicString log_level = "warning"; + AtomicString run_id; + std::atomic databases = 16; + std::atomic_uint32_t worker_threads_num = 2; + std::atomic_uint32_t slave_threads_num = 2; + std::atomic db_instance_num = 3; + + std::atomic_uint32_t rocksdb_max_subcompactions = 0; + // default 2 + std::atomic_int rocksdb_max_background_jobs = 4; + // default 2 + std::atomic rocksdb_max_write_buffer_number = 2; + // default 2 + std::atomic_int rocksdb_min_write_buffer_number_to_merge = 2; + // default 64M + std::atomic rocksdb_write_buffer_size = 64 << 20; + std::atomic_int rocksdb_level0_file_num_compaction_trigger = 4; + std::atomic_int rocksdb_num_levels = 7; + std::atomic_bool rocksdb_enable_pipelined_write = false; + std::atomic_int rocksdb_level0_slowdown_writes_trigger = 20; + std::atomic_int rocksdb_level0_stop_writes_trigger = 36; + + rocksdb::Options GetRocksDBOptions(); + + rocksdb::BlockBasedTableOptions GetRocksDBBlockBasedTableOptions(); + + private: + inline void AddString(const std::string& key, bool rewritable, std::vector values_ptr_vector) { + config_map_.emplace(key, std::make_unique(key, nullptr, rewritable, values_ptr_vector)); + } + inline void AddStrinWithFunc(const std::string& key, const CheckFunc& checkfunc, bool rewritable, + std::vector values_ptr_vector) { + config_map_.emplace(key, std::make_unique(key, checkfunc, rewritable, values_ptr_vector)); + } + inline void AddBool(const std::string& key, const CheckFunc& checkfunc, bool rewritable, + std::atomic* value_ptr) { + config_map_.emplace(key, std::make_unique(key, checkfunc, rewritable, value_ptr)); + } + template + inline void AddNumber(const std::string& key, bool rewritable, std::atomic* value_ptr) { + 
config_map_.emplace(key, std::make_unique>(key, nullptr, rewritable, value_ptr)); + } + template + inline void AddNumberWihLimit(const std::string& key, bool rewritable, std::atomic* value_ptr, T min, T max) { + config_map_.emplace(key, std::make_unique>(key, nullptr, rewritable, value_ptr, min, max)); + } + + private: + ConfigParser parser_; + ConfigMap config_map_; + std::string config_file_name_; }; - -extern PConfig g_config; - -extern bool LoadPikiwiDBConfig(const char* cfgFile, PConfig& cfg); - } // namespace pikiwidb diff --git a/src/db.cc b/src/db.cc index 6c1fc3576..6fdbfe5ee 100644 --- a/src/db.cc +++ b/src/db.cc @@ -14,20 +14,25 @@ namespace pikiwidb { DB::DB(int db_id, const std::string &db_path) : db_id_(db_id), db_path_(db_path + std::to_string(db_id) + '/') { storage::StorageOptions storage_options; - storage_options.options.create_if_missing = true; + storage_options.options = g_config.GetRocksDBOptions(); + // some options obj for all RocksDB in one DB. + auto cap = storage_options.db_instance_num * kColumnNum * storage_options.options.write_buffer_size * + storage_options.options.max_write_buffer_number; + storage_options.options.write_buffer_manager = std::make_shared(cap); + + storage_options.table_options = g_config.GetRocksDBBlockBasedTableOptions(); + + storage_options.small_compaction_threshold = g_config.small_compaction_threshold.load(); + storage_options.small_compaction_duration_threshold = g_config.small_compaction_duration_threshold.load(); storage_options.db_instance_num = g_config.db_instance_num; storage_options.db_id = db_id; - // options for CF - storage_options.options.ttl = g_config.rocksdb_ttl_second; - storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second; storage_ = std::make_unique(); if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { ERROR("Storage open failed! 
{}", s.ToString()); abort(); } opened_ = true; - INFO("Open DB{} success!", db_id); } } // namespace pikiwidb diff --git a/src/db.h b/src/db.h index a4d603de7..24e983171 100644 --- a/src/db.h +++ b/src/db.h @@ -14,6 +14,7 @@ #include "storage/storage.h" namespace pikiwidb { +constexpr int kColumnNum = 10; class DB { public: DB(int db_id, const std::string& db_path); diff --git a/src/net/config_parser.h b/src/net/config_parser.h index 2b4373cff..7783ecba9 100644 --- a/src/net/config_parser.h +++ b/src/net/config_parser.h @@ -18,6 +18,8 @@ class ConfigParser { public: + using Data = std::map>; + bool Load(const char* FileName); template @@ -25,6 +27,8 @@ class ConfigParser { const std::vector& GetDataVector(const char* key) const; + const Data& GetMap() { return data_; } + #ifdef CONFIG_DEBUG void Print() { std::cout << "//////////////////" << std::endl; @@ -37,8 +41,6 @@ class ConfigParser { #endif private: - typedef std::map > Data; - Data data_; template diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index f20ff12cf..87b51747c 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -15,7 +15,6 @@ #include #include "log.h" -#include "rocksdb/db.h" #include "client.h" #include "store.h" @@ -29,6 +28,7 @@ #include "pstd_util.h" std::unique_ptr g_pikiwidb; +using namespace pikiwidb; static void IntSigHandle(const int sig) { INFO("Catch Signal {}, cleanup...", sig); @@ -77,7 +77,7 @@ bool PikiwiDB::ParseArgs(int ac, char* av[]) { if (++i == ac) { return false; } - port_ = static_cast(std::atoi(av[i])); + port_ = static_cast(std::atoi(av[i])); } else if (strncasecmp(av[i], "--loglevel", 10) == 0) { if (++i == ac) { return false; @@ -99,72 +99,6 @@ bool PikiwiDB::ParseArgs(int ac, char* av[]) { return true; } -static void PdbCron() { - // using namespace pikiwidb; - // - // if (g_qdbPid != -1) { - // return; - // } - // - // if (Now() > (g_lastPDBSave + static_cast(g_config.saveseconds)) * 1000UL) - // { - // int ret = fork(); - // if (ret == 0) { - // { - // PDBSaver qdb; - // qdb.Save(g_config.rdbfullname.c_str()); - // std::cerr << "ServerCron child save rdb done, exiting child\n"; - // } // make qdb to be destructed before exit - // _exit(0); - // } else if (ret == -1) { - // ERROR("fork qdb save process failed"); - // } else { - // g_qdbPid = ret; - // } - // - // INFO("ServerCron save rdb file {}", g_config.rdbfullname); - // } -} - -static void LoadDBFromDisk() { - // using namespace pikiwidb; - // - // PDBLoader loader; - // loader.Load(g_config.rdbfullname.c_str()); - //} - // - // static void CheckChild() { - // using namespace pikiwidb; - // - // if (g_qdbPid == -1) { - // return; - // } - // - // int statloc = 0; - // pid_t pid = wait3(&statloc, WNOHANG, nullptr); - // - // if (pid != 0 && pid != -1) { - // int exit = WEXITSTATUS(statloc); - // int signal = 0; - // - // if (WIFSIGNALED(statloc)) { - // signal = WTERMSIG(statloc); - // } - // - // if (pid == g_qdbPid) { - // PDBSaver::SaveDoneHandler(exit, signal); - // if (PREPL.IsBgsaving()) { - // PREPL.OnRdbSaveDone(); - // } else { - // PREPL.TryBgsave(); - // } - // } else { - // ERROR("{} is not rdb process", pid); - // assert(!!!"Is there any back process except rdb?"); - // } - // } -} - void PikiwiDB::OnNewConnection(pikiwidb::TcpConnection* obj) { INFO("New connection from {}:{}", obj->GetPeerIP(), obj->GetPeerPort()); @@ -186,41 +120,35 @@ void PikiwiDB::OnNewConnection(pikiwidb::TcpConnection* obj) { } bool PikiwiDB::Init() { - using namespace pikiwidb; - char runid[kRunidSize + 1] = ""; getRandomHexChars(runid, 
kRunidSize); - g_config.runid.assign(runid, kRunidSize); + g_config.Set("runid", {runid, kRunidSize}, true); if (port_ != 0) { - g_config.port = port_; + g_config.Set("port", std::to_string(port_), true); } if (!log_level_.empty()) { - g_config.loglevel = log_level_; - } - - if (!master_.empty()) { - g_config.masterIp = master_; - g_config.masterPort = master_port_; + g_config.Set("log-level", log_level_, true); } NewTcpConnectionCallback cb = std::bind(&PikiwiDB::OnNewConnection, this, std::placeholders::_1); - if (!worker_threads_.Init(g_config.ip.c_str(), g_config.port, cb)) { + if (!worker_threads_.Init(g_config.ip.ToString().c_str(), g_config.port.load(), cb)) { + ERROR("worker_threads Init failed. IP = {} Port = {}", g_config.ip.ToString(), g_config.port.load()); return false; } - auto num = g_config.worker_threads_num + g_config.slave_threads_num; + auto num = g_config.worker_threads_num.load() + g_config.slave_threads_num.load(); auto kMaxWorkerNum = IOThreadPool::GetMaxWorkerNum(); if (num > kMaxWorkerNum) { ERROR("number of threads can't exceeds {}, now is {}", kMaxWorkerNum, num); return false; } - worker_threads_.SetWorkerNum(static_cast(g_config.worker_threads_num)); - slave_threads_.SetWorkerNum(static_cast(g_config.slave_threads_num)); + worker_threads_.SetWorkerNum(static_cast(g_config.worker_threads_num.load())); + slave_threads_.SetWorkerNum(static_cast(g_config.slave_threads_num.load())); // now we only use fast cmd thread pool - auto status = cmd_threads_.Init(g_config.fast_cmd_threads_num, 0, "pikiwidb-cmd"); + auto status = cmd_threads_.Init(g_config.fast_cmd_threads_num.load(), 0, "pikiwidb-cmd"); if (!status.ok()) { ERROR("init cmd thread pool failed: {}", status.ToString()); return false; @@ -228,22 +156,16 @@ bool PikiwiDB::Init() { PSTORE.Init(g_config.databases); - // Only if there is no backend, load rdb - if (g_config.backend == pikiwidb::kBackEndNone) { - LoadDBFromDisk(); - } - - PSlowLog::Instance().SetThreshold(g_config.slowlogtime); - PSlowLog::Instance().SetLogLimit(static_cast(g_config.slowlogmaxlen)); + PSlowLog::Instance().SetThreshold(g_config.slow_log_time.load()); + PSlowLog::Instance().SetLogLimit(static_cast(g_config.slow_log_max_len.load())); // init base loop auto loop = worker_threads_.BaseLoop(); - loop->ScheduleRepeatedly(1000 / pikiwidb::g_config.hz, PdbCron); loop->ScheduleRepeatedly(1000, &PReplication::Cron, &PREPL); // master ip - if (!g_config.masterIp.empty()) { - PREPL.SetMasterAddr(g_config.masterIp.c_str(), g_config.masterPort); + if (!g_config.ip.empty()) { + PREPL.SetMasterAddr(g_config.master_ip.ToString().c_str(), g_config.master_port.load()); } // cmd_table_manager_.InitCmdTable(); @@ -308,7 +230,6 @@ static void closeStd() { } int main(int ac, char* av[]) { - [[maybe_unused]] rocksdb::DB* db; g_pikiwidb = std::make_unique(); if (!g_pikiwidb->ParseArgs(ac - 1, av + 1)) { @@ -317,24 +238,19 @@ int main(int ac, char* av[]) { } if (!g_pikiwidb->GetConfigName().empty()) { - if (!LoadPikiwiDBConfig(g_pikiwidb->GetConfigName().c_str(), pikiwidb::g_config)) { + if (!g_config.LoadFromFile(g_pikiwidb->GetConfigName())) { std::cerr << "Load config file [" << g_pikiwidb->GetConfigName() << "] failed!\n"; return -1; } - } else { - pikiwidb::g_config.fast_cmd_threads_num = int(1); - pikiwidb::g_config.slow_cmd_threads_num = int(1); - pikiwidb::g_config.worker_threads_num = int(2); - pikiwidb::g_config.slave_threads_num = int(2); } // output logo to console char logo[512] = ""; snprintf(logo, sizeof logo - 1, pikiwidbLogo, 
KPIKIWIDB_VERSION, static_cast(sizeof(void*)) * 8, - static_cast(pikiwidb::g_config.port)); + static_cast(g_config.port)); std::cout << logo; - if (pikiwidb::g_config.daemonize) { + if (g_config.daemonize.load()) { daemonize(); } @@ -342,7 +258,7 @@ int main(int ac, char* av[]) { SignalSetup(); InitLogs(); - if (pikiwidb::g_config.daemonize) { + if (g_config.daemonize.load()) { closeStd(); } diff --git a/src/replication.cc b/src/replication.cc index b7633c1f6..3c10aac92 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -60,7 +60,7 @@ void PReplication::OnRdbSaveDone() { if (cli->GetSlaveInfo()->state == kPSlaveStateWaitBgsaveEnd) { cli->GetSlaveInfo()->state = kPSlaveStateOnline; - if (!rdb.IsOpen() && !rdb.Open(g_config.rdbfullname.c_str())) { + if (!rdb.IsOpen()) { ERROR("can not open rdb when replication\n"); return; // fatal error; } @@ -175,7 +175,7 @@ void PReplication::Cron() { if (masterInfo_.addr.IsValid()) { switch (masterInfo_.state) { case kPReplStateNone: { - if (masterInfo_.addr.GetIP() == g_config.ip && masterInfo_.addr.GetPort() == g_config.port) { + if (masterInfo_.addr.GetIP() == g_config.ip.ToString() && masterInfo_.addr.GetPort() == g_config.port) { ERROR("Fix config, master addr is self addr!"); assert(!!!"wrong config for master addr"); } @@ -208,14 +208,14 @@ void PReplication::Cron() { } break; case kPReplStateConnected: - if (!g_config.masterauth.empty()) { + if (!g_config.master_auth.empty()) { if (auto master = master_.lock()) { UnboundedBuffer req; req.PushData("auth "); - req.PushData(g_config.masterauth.data(), g_config.masterauth.size()); + req.PushData(g_config.master_auth.ToString().data(), g_config.master_auth.ToString().size()); req.PushData("\r\n"); master->SendPacket(req); - INFO("send auth with password {}", g_config.masterauth); + INFO("send auth with password {}", g_config.master_auth.ToString()); masterInfo_.state = kPReplStateWaitAuth; break; @@ -232,11 +232,11 @@ void PReplication::Cron() { } else if (master->GetAuth()) { // send replconf char req[128]; - auto len = snprintf(req, sizeof req - 1, "replconf listening-port %hu\r\n", g_config.port); + auto len = snprintf(req, sizeof req - 1, "replconf listening-port %hu\r\n", g_config.port.load()); master->SendPacket(req, len); masterInfo_.state = kPReplStateWaitReplconf; - INFO("Send replconf listening-port {}", g_config.port); + INFO("Send replconf listening-port {}", g_config.port.load()); } else { WARN("Haven't auth to master yet, or check masterauth password"); } @@ -318,7 +318,7 @@ PReplState PReplication::GetMasterState() const { return masterInfo_.state; } SocketAddr PReplication::GetMasterAddr() const { return masterInfo_.addr; } -void PReplication::SetMasterAddr(const char* ip, unsigned short port) { +void PReplication::SetMasterAddr(const char* ip, uint16_t port) { if (ip) { masterInfo_.addr.Init(ip, port); } else { @@ -351,7 +351,7 @@ PError replconf(const std::vector& params, UnboundedBuffer* reply) { info = client->GetSlaveInfo(); PREPL.AddSlave(client); } - info->listenPort = static_cast(port); + info->listenPort = static_cast(port); } else { break; } @@ -431,7 +431,7 @@ PError slaveof(const std::vector& params, UnboundedBuffer* reply) { } else { long tmpPort = 0; pstd::String2int(params[2].c_str(), params[2].size(), &tmpPort); - unsigned short port = static_cast(tmpPort); + uint16_t port = static_cast(tmpPort); SocketAddr reqMaster(params[1].c_str(), port); diff --git a/src/replication.h b/src/replication.h index 142656788..3274c625e 100644 --- a/src/replication.h +++ 
b/src/replication.h @@ -68,7 +68,7 @@ enum PSlaveState { struct PSlaveInfo { PSlaveState state; - unsigned short listenPort; // slave listening port + uint16_t listenPort; // slave listening port PSlaveInfo() : state(kPSlaveStateNone), listenPort(0) {} }; @@ -129,7 +129,7 @@ class PReplication { void SaveTmpRdb(const char* data, std::size_t& len); void SetMaster(const std::shared_ptr& cli); void SetMasterState(PReplState s); - void SetMasterAddr(const char* ip, unsigned short port); + void SetMasterAddr(const char* ip, uint16_t port); void SetRdbSize(std::size_t s); PReplState GetMasterState() const; SocketAddr GetMasterAddr() const; diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 5203e3bb3..c1a0daf16 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -58,8 +58,8 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_ rocksdb::BlockBasedTableOptions table_ops(storage_options.table_options); table_ops.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true)); + // Set up separate configuration for RocksDB rocksdb::DBOptions db_ops(storage_options.options); - db_ops.create_missing_column_families = true; // string column-family options rocksdb::ColumnFamilyOptions string_cf_ops(storage_options.options); diff --git a/src/store.cc b/src/store.cc index bd2b6c684..22cf42001 100644 --- a/src/store.cc +++ b/src/store.cc @@ -20,20 +20,13 @@ PStore& PStore::Instance() { } void PStore::Init(int dbNum) { - if (g_config.backend == kBackEndNone) { - return; - } - backends_.reserve(dbNum); - - if (g_config.backend == kBackEndRocksDB) { - for (int i = 0; i < dbNum; i++) { - auto db = std::make_unique(i, g_config.dbpath); - backends_.push_back(std::move(db)); - } - } else { - ERROR("unsupport backend!"); + for (int i = 0; i < dbNum; i++) { + auto db = std::make_unique(i, g_config.db_path); + backends_.push_back(std::move(db)); + INFO("Open DB_{} success!", i); } + INFO("STORE Init success!"); } } // namespace pikiwidb diff --git a/tests/admin_test.go b/tests/admin_test.go index f476a0b6a..f9f33c901 100644 --- a/tests/admin_test.go +++ b/tests/admin_test.go @@ -122,4 +122,29 @@ var _ = Describe("Admin", Ordered, func() { Expect(eDel).NotTo(HaveOccurred()) Expect(rDel).To(Equal(int64(1))) }) + + It("Cmd Config", func() { + res := client.ConfigGet(ctx, "timeout") + Expect(res.Err()).NotTo(HaveOccurred()) + Expect(res.Val()).To(Equal(map[string]string{"timeout": "0"})) + + res = client.ConfigGet(ctx, "daemonize") + Expect(res.Err()).NotTo(HaveOccurred()) + Expect(res.Val()).To(Equal(map[string]string{"daemonize": "no"})) + + resSet := client.ConfigSet(ctx, "timeout", "60") + Expect(resSet.Err()).NotTo(HaveOccurred()) + Expect(resSet.Val()).To(Equal("OK")) + + resSet = client.ConfigSet(ctx, "daemonize", "yes") + Expect(resSet.Err()).To(MatchError("ERR Invalid Argument")) + + res = client.ConfigGet(ctx, "timeout") + Expect(res.Err()).NotTo(HaveOccurred()) + Expect(res.Val()).To(Equal(map[string]string{"timeout": "60"})) + + res = client.ConfigGet(ctx, "time*") + Expect(res.Err()).NotTo(HaveOccurred()) + Expect(res.Val()).To(Equal(map[string]string{"timeout": "60"})) + }) }) diff --git a/tests/hash_test.go b/tests/hash_test.go index c9714f355..7dffb6306 100644 --- a/tests/hash_test.go +++ b/tests/hash_test.go @@ -355,10 +355,10 @@ var _ = Describe("Hash", Ordered, func() { hSet = client.HSet(ctx, "hScanTest", "key3", "value3") Expect(hSet.Err()).NotTo(HaveOccurred()) - hScan := client.HScan(ctx, "hScanTest", 0,"key*",3) + hScan := 
client.HScan(ctx, "hScanTest", 0, "key*", 3) Expect(hScan.Err()).NotTo(HaveOccurred()) keys, cursor := hScan.Val() Expect(cursor).To(Equal(uint64(0))) - Expect(keys).To(ConsistOf([]string{"key1", "value1","key2","value2", "key3","value3"})) + Expect(keys).To(ConsistOf([]string{"key1", "value1", "key2", "value2", "key3", "value3"})) }) }) diff --git a/tests/set_test.go b/tests/set_test.go index af784e936..d9191c3e3 100644 --- a/tests/set_test.go +++ b/tests/set_test.go @@ -227,18 +227,17 @@ var _ = Describe("Set", Ordered, func() { sCard := client.SCard(ctx, "setScard") Expect(sCard.Err()).NotTo(HaveOccurred()) Expect(sCard.Val()).To(Equal(int64(2))) - }) - - - It("should SPop", func() { - sAdd := client.SAdd(ctx, "setSpop", "one") - Expect(sAdd.Err()).NotTo(HaveOccurred()) - sAdd = client.SAdd(ctx, "setSpop", "two") - Expect(sAdd.Err()).NotTo(HaveOccurred()) - sAdd = client.SAdd(ctx, "setSpop", "three") - Expect(sAdd.Err()).NotTo(HaveOccurred()) - sAdd = client.SAdd(ctx, "setSpop", "four") - Expect(sAdd.Err()).NotTo(HaveOccurred()) + }) + + It("should SPop", func() { + sAdd := client.SAdd(ctx, "setSpop", "one") + Expect(sAdd.Err()).NotTo(HaveOccurred()) + sAdd = client.SAdd(ctx, "setSpop", "two") + Expect(sAdd.Err()).NotTo(HaveOccurred()) + sAdd = client.SAdd(ctx, "setSpop", "three") + Expect(sAdd.Err()).NotTo(HaveOccurred()) + sAdd = client.SAdd(ctx, "setSpop", "four") + Expect(sAdd.Err()).NotTo(HaveOccurred()) sAdd = client.SAdd(ctx, "setSpop", "five") Expect(sAdd.Err()).NotTo(HaveOccurred()) @@ -246,12 +245,11 @@ var _ = Describe("Set", Ordered, func() { Expect(sPopN.Err()).NotTo(HaveOccurred()) Expect(sPopN.Val()).To(HaveLen(3)) /* - sMembers := client.SMembers(ctx, "setSpop") - Expect(sMembers.Err()).NotTo(HaveOccurred()) - Expect(sMembers.Val()).To(HaveLen(2)) + sMembers := client.SMembers(ctx, "setSpop") + Expect(sMembers.Err()).NotTo(HaveOccurred()) + Expect(sMembers.Val()).To(HaveLen(2)) */ - }) It("should SMove", func() { @@ -292,27 +290,27 @@ var _ = Describe("Set", Ordered, func() { Expect(sRem.Err()).NotTo(HaveOccurred()) Expect(sRem.Val()).To(Equal(int64(0))) - // sMembers := client.SMembers(ctx, "set") - // Expect(sMembers.Err()).NotTo(HaveOccurred()) - // Expect(sMembers.Val()).To(ConsistOf([]string{"three", "two"})) + // sMembers := client.SMembers(ctx, "set") + // Expect(sMembers.Err()).NotTo(HaveOccurred()) + // Expect(sMembers.Val()).To(ConsistOf([]string{"three", "two"})) }) It("should SRandmember", func() { - sAdd := client.SAdd(ctx, "set", "one") - Expect(sAdd.Err()).NotTo(HaveOccurred()) - sAdd = client.SAdd(ctx, "set", "two") - Expect(sAdd.Err()).NotTo(HaveOccurred()) - sAdd = client.SAdd(ctx, "set", "three") - Expect(sAdd.Err()).NotTo(HaveOccurred()) - - member, err := client.SRandMember(ctx, "set").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(member).NotTo(Equal("")) - - members, err := client.SRandMemberN(ctx, "set", 2).Result() - Expect(err).NotTo(HaveOccurred()) + sAdd := client.SAdd(ctx, "set", "one") + Expect(sAdd.Err()).NotTo(HaveOccurred()) + sAdd = client.SAdd(ctx, "set", "two") + Expect(sAdd.Err()).NotTo(HaveOccurred()) + sAdd = client.SAdd(ctx, "set", "three") + Expect(sAdd.Err()).NotTo(HaveOccurred()) + + member, err := client.SRandMember(ctx, "set").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(member).NotTo(Equal("")) + + members, err := client.SRandMemberN(ctx, "set", 2).Result() + Expect(err).NotTo(HaveOccurred()) Expect(members).To(HaveLen(2)) - }) + }) It("should SMembers", func() { sAdd := client.SAdd(ctx, "setSMembers", 
"Hello") @@ -393,27 +391,24 @@ var _ = Describe("Set", Ordered, func() { Expect(sAdd.Val()).To(Equal(int64(2))) // func (c Client) SScan(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd - sScan:=client.SScan(ctx,"setSScan1",0,"*",5) + sScan := client.SScan(ctx, "setSScan1", 0, "*", 5) Expect(sScan.Err()).NotTo(HaveOccurred()) - Expect(sScan.Val()).To(ConsistOf([]string{"user1", "user2","user3","Hello","World"})) - - sScan=client.SScan(ctx,"setSScan1",0,"user*",5) + Expect(sScan.Val()).To(ConsistOf([]string{"user1", "user2", "user3", "Hello", "World"})) + + sScan = client.SScan(ctx, "setSScan1", 0, "user*", 5) Expect(sScan.Err()).NotTo(HaveOccurred()) - Expect(sScan.Val()).To(ConsistOf([]string{"user1", "user2","user3"})) + Expect(sScan.Val()).To(ConsistOf([]string{"user1", "user2", "user3"})) - sScan=client.SScan(ctx,"setSScan1",0,"He*",5) + sScan = client.SScan(ctx, "setSScan1", 0, "He*", 5) Expect(sScan.Err()).NotTo(HaveOccurred()) Expect(sScan.Val()).To(ConsistOf([]string{"Hello"})) - + // sScan=client.SScan(ctx,"setSScan1",0,"*",-1) // Expect(sScan.Err()).To(HaveOccurred()) - //del del := client.Del(ctx, "setSScan1") Expect(del.Err()).NotTo(HaveOccurred()) }) - - })