Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:Support network read/write separation and command thread pool #164

Merged
merged 11 commits into from
Apr 2, 2024

Conversation

lqxhub
Copy link
Collaborator

@lqxhub lqxhub commented Feb 3, 2024

fix issue #100

加了网络的读写分离,和命令在线程池中执行

这个pr只有 线程池和读写分离功能

后续 还有两个功能, 待开发

  • 对象复用池 类似 go中的 sync.pool
  • 移除 libevent 依赖, 使用 最早的 网络库

cmd_thread_pool 是命令的线程池, 目前支持了快慢命令分离, 但是因现在命令中没有加对应的flag,就暂时全都按照快命令执行了

这是目前我想到的问题

  • 写线程
    在写命令线程中, 现在写命令的线程数量和读线程是一致的, 1:1的关系. 但是没有做到读写线程的绑定,
    比如: 一个命令A 是 get a, 这个命令的读线程是线程 R1, 这个线程是 client在连接网络时就确定了. 当这个命令在线程池中执行完成后,会发到写线程中, 这时候, 会轮训 所有写线程, 找到一个写线程, 然后执行写入网络的操作. 可能这次放到 w2 中, 下次会放到 W3 中, 这段代码体现在 WorkIOThreadPool::PushWriteTask 这个函数中

  • 命令线程池

现在的命令线程池中, 所有线程公用了一把锁, 可能会比较重, 如果这部分是瓶颈, 可以和 IO的写线程中那样, 每个写线程 一个独立的队列,然后通过轮训或者别的策略, 向线程池中每个线程的队列加任务,

现在的命令线程池中, 如果慢命令队列是空闲的, 会尝试从 读线程池中偷取 快命令来执行

这段逻辑在

void CmdSlowWorker::LoadWork() {
  {
    std::unique_lock lock(pool_->slowMutex_);
    while (pool_->slowTasks_.empty() && loopMore) {  // loopMore is used to get the fast worker, 定时唤醒去快命令队列中找任务
      pool_->slowCondition_.wait_for(lock, std::chrono::milliseconds(waitTime));
      loopMore = false;
    }

    const auto num = std::min(static_cast<int>(pool_->slowTasks_.size()), onceTask_);
    std::move(pool_->slowTasks_.begin(), pool_->slowTasks_.begin() + num, std::back_inserter(selfTask));
    pool_->slowTasks_.erase(pool_->slowTasks_.begin(), pool_->slowTasks_.begin() + num);
  }

  • 命令对象

现在还没做对象缓存池, 现在用的做法是 在命令线程池中, 每个线程单独维护一个 cmdTable, 来做命令的隔离

后面设想的是,在读线程中, 解析出 本次命令的参数, 根据第一个参数区分这个命令是 / 命令, 并把所有参数 包装成一个 task 放到对应的线程池中. 在对应的线程池中, 获取这个执行这个命令的对象, 然后执行对应的命令

这样做的考虑是, 在 读线程中就确定命令,并且获取命令的对象, 这样就能做到在向 命令线程池中添加任务时就知道, 当前命令时快命令还是慢命令, 这样就能比较好的做到快慢分离了

每个线程单独维护一个 cmdTable 这个逻辑在

class CmdWorkThreadPoolWorker {
 public:
  explicit CmdWorkThreadPoolWorker(CmdThreadPool *pool, int onceTask, std::string name)
      : pool_(pool), onceTask_(onceTask), name_(std::move(name)) {
    cmd_table_manager_.InitCmdTable();
  }

@github-actions github-actions bot added the ✏️ Feature New feature or request label Feb 3, 2024
@lqxhub lqxhub marked this pull request as ready for review February 26, 2024 01:41
CmdThreadPool::CmdThreadPool(std::string name) : name_(std::move(name)) {}

pstd::Status CmdThreadPool::Init(int fastThread, int slowThread, std::string name) {
if (fastThread < 0 || slowThread < 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为0不可以吧?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

当时想的简单处理,判断了两个, 如果要放松检查的话, 只有 fastThread > 0就行了, slowThread 可以为0

src/base_cmd.cc Outdated Show resolved Hide resolved
CmdThreadPool::CmdThreadPool(std::string name) : name_(std::move(name)) {}

pstd::Status CmdThreadPool::Init(int fastThread, int slowThread, std::string name) {
if (fastThread < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 name 不用检测吗?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个我看了一下, 不检查问题也不大

src/cmd_thread_pool.cc Outdated Show resolved Hide resolved
src/cmd_thread_pool.cc Outdated Show resolved Hide resolved
src/cmd_thread_pool.h Outdated Show resolved Hide resolved
src/io_thread_pool.cc Outdated Show resolved Hide resolved
src/pikiwidb.cc Outdated Show resolved Hide resolved
src/cmd_thread_pool.cc Outdated Show resolved Hide resolved
CmdThreadPool::CmdThreadPool(std::string name) : name_(std::move(name)) {}

pstd::Status CmdThreadPool::Init(int fastThread, int slowThread, std::string name) {
if (fastThread < 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe :fastThread <= 0 是个更好的判断

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fastThread不符合项目中下划线的命名法

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe :fastThread <= 0 是个更好的判断

是这样

fastThread不符合项目中下划线的命名法

fastThread_? 但是这个不是成员变量

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

又看了一下,在目前pikiwidb里面,部分代码是驼峰命名,部分又是下划线命名,感觉有点混乱。
驼峰命名:
BaseCmd::BaseCmd(std::string name, int16_t arity, uint32_t flag, uint32_t aclCategory) {
下划线命名:
} else if (pstd::String2d(end_score.data(), end_score.size(), max_score) == 0) {

Copy link
Collaborator

@578223592 578223592 Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fastThread?fast_thread?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

src/cmd_thread_pool.cc Outdated Show resolved Hide resolved
src/cmd_thread_pool_worker.cc Outdated Show resolved Hide resolved
namespace pikiwidb {

void CmdWorkThreadPoolWorker::Work() {
while (running_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有没有可能出现这么一种情况,在worker拿到任务之后,调用了

void CmdWorkThreadPoolWorker::Stop() { running_ = false; }

,然后拿到的任务就无法再执行了。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

也许do---while更好?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

running_是不是改成stop_好一些?通用一点

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有没有可能出现这么一种情况,在worker拿到任务之后,调用了

void CmdWorkThreadPoolWorker::Stop() { running_ = false; }

,然后拿到的任务就无法再执行了。

能stop的情况一般是停止服务了,所以我感觉这里任务有没有处理已经没什么区别了

src/cmd_thread_pool_worker.cc Outdated Show resolved Hide resolved
}

const auto num = std::min(static_cast<int>(pool_->fastTasks_.size()), onceTask_);
if (num > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉判断有点多余,到这里的是不是一定能保证pool_->fastTasks_不为空,只要保证onceTask_>0,那么就不用判断num了

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看了后面发现统一都是这个写法。
后面需要对num进行判断。
因此为了统一这里加一个判断感觉也不错。

class CmdWorkThreadPoolWorker {
public:
explicit CmdWorkThreadPoolWorker(CmdThreadPool *pool, int onceTask, std::string name)
: pool_(pool), onceTask_(onceTask), name_(std::move(name)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

也许应该对onceTask_(onceTask)进行入参判断,要>0且在一定合理的范围内

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个感觉意义不大,这个参数不会动态获取(比如配置文件或者用户输入), 只会在代码里写死

src/cmd_thread_pool_worker.h Outdated Show resolved Hide resolved
const auto num = std::min(static_cast<int>(pool_->slowTasks_.size()), onceTask_);
if (num > 0) {
std::move(pool_->slowTasks_.begin(), pool_->slowTasks_.begin() + num, std::back_inserter(selfTask_));
pool_->slowTasks_.erase(pool_->slowTasks_.begin(), pool_->slowTasks_.begin() + num);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不同于快任务,可能会出现到这里但是 慢任务队列依然为空的情况,此时if (num > 0) {的判断是有意义的。

但是个人觉得也许在line 74 的位置添加一个判空返回更简单直接一些?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fastCmd 那里提前判断,return了, slowCmd感觉还是用num判断清晰一些

void CmdSlowWorker::LoadWork() {
{
std::unique_lock lock(pool_->slowMutex_);
while (pool_->slowTasks_.empty() && loopMore_) { // loopMore is used to get the fast worker
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.loopMore的注释是不是应该添加到loopMore_的声明处好一些。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

src/base_cmd.cc Outdated
@@ -16,7 +16,7 @@ BaseCmd::BaseCmd(std::string name, int16_t arity, uint32_t flag, uint32_t aclCat
arity_ = arity;
flag_ = flag;
aclCategory_ = aclCategory;
cmdId_ = g_pikiwidb->GetCmdTableManager().GetCmdId();
cmdID_ = g_pikiwidb->GetCmdId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetCmdId 这里不改下吗?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


CmdThreadPool::~CmdThreadPool() { DoStop(); }

} // namespace pikiwidb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在文件最后加一个空行

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private:
void DoStop();

std::deque<std::shared_ptr<CmdThreadPoolTask>> fastTasks_; // fast task queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改成 fast_tasks_ 这种形式吧,下面几个 var 同样处理下,辛苦

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}

} // namespace pikiwidb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

添加一个空行

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

virtual ~CmdWorkThreadPoolWorker() = default;

protected:
std::vector<std::shared_ptr<CmdThreadPoolTask>> selfTask_; // the task that the worker get from the thread pool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

变量名风格处理下

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


protected:
std::vector<std::shared_ptr<CmdThreadPoolTask>> selfTask_; // the task that the worker get from the thread pool
CmdThreadPool *pool_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

初始化为 nullptr

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

src/base_cmd.h Outdated
@@ -275,7 +275,7 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {
// std::weak_ptr<std::string> resp_;
// uint64_t doDuration_ = 0;

uint32_t cmdId_ = 0;
uint32_t cmdID_ = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint32_t cmd_id_ = 0;
uint32_t acl_category_ = 0;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

src/pikiwidb.h Outdated
pikiwidb::CmdThreadPool cmd_threads_;
// pikiwidb::CmdTableManager cmd_table_manager_;

uint32_t cmdId_ = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改为 cmd_id_

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator

@578223592 578223592 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.


CmdThreadPool::~CmdThreadPool() { DoStop(); }

} // namespace pikiwidb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

最后加一个空行

}
}

} // namespace pikiwidb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加一下空行吧

std::atomic_bool stopped_ = false;
};

} // namespace pikiwidb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加一下空行

int wait_time_ = 200; // When the slow queue is empty, wait 200 ms to check again
};

} // namespace pikiwidb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加一下空行

@AlexStocks AlexStocks merged commit 2ec79ed into OpenAtomFoundation:unstable Apr 2, 2024
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
✏️ Feature New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants