diff --git a/CMakeLists.txt b/CMakeLists.txt index 511e0b4d6..9f0c09fc8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,11 +19,21 @@ IF (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") ENDIF () ELSEIF (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") # using GCC - IF (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "10.0") - MESSAGE(FATAL_ERROR "GCC G++ version must be greater than 10.0") + IF (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "11.0") + MESSAGE(FATAL_ERROR "GCC G++ version must be greater than 11.0") ENDIF () ENDIF () +# get current date and time +EXECUTE_PROCESS(COMMAND date "+%Y-%m-%d_%H:%M:%S" OUTPUT_VARIABLE BUILD_TIMESTAMP OUTPUT_STRIP_TRAILING_WHITESPACE) +ADD_DEFINITIONS(-DKPIKIWIDB_BUILD_DATE="${BUILD_TIMESTAMP}") + +message(STATUS "Build timestamp: ${BUILD_TIMESTAMP}") + +# get git commit id +EXECUTE_PROCESS(COMMAND git rev-parse HEAD OUTPUT_VARIABLE GIT_COMMIT_ID OUTPUT_STRIP_TRAILING_WHITESPACE) +ADD_DEFINITIONS(-DKPIKIWIDB_GIT_COMMIT_ID="${GIT_COMMIT_ID}") +MESSAGE(STATUS "Git commit id: ${GIT_COMMIT_ID}") ############# You should enable sanitizer if you are developing pika ############# # Uncomment the following two lines to enable AddressSanitizer to detect memory leaks and other memory-related bugs. diff --git a/README.md b/README.md index 6bce1e508..69fdc0cce 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,42 @@ ![](docs/images/pikiwidb-logo.png) [中文](README_CN.md) -A C++11 implementation of Redis Server, use RocksDB for persist storage.(not including cluster yet) +A C++20 implementation of Redis Server, use RocksDB for persist storage.(not including cluster yet) ## Requirements -* C++11 + +* C++20 * Linux or OS X +## compile + +**It is recommended to use the latest version of Ubuntu or Debian for Linux systems** + +Execute compilation + +If the machine's GCC version is less than 11, especially on CentOS6 or CentOS7, you need to upgrade the gcc version firstly. + +Execute the following commands on CentOS: + +```bash +sudo yum -y install centos-release-scl +sudo yum -y install devtoolset-11-gcc devtoolset-11-gcc-c++ +scl enable devtoolset-11 bash +``` + +Execute this command to start compiling Pikiwidb: + +```bash +./build.sh +``` + +Pikiwidb is compiled by default in release mode, which does not support debugging. If debugging is needed, compile in debug mode. + +```bash +./clean.sh +./build.sh --debug +``` + ## Support module for write your own extensions PikiwiDB supports module now, still in progress, much work to do. I added three commands(ldel, skeys, hgets) for demonstration. diff --git a/README_CN.md b/README_CN.md index 9f41a66a2..45f29f4f9 100644 --- a/README_CN.md +++ b/README_CN.md @@ -2,79 +2,122 @@ ![](docs/images/pikiwidb-logo.png) [Click me switch to English](README.en.md) -C++11实现的增强版Redis服务器,使用RocksDB作为持久化存储引擎。(集群支持尚正在计划中) +C++20 实现的增强版 Redis 服务器,使用 RocksDB 作为持久化存储引擎。(集群支持尚正在计划中) ## 环境需求 -* C++11、CMake + +* C++20、CMake * Linux 或 MAC OS -## 与Redis完全兼容 - 你可以用redis的各种工具来测试PikiwiDB,比如官方的redis-cli, redis-benchmark。 +## 编译 + +**建议使用最新版本的 Ubuntu 或 Debian Linux 系统** + +执行编译: + +如果机器的 GCC 版本低于 11,特别是在 CentOS 6.x 或 CentOS 7.x 上,你需要先升级 GCC 版本。 + +在 CentOS 上执行以下命令: + +```bash +sudo yum -y install centos-release-scl +sudo yum -y install devtoolset-11-gcc devtoolset-11-gcc-c++ +scl enable devtoolset-11 bash +``` + +执行以下命令开始编译 PikiwiDB: + +```bash +./build.sh +``` + +PikiwiDB 默认以 release 模式编译,不支持调试。如果需要调试,请以 debug 模式编译。 + +```bash +./clean.sh +./build.sh --debug +``` + +## 与 Redis 完全兼容 - PikiwiDB可以和redis之间进行复制,可以读取redis的rdb文件或aof文件。当然,PikiwiDB生成的aof或rdb文件也可以被redis读取。 +你可以用 Redis 的各种工具来测试 PikiwiDB,比如官方的 redis-cli, redis-benchmark。 - 你还可以用redis-sentinel来实现PikiwiDB的高可用! +PikiwiDB 可以和 Redis 之间进行复制,可以读取 Redis 的 rdb 文件或 aof 文件。当然,PikiwiDB 生成的 aof 或 rdb 文件也可以被 Redis 读取。 - 总之,PikiwiDB与Redis完全兼容。 +你还可以用 redis-sentinel 来实现 PikiwiDB 的高可用! + +总之,PikiwiDB 与 Redis 完全兼容。 ## 高性能 -- PikiwiDB性能大约比Redis3.2高出20%(使用redis-benchmark测试pipeline请求,比如设置-P=50或更高) -- PikiwiDB的高性能有一部分得益于独立的网络线程处理IO,因此和redis比占了便宜。但PikiwiDB逻辑仍然是单线程的。 -- 另一部分得益于C++ STL的高效率(CLANG的表现比GCC更好)。 -- 在测试前,你要确保std::list的size()是O(1)复杂度,这才遵循C++11的标准。否则list相关命令不可测。 -运行下面这个命令,试试和redis比一比~ +- PikiwiDB 性能大约比 Redis 3.2 高出 20% (使用 redis-benchmark 测试 pipeline 请求,比如设置 -P=50 或更高) +- PikiwiDB 的高性能有一部分得益于独立的网络线程处理 IO,因此和 redis 比占了便宜。但 PikiwiDB 逻辑仍然是单线程的。 +- 另一部分得益于 C++ STL 的高效率(CLANG 的表现比 GCC 更好)。 +- 在测试前,你要确保 std::list 的 size() 是 O(1) 复杂度,这才遵循 C++11 的标准。否则 list 相关命令不可测。 + +运行下面这个命令,试试和 redis 比一比~ ```bash ./redis-benchmark -q -n 1000000 -P 50 -c 50 ``` -## 编写扩展模块 - PikiwiDB支持动态库模块,可以在运行时添加新命令。 - 我添加了三个命令(ldel, skeys, hgets)作为演示。 - ## 支持冷数据淘汰 - 是的,在内存受限的情况下,你可以让PikiwiDB根据简单的LRU算法淘汰一些key以释放内存。 + +是的,在内存受限的情况下,你可以让 PikiwiDB 根据简单的 LRU 算法淘汰一些 key 以释放内存。 ## 主从复制,事务,RDB/AOF持久化,慢日志,发布订阅 - 这些特性PikiwiDB都有:-) + +这些特性 PikiwiDB 都有:-) ## 持久化:内存不再是上限 - RocksDB可以配置为PikiwiDB的持久化存储引擎,可以存储更多的数据。 +RocksDB 可以配置为 PikiwiDB 的持久化存储引擎,可以存储更多的数据。 ## 命令列表 -#### 展示PikiwiDB支持的所有命令 + +#### 展示 PikiwiDB 支持的所有命令 + - cmdlist #### key commands + - type exists del expire pexpire expireat pexpireat ttl pttl persist move keys randomkey rename renamenx scan sort #### server commands + - select dbsize bgsave save lastsave flushdb flushall client debug shutdown bgrewriteaof ping echo info monitor auth #### string commands + - set get getrange setrange getset append bitcount bitop getbit setbit incr incrby incrbyfloat decr decrby mget mset msetnx setnx setex psetex strlen #### list commands + - lpush rpush lpushx rpushx lpop rpop lindex llen lset ltrim lrange linsert lrem rpoplpush blpop brpop brpoplpush #### hash commands + - hget hmget hgetall hset hsetnx hmset hlen hexists hkeys hvals hdel hincrby hincrbyfloat hscan hstrlen #### set commands + - sadd scard srem sismember smembers sdiff sdiffstore sinter sinterstore sunion sunionstore smove spop srandmember sscan #### sorted set commands + - zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore #### pubsub commands + - subscribe unsubscribe publish psubscribe punsubscribe pubsub #### multi commands + - watch unwatch multi exec discard #### replication commands + - sync slaveof + ## Contact Us diff --git a/build.sh b/build.sh index a7d6c28d9..0cf18b5b0 100755 --- a/build.sh +++ b/build.sh @@ -6,12 +6,6 @@ C_GREEN="\033[32m" C_END="\033[0m" -BUILD_TIME=$(git log -1 --format=%ai) -BUILD_TIME=${BUILD_TIME: 0: 10} - -COMMIT_ID=$(git rev-parse HEAD) -SHORT_COMMIT_ID=${COMMIT_ID: 0: 8} - BUILD_TYPE=release VERBOSE=0 CMAKE_FLAGS="" @@ -66,19 +60,11 @@ fi echo "cpu core ${CPU_CORE}" -if [ -z "$SHORT_COMMIT_ID" ]; then - echo "no git commit id" - SHORT_COMMIT_ID="pikiwidb" -fi - -echo "BUILD_TIME:" $BUILD_TIME -echo "COMMIT_ID:" $SHORT_COMMIT_ID - echo "BUILD_TYPE:" $BUILD_TYPE echo "CMAKE_FLAGS:" $CMAKE_FLAGS echo "MAKE_FLAGS:" $MAKE_FLAGS -cmake -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_TIME=$BUILD_TIME -DGIT_COMMIT_ID=$SHORT_COMMIT_ID ${CMAKE_FLAGS} -S . -B ${PREFIX} +cmake -DCMAKE_BUILD_TYPE=${BUILD_TYPE} ${CMAKE_FLAGS} -S . -B ${PREFIX} cmake --build ${PREFIX} -- ${MAKE_FLAGS} -j ${CPU_CORE} if [ $? -eq 0 ]; then diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 79c0892a5..5cd855f56 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -15,8 +15,8 @@ BaseCmd::BaseCmd(std::string name, int16_t arity, uint32_t flag, uint32_t aclCat name_ = std::move(name); arity_ = arity; flag_ = flag; - aclCategory_ = aclCategory; - cmdId_ = g_pikiwidb->GetCmdTableManager().GetCmdId(); + acl_category_ = aclCategory; + cmd_id_ = g_pikiwidb->GetCmdID(); } bool BaseCmd::CheckArg(size_t num) const { @@ -55,13 +55,13 @@ void BaseCmd::SetFlag(uint32_t flag) { flag_ |= flag; } void BaseCmd::ResetFlag(uint32_t flag) { flag_ &= ~flag; } bool BaseCmd::HasSubCommand() const { return false; } BaseCmd* BaseCmd::GetSubCmd(const std::string& cmdName) { return nullptr; } -uint32_t BaseCmd::AclCategory() const { return aclCategory_; } -void BaseCmd::AddAclCategory(uint32_t aclCategory) { aclCategory_ |= aclCategory; } +uint32_t BaseCmd::AclCategory() const { return acl_category_; } +void BaseCmd::AddAclCategory(uint32_t aclCategory) { acl_category_ |= aclCategory; } std::string BaseCmd::Name() const { return name_; } // CmdRes& BaseCommand::Res() { return res_; } // void BaseCommand::SetResp(const std::shared_ptr& resp) { resp_ = resp; } // std::shared_ptr BaseCommand::GetResp() { return resp_.lock(); } -uint32_t BaseCmd::GetCmdId() const { return cmdId_; } +uint32_t BaseCmd::GetCmdID() const { return cmd_id_; } // BaseCmdGroup BaseCmdGroup::BaseCmdGroup(const std::string& name, uint32_t flag) : BaseCmdGroup(name, -2, flag) {} diff --git a/src/base_cmd.h b/src/base_cmd.h index dd67c37f8..f29dc1fe8 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -285,7 +285,7 @@ class BaseCmd : public std::enable_shared_from_this { // void SetResp(const std::shared_ptr& resp); // std::shared_ptr GetResp(); - uint32_t GetCmdId() const; + uint32_t GetCmdID() const; bool isExclusive() { return static_cast(flag_ & kCmdFlagsExclusive); } @@ -303,8 +303,8 @@ class BaseCmd : public std::enable_shared_from_this { // std::weak_ptr resp_; // uint64_t doDuration_ = 0; - uint32_t cmdId_ = 0; - uint32_t aclCategory_ = 0; + uint32_t cmd_id_ = 0; + uint32_t acl_category_ = 0; private: // The function to be executed first before executing `DoCmd` diff --git a/src/client.cc b/src/client.cc index 4296a7b96..e85f7f058 100644 --- a/src/client.cc +++ b/src/client.cc @@ -289,7 +289,7 @@ int PClient::handlePacket(const char* start, int bytes) { return static_cast(ptr - start); } - DEFER { reset(); }; + // DEFER { reset(); }; // handle packet // const auto& params = parser_.GetParams(); @@ -329,10 +329,12 @@ int PClient::handlePacket(const char* start, int bytes) { // const PCommandInfo* info = PCommandTable::GetCommandInfo(cmdName_); // if (!info) { // 如果这个命令不存在,那么就走新的命令处理流程 - executeCommand(); + // executeCommand(); // return static_cast(ptr - start); // } + g_pikiwidb->SubmitFast(std::make_shared(shared_from_this())); + // check transaction // if (IsFlagOn(ClientFlag_multi)) { // if (cmdName_ != kCmdNameMulti && cmdName_ != kCmdNameExec && cmdName_ != kCmdNameWatch && @@ -376,24 +378,24 @@ int PClient::handlePacket(const char* start, int bytes) { // 为了兼容老的命令处理流程,新的命令处理流程在这里 // 后面可以把client这个类重构,完整的支持新的命令处理流程 void PClient::executeCommand() { - auto [cmdPtr, ret] = g_pikiwidb->GetCmdTableManager().GetCommand(CmdName(), this); - - if (!cmdPtr) { - if (ret == CmdRes::kInvalidParameter) { - SetRes(CmdRes::kInvalidParameter); - } else { - SetRes(CmdRes::kSyntaxErr, "unknown command '" + CmdName() + "'"); - } - return; - } + // auto [cmdPtr, ret] = g_pikiwidb->GetCmdTableManager().GetCommand(CmdName(), this); - if (!cmdPtr->CheckArg(params_.size())) { - SetRes(CmdRes::kWrongNum, CmdName()); - return; - } - - // execute a specific command - cmdPtr->Execute(this); + // if (!cmdPtr) { + // if (ret == CmdRes::kInvalidParameter) { + // SetRes(CmdRes::kInvalidParameter); + // } else { + // SetRes(CmdRes::kSyntaxErr, "unknown command '" + CmdName() + "'"); + // } + // return; + // } + // + // if (!cmdPtr->CheckArg(params_.size())) { + // SetRes(CmdRes::kWrongNum, CmdName()); + // return; + // } + // + // // execute a specific command + // cmdPtr->Execute(this); } PClient* PClient::Current() { return s_current; } @@ -420,13 +422,14 @@ int PClient::HandlePackets(pikiwidb::TcpConnection* obj, const char* start, int total += processed; } - obj->SendPacket(Message()); - Clear(); + // obj->SendPacket(Message()); + // Clear(); // reply_.Clear(); return total; } void PClient::OnConnect() { + SetState(ClientState::kOK); if (isPeerMaster()) { PREPL.SetMasterState(kPReplStateConnected); PREPL.SetMaster(std::static_pointer_cast(shared_from_this())); @@ -446,7 +449,7 @@ void PClient::OnConnect() { const std::string& PClient::PeerIP() const { if (auto c = getTcpConnection(); c) { - return c->GetPeerIp(); + return c->GetPeerIP(); } static const std::string kEmpty; @@ -492,7 +495,17 @@ bool PClient::SendPacket(const evbuffer_iovec* iovecs, size_t nvecs) { return false; } +void PClient::WriteReply2Client() { + if (auto c = getTcpConnection(); c) { + c->SendPacket(Message()); + } + Clear(); + reset(); +} + void PClient::Close() { + SetState(ClientState::kClosed); + reset(); if (auto c = getTcpConnection(); c) { c->ActiveClose(); tcp_connection_.reset(); diff --git a/src/client.h b/src/client.h index 8d5025934..7e0940eaa 100644 --- a/src/client.h +++ b/src/client.h @@ -101,6 +101,11 @@ enum ClientFlag { kClientFlagMaster = (1 << 3), }; +enum class ClientState { + kOK, + kClosed, +}; + class DB; struct PSlaveInfo; @@ -121,6 +126,8 @@ class PClient : public std::enable_shared_from_this, public CmdRes { bool SendPacket(UnboundedBuffer& data); bool SendPacket(const evbuffer_iovec* iovecs, size_t nvecs); + void WriteReply2Client(); + void Close(); // dbno @@ -196,6 +203,12 @@ class PClient : public std::enable_shared_from_this, public CmdRes { bool GetAuth() const { return auth_; } void RewriteCmd(std::vector& params) { parser_.SetParams(params); } + inline size_t ParamsSize() const { return params_.size(); } + + inline ClientState State() const { return state_; } + + inline void SetState(ClientState state) { state_ = state; } + // All parameters of this command (including the command itself) // e.g:["set","key","value"] std::span argv_; @@ -245,6 +258,8 @@ class PClient : public std::enable_shared_from_this, public CmdRes { bool auth_ = false; time_t last_auth_ = 0; + ClientState state_; + static thread_local PClient* s_current; }; } // namespace pikiwidb diff --git a/src/cmd_thread_pool.cc b/src/cmd_thread_pool.cc new file mode 100644 index 000000000..03b44b7d4 --- /dev/null +++ b/src/cmd_thread_pool.cc @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "cmd_thread_pool.h" +#include "cmd_thread_pool_worker.h" +#include "log.h" + +namespace pikiwidb { + +void CmdThreadPoolTask::Run(BaseCmd *cmd) { cmd->Execute(client_.get()); } +const std::string &CmdThreadPoolTask::CmdName() { return client_->CmdName(); } +std::shared_ptr CmdThreadPoolTask::Client() { return client_; } + +CmdThreadPool::CmdThreadPool(std::string name) : name_(std::move(name)) {} + +pstd::Status CmdThreadPool::Init(int fast_thread, int slow_thread, std::string name) { + if (fast_thread <= 0) { + return pstd::Status::InvalidArgument("thread num must be positive"); + } + name_ = std::move(name); + fast_thread_num_ = fast_thread; + slow_thread_num_ = slow_thread; + threads_.reserve(fast_thread_num_ + slow_thread_num_); + workers_.reserve(fast_thread_num_ + slow_thread_num_); + return pstd::Status::OK(); +} + +void CmdThreadPool::Start() { + for (int i = 0; i < fast_thread_num_; ++i) { + auto fastWorker = std::make_shared(this, 2, "fast worker" + std::to_string(i)); + std::thread thread(&CmdWorkThreadPoolWorker::Work, fastWorker); + threads_.emplace_back(std::move(thread)); + workers_.emplace_back(fastWorker); + INFO("fast worker [{}] starting ...", i); + } + for (int i = 0; i < slow_thread_num_; ++i) { + auto slowWorker = std::make_shared(this, 2, "slow worker" + std::to_string(i)); + std::thread thread(&CmdWorkThreadPoolWorker::Work, slowWorker); + threads_.emplace_back(std::move(thread)); + workers_.emplace_back(slowWorker); + INFO("slow worker [{}] starting ...", i); + } +} + +void CmdThreadPool::SubmitFast(const std::shared_ptr &runner) { + std::unique_lock rl(fast_mutex_); + fast_tasks_.emplace_back(runner); + fast_condition_.notify_one(); +} + +void CmdThreadPool::SubmitSlow(const std::shared_ptr &runner) { + std::unique_lock rl(slow_mutex_); + slow_tasks_.emplace_back(runner); + slow_condition_.notify_one(); +} + +void CmdThreadPool::Stop() { DoStop(); } + +void CmdThreadPool::DoStop() { + if (stopped_.load()) { + return; + } + stopped_.store(true); + + for (auto &worker : workers_) { + worker->Stop(); + } + + { + std::unique_lock fl(fast_mutex_); + fast_condition_.notify_all(); + } + { + std::unique_lock sl(slow_mutex_); + slow_condition_.notify_all(); + } + + for (auto &thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } + threads_.clear(); + workers_.clear(); + fast_tasks_.clear(); + slow_tasks_.clear(); +} + +CmdThreadPool::~CmdThreadPool() { DoStop(); } + +} // namespace pikiwidb diff --git a/src/cmd_thread_pool.h b/src/cmd_thread_pool.h new file mode 100644 index 000000000..3b65d6c87 --- /dev/null +++ b/src/cmd_thread_pool.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include "base_cmd.h" +#include "pstd_status.h" + +namespace pikiwidb { + +// task interface +// inherit this class and implement the Run method +// then submit the task to the thread pool +class CmdThreadPoolTask { + public: + CmdThreadPoolTask(std::shared_ptr client) : client_(std::move(client)) {} + void Run(BaseCmd *cmd); + const std::string &CmdName(); + std::shared_ptr Client(); + + private: + std::shared_ptr client_; +}; + +class CmdWorkThreadPoolWorker; + +class CmdFastWorker; + +class CmdSlowWorker; + +class CmdThreadPool { + friend CmdWorkThreadPoolWorker; + friend CmdFastWorker; + friend CmdSlowWorker; + + public: + explicit CmdThreadPool() = default; + + explicit CmdThreadPool(std::string name); + + pstd::Status Init(int fast_thread, int slow_thread, std::string name); + + // start the thread pool + void Start(); + + // stop the thread pool + void Stop(); + + // submit a fast task to the thread pool + void SubmitFast(const std::shared_ptr &runner); + + // submit a slow task to the thread pool + void SubmitSlow(const std::shared_ptr &runner); + + // get the fast thread num + inline int FastThreadNum() const { return fast_thread_num_; }; + + // get the slow thread num + inline int SlowThreadNum() const { return slow_thread_num_; }; + + // get the thread pool size + inline int ThreadPollSize() const { return fast_thread_num_ + slow_thread_num_; }; + + ~CmdThreadPool(); + + private: + void DoStop(); + + private: + std::deque> fast_tasks_; // fast task queue + std::deque> slow_tasks_; // slow task queue + + std::vector threads_; + std::vector> workers_; + std::string name_; // thread pool name + int fast_thread_num_ = 0; + int slow_thread_num_ = 0; + std::mutex fast_mutex_; + std::condition_variable fast_condition_; + std::mutex slow_mutex_; + std::condition_variable slow_condition_; + std::atomic_bool stopped_ = false; +}; + +} // namespace pikiwidb diff --git a/src/cmd_thread_pool_worker.cc b/src/cmd_thread_pool_worker.cc new file mode 100644 index 000000000..cafa31a71 --- /dev/null +++ b/src/cmd_thread_pool_worker.cc @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "cmd_thread_pool_worker.h" +#include "log.h" +#include "pikiwidb.h" + +namespace pikiwidb { + +void CmdWorkThreadPoolWorker::Work() { + while (running_) { + LoadWork(); + for (const auto &task : self_task_) { + if (task->Client()->State() != ClientState::kOK) { // the client is closed + continue; + } + auto [cmdPtr, ret] = cmd_table_manager_.GetCommand(task->CmdName(), task->Client().get()); + + if (!cmdPtr) { + if (ret == CmdRes::kInvalidParameter) { + task->Client()->SetRes(CmdRes::kInvalidParameter); + } else { + task->Client()->SetRes(CmdRes::kSyntaxErr, "unknown command '" + task->CmdName() + "'"); + } + g_pikiwidb->PushWriteTask(task->Client()); + continue; + } + + if (!cmdPtr->CheckArg(task->Client()->ParamsSize())) { + task->Client()->SetRes(CmdRes::kWrongNum, task->CmdName()); + g_pikiwidb->PushWriteTask(task->Client()); + continue; + } + task->Run(cmdPtr); + g_pikiwidb->PushWriteTask(task->Client()); + } + self_task_.clear(); + } + INFO("worker [{}] goodbye...", name_); +} + +void CmdWorkThreadPoolWorker::Stop() { running_ = false; } + +void CmdFastWorker::LoadWork() { + std::unique_lock lock(pool_->fast_mutex_); + while (pool_->fast_tasks_.empty()) { + if (!running_) { + return; + } + pool_->fast_condition_.wait(lock); + } + + if (pool_->fast_tasks_.empty()) { + return; + } + const auto num = std::min(static_cast(pool_->fast_tasks_.size()), once_task_); + std::move(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num, std::back_inserter(self_task_)); + pool_->fast_tasks_.erase(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num); +} + +void CmdSlowWorker::LoadWork() { + { + std::unique_lock lock(pool_->slow_mutex_); + while (pool_->slow_tasks_.empty() && loop_more_) { // loopMore is used to get the fast worker + if (!running_) { + return; + } + pool_->slow_condition_.wait_for(lock, std::chrono::milliseconds(wait_time_)); + loop_more_ = false; + } + + const auto num = std::min(static_cast(pool_->slow_tasks_.size()), once_task_); + if (num > 0) { + std::move(pool_->slow_tasks_.begin(), pool_->slow_tasks_.begin() + num, std::back_inserter(self_task_)); + pool_->slow_tasks_.erase(pool_->slow_tasks_.begin(), pool_->slow_tasks_.begin() + num); + return; // If the slow task is obtained, the fast task is no longer obtained + } + } + + { + std::unique_lock lock(pool_->fast_mutex_); + loop_more_ = true; + + const auto num = std::min(static_cast(pool_->fast_tasks_.size()), once_task_); + if (num > 0) { + std::move(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num, std::back_inserter(self_task_)); + pool_->fast_tasks_.erase(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num); + } + } +} + +} // namespace pikiwidb diff --git a/src/cmd_thread_pool_worker.h b/src/cmd_thread_pool_worker.h new file mode 100644 index 000000000..ecc9361da --- /dev/null +++ b/src/cmd_thread_pool_worker.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include +#include + +#include "cmd_table_manager.h" +#include "cmd_thread_pool.h" + +namespace pikiwidb { + +class CmdWorkThreadPoolWorker { + public: + explicit CmdWorkThreadPoolWorker(CmdThreadPool *pool, int onceTask, std::string name) + : pool_(pool), once_task_(onceTask), name_(std::move(name)) { + cmd_table_manager_.InitCmdTable(); + } + + void Work(); + + void Stop(); + + // load the task from the thread pool + virtual void LoadWork() = 0; + + virtual ~CmdWorkThreadPoolWorker() = default; + + protected: + std::vector> self_task_; // the task that the worker get from the thread pool + CmdThreadPool *pool_ = nullptr; + const int once_task_ = 0; // the max task num that the worker can get from the thread pool + const std::string name_; + bool running_ = true; + + pikiwidb::CmdTableManager cmd_table_manager_; +}; + +// fast worker +class CmdFastWorker : public CmdWorkThreadPoolWorker { + public: + explicit CmdFastWorker(CmdThreadPool *pool, int onceTask, std::string name) + : CmdWorkThreadPoolWorker(pool, onceTask, std::move(name)) {} + + void LoadWork() override; +}; + +// slow worker +class CmdSlowWorker : public CmdWorkThreadPoolWorker { + public: + explicit CmdSlowWorker(CmdThreadPool *pool, int onceTask, std::string name) + : CmdWorkThreadPoolWorker(pool, onceTask, std::move(name)) {} + + // when the slow worker queue is empty, it will try to get the fast worker + void LoadWork() override; + + private: + bool loop_more_ = false; // When the slow queue is empty, try to get the fast queue + int wait_time_ = 200; // When the slow queue is empty, wait 200 ms to check again +}; + +} // namespace pikiwidb diff --git a/src/config.cc b/src/config.cc index dbbe20979..6a656817e 100644 --- a/src/config.cc +++ b/src/config.cc @@ -172,6 +172,8 @@ bool LoadPikiwiDBConfig(const char* cfgFile, PConfig& cfg) { // slave threads cfg.slave_threads_num = parser.GetData("slave-threads", 1); + cfg.fast_cmd_threads_num = parser.GetData("fast-threads", 1); + cfg.slow_cmd_threads_num = parser.GetData("slow-threads", 1); // backend cfg.backend = parser.GetData("backend", kBackEndNone); @@ -202,6 +204,8 @@ bool PConfig::CheckArgs() const { RETURN_IF_FAIL(maxmemory >= 512 * 1024 * 1024UL); RETURN_IF_FAIL(maxmemorySamples > 0 && maxmemorySamples < 10); RETURN_IF_FAIL(worker_threads_num > 0 && worker_threads_num < 129); // as redis + RETURN_IF_FAIL(fast_cmd_threads_num > 0 && fast_cmd_threads_num < 16); + RETURN_IF_FAIL(slow_cmd_threads_num > 0 && slow_cmd_threads_num < 16); RETURN_IF_FAIL(backend >= kBackEndNone && backend < kBackEndMax); RETURN_IF_FAIL(backendHz >= 1 && backendHz <= 50); RETURN_IF_FAIL(db_instance_num >= 1); diff --git a/src/config.h b/src/config.h index 14aecf70e..718c47423 100644 --- a/src/config.h +++ b/src/config.h @@ -78,6 +78,9 @@ struct PConfig { // THREADED SLAVE int slave_threads_num; + int fast_cmd_threads_num; + int slow_cmd_threads_num; + int backend; // enum BackEndType PString backendPath; int backendHz; // the frequency of dump to backend diff --git a/src/io_thread_pool.cc b/src/io_thread_pool.cc index a5332e571..0aa0a5015 100644 --- a/src/io_thread_pool.cc +++ b/src/io_thread_pool.cc @@ -60,7 +60,9 @@ void IOThreadPool::Run(int ac, char* av[]) { base_.Run(); for (auto& w : worker_threads_) { - w.join(); + if (w.joinable()) { + w.join(); + } } worker_threads_.clear(); @@ -168,4 +170,72 @@ void IOThreadPool::Reset() { BaseLoop()->Reset(); } +void WorkIOThreadPool::PushWriteTask(std::shared_ptr client) { + auto pos = ++counter_ % worker_num_; + std::unique_lock lock(*writeMutex_[pos]); + + writeQueue_[pos].emplace_back(client); + writeCond_[pos]->notify_one(); +} + +void WorkIOThreadPool::StartWorkers() { + // only called by main thread + assert(state_ == State::kNone); + + IOThreadPool::StartWorkers(); + + writeMutex_.reserve(worker_num_); + writeCond_.reserve(worker_num_); + writeQueue_.reserve(worker_num_); + for (size_t index = 0; index < worker_num_; ++index) { + writeMutex_.emplace_back(std::make_unique()); + writeCond_.emplace_back(std::make_unique()); + writeQueue_.emplace_back(); + + std::thread t([this, index]() { + while (writeRunning_) { + std::unique_lock lock(*writeMutex_[index]); + while (writeQueue_[index].empty()) { + if (!writeRunning_) { + break; + } + writeCond_[index]->wait(lock); + } + if (!writeRunning_) { + break; + } + auto client = writeQueue_[index].front(); + if (client->State() == ClientState::kOK) { + client->WriteReply2Client(); + } + writeQueue_[index].pop_front(); + } + INFO("worker write thread {}, goodbye...", index); + }); + + INFO("worker write thread {}, starting...", index); + writeThreads_.push_back(std::move(t)); + } +} + +void WorkIOThreadPool::Exit() { + IOThreadPool::Exit(); + + writeRunning_ = false; + int i = 0; + for (auto& cond : writeCond_) { + std::unique_lock lock(*writeMutex_[i++]); + cond->notify_all(); + } + for (auto& wt : writeThreads_) { + if (wt.joinable()) { + wt.join(); + } + } + writeThreads_.clear(); + writeCond_.clear(); + writeQueue_.clear(); + writeMutex_.clear(); +} + } // namespace pikiwidb diff --git a/src/io_thread_pool.h b/src/io_thread_pool.h index 1ef6538fd..6f461681e 100644 --- a/src/io_thread_pool.h +++ b/src/io_thread_pool.h @@ -8,10 +8,13 @@ #pragma once #include +#include #include #include #include +#include "client.h" +#include "cmd_thread_pool.h" #include "net/event_loop.h" #include "net/http_client.h" #include "net/http_server.h" @@ -27,7 +30,7 @@ class IOThreadPool { bool Init(const char* ip, int port, const NewTcpConnectionCallback& ccb); void Run(int argc, char* argv[]); - void Exit(); + virtual void Exit(); bool IsExit() const; EventLoop* BaseLoop(); @@ -54,11 +57,13 @@ class IOThreadPool { // HTTP client std::shared_ptr ConnectHTTP(const char* ip, int port, EventLoop* loop = nullptr); + virtual void PushWriteTask(std::shared_ptr){}; + // for unittest only void Reset(); - private: - void StartWorkers(); + protected: + virtual void StartWorkers(); static const size_t kMaxWorkers; @@ -82,4 +87,25 @@ class IOThreadPool { std::atomic state_{State::kNone}; }; +class WorkIOThreadPool : public IOThreadPool { + public: + WorkIOThreadPool() = default; + ~WorkIOThreadPool() = default; + + void Exit() override; + void PushWriteTask(std::shared_ptr client) override; + + private: + void StartWorkers() override; + + private: + std::vector writeThreads_; + std::vector> writeMutex_; + std::vector> writeCond_; + std::vector>> writeQueue_; + std::atomic counter_ = 0; + bool writeRunning_ = true; +}; + } // namespace pikiwidb + diff --git a/src/net/http_client.cc b/src/net/http_client.cc index 916669f41..db5c337f8 100644 --- a/src/net/http_client.cc +++ b/src/net/http_client.cc @@ -10,7 +10,7 @@ HttpClient::HttpClient() : parser_(HTTP_RESPONSE) {} void HttpClient::OnConnect(TcpConnection* conn) { assert(loop_ == conn->GetEventLoop()); - INFO("HttpClient::OnConnect to {}:{} in loop {}", conn->GetPeerIp(), conn->GetPeerPort(), loop_->GetName()); + INFO("HttpClient::OnConnect to {}:{} in loop {}", conn->GetPeerIP(), conn->GetPeerPort(), loop_->GetName()); never_connected_ = false; conn_ = std::static_pointer_cast(conn->shared_from_this()); diff --git a/src/net/tcp_connection.h b/src/net/tcp_connection.h index 7d94ae92a..2a6c7af3d 100644 --- a/src/net/tcp_connection.h +++ b/src/net/tcp_connection.h @@ -56,7 +56,7 @@ class TcpConnection : public EventObject { void ResetEventLoop(EventLoop* new_loop); EventLoop* SelectSlaveEventLoop(); EventLoop* GetEventLoop() const { return loop_; } - const std::string& GetPeerIp() const { return peer_ip_; } + const std::string& GetPeerIP() const { return peer_ip_; } int GetPeerPort() const { return peer_port_; } const sockaddr_in& PeerAddr() const { return peer_addr_; } diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index e0c9f5ba7..42d4473cb 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -63,14 +63,16 @@ bool PikiwiDB::ParseArgs(int ac, char* av[]) { cfg_file_ = av[i]; continue; } else if (strncasecmp(av[i], "-v", 2) == 0 || strncasecmp(av[i], "--version", 9) == 0) { - std::cerr << "PikiwiDB Server v=" << kPIKIWIDB_VERSION << " bits=" << (sizeof(void*) == 8 ? 64 : 32) << std::endl; + std::cerr << "PikiwiDB Server version: " << KPIKIWIDB_VERSION << " bits=" << (sizeof(void*) == 8 ? 64 : 32) + << std::endl; + std::cerr << "PikiwiDB Server Build Type: " << KPIKIWIDB_BUILD_TYPE << std::endl; + std::cerr << "PikiwiDB Server Build Date: " << KPIKIWIDB_BUILD_DATE << std::endl; + std::cerr << "PikiwiDB Server Build GIT SHA: " << KPIKIWIDB_GIT_COMMIT_ID << std::endl; exit(0); - return true; } else if (strncasecmp(av[i], "-h", 2) == 0 || strncasecmp(av[i], "--help", 6) == 0) { Usage(); exit(0); - return true; } else if (strncasecmp(av[i], "--port", 6) == 0) { if (++i == ac) { return false; @@ -164,7 +166,7 @@ static void LoadDBFromDisk() { } void PikiwiDB::OnNewConnection(pikiwidb::TcpConnection* obj) { - INFO("New connection from {}:{}", obj->GetPeerIp(), obj->GetPeerPort()); + INFO("New connection from {}:{}", obj->GetPeerIP(), obj->GetPeerPort()); auto client = std::make_shared(obj); obj->SetContext(client); @@ -174,7 +176,10 @@ void PikiwiDB::OnNewConnection(pikiwidb::TcpConnection* obj) { auto msg_cb = std::bind(&pikiwidb::PClient::HandlePackets, client.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); obj->SetMessageCallback(msg_cb); - obj->SetOnDisconnect([](pikiwidb::TcpConnection* obj) { INFO("disconnect from {}", obj->GetPeerIp()); }); + obj->SetOnDisconnect([](pikiwidb::TcpConnection* obj) { + INFO("disconnect from {}", obj->GetPeerIP()); + obj->GetContext()->SetState(pikiwidb::ClientState::kClosed); + }); obj->SetNodelay(true); obj->SetEventLoopSelector([this]() { return worker_threads_.ChooseNextWorkerEventLoop(); }); obj->SetSlaveEventLoopSelector([this]() { return slave_threads_.ChooseNextWorkerEventLoop(); }); @@ -214,6 +219,13 @@ bool PikiwiDB::Init() { worker_threads_.SetWorkerNum(static_cast(g_config.worker_threads_num)); slave_threads_.SetWorkerNum(static_cast(g_config.slave_threads_num)); + // now we only use fast cmd thread pool + auto status = cmd_threads_.Init(g_config.fast_cmd_threads_num, 0, "pikiwidb-cmd"); + if (!status.ok()) { + ERROR("init cmd thread pool failed: {}", status.ToString()); + return false; + } + PSTORE.Init(g_config.databases); // Only if there is no backend, load rdb @@ -234,7 +246,7 @@ bool PikiwiDB::Init() { PREPL.SetMasterAddr(g_config.masterIp.c_str(), g_config.masterPort); } - cmd_table_manager_.InitCmdTable(); + // cmd_table_manager_.InitCmdTable(); return true; } @@ -243,6 +255,8 @@ void PikiwiDB::Run() { worker_threads_.SetName("pikiwi-main"); slave_threads_.SetName("pikiwi-slave"); + cmd_threads_.Start(); + std::thread t([this]() { auto slave_loop = slave_threads_.BaseLoop(); slave_loop->Init(); @@ -251,16 +265,19 @@ void PikiwiDB::Run() { worker_threads_.Run(0, nullptr); - t.join(); // wait for slave thread exit + if (t.joinable()) { + t.join(); // wait for slave thread exit + } INFO("server exit running"); } void PikiwiDB::Stop() { slave_threads_.Exit(); worker_threads_.Exit(); + cmd_threads_.Stop(); } -pikiwidb::CmdTableManager& PikiwiDB::GetCmdTableManager() { return cmd_table_manager_; } +// pikiwidb::CmdTableManager& PikiwiDB::GetCmdTableManager() { return cmd_table_manager_; } static void InitLogs() { logger::Init("logs/pikiwidb_server.log"); @@ -308,7 +325,7 @@ int main(int ac, char* av[]) { // output logo to console char logo[512] = ""; - snprintf(logo, sizeof logo - 1, pikiwidbLogo, kPIKIWIDB_VERSION, static_cast(sizeof(void*)) * 8, + snprintf(logo, sizeof logo - 1, pikiwidbLogo, KPIKIWIDB_VERSION, static_cast(sizeof(void*)) * 8, static_cast(pikiwidb::g_config.port)); std::cout << logo; diff --git a/src/pikiwidb.h b/src/pikiwidb.h index 4dfdb685b..5201fb875 100644 --- a/src/pikiwidb.h +++ b/src/pikiwidb.h @@ -6,12 +6,19 @@ */ #include "cmd_table_manager.h" +#include "cmd_thread_pool.h" #include "common.h" #include "event_loop.h" #include "io_thread_pool.h" #include "tcp_connection.h" -#define kPIKIWIDB_VERSION "4.0.0" +#define KPIKIWIDB_VERSION "4.0.0" + +#ifdef BUILD_DEBUG +# define KPIKIWIDB_BUILD_TYPE "DEBUG" +#else +# define KPIKIWIDB_BUILD_TYPE "RELEASE" +#endif class PikiwiDB final { public: @@ -28,7 +35,12 @@ class PikiwiDB final { void OnNewConnection(pikiwidb::TcpConnection* obj); - pikiwidb::CmdTableManager& GetCmdTableManager(); + // pikiwidb::CmdTableManager& GetCmdTableManager(); + uint32_t GetCmdID() { return ++cmd_id_; }; + + void SubmitFast(const std::shared_ptr& runner) { cmd_threads_.SubmitFast(runner); } + + void PushWriteTask(const std::shared_ptr& client) { worker_threads_.PushWriteTask(client); } public: PString cfg_file_; @@ -41,9 +53,12 @@ class PikiwiDB final { static const uint32_t kRunidSize; private: - pikiwidb::IOThreadPool worker_threads_; + pikiwidb::WorkIOThreadPool worker_threads_; pikiwidb::IOThreadPool slave_threads_; - pikiwidb::CmdTableManager cmd_table_manager_; + pikiwidb::CmdThreadPool cmd_threads_; + // pikiwidb::CmdTableManager cmd_table_manager_; + + uint32_t cmd_id_ = 0; }; extern std::unique_ptr g_pikiwidb;