Skip to content

Commit

Permalink
format code,variable name
Browse files Browse the repository at this point in the history
  • Loading branch information
lqxhub committed Mar 31, 2024
1 parent 89424f6 commit 4dfbde7
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ BaseCmd::BaseCmd(std::string name, int16_t arity, uint32_t flag, uint32_t aclCat
arity_ = arity;
flag_ = flag;
aclCategory_ = aclCategory;
cmdID_ = g_pikiwidb->GetCmdId();
cmdID_ = g_pikiwidb->GetCmdID();
}

bool BaseCmd::CheckArg(size_t num) const {
Expand Down
42 changes: 21 additions & 21 deletions src/cmd_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ std::shared_ptr<PClient> CmdThreadPoolTask::Client() { return client_; }

CmdThreadPool::CmdThreadPool(std::string name) : name_(std::move(name)) {}

pstd::Status CmdThreadPool::Init(int fastThread, int slowThread, std::string name) {
if (fastThread <= 0) {
pstd::Status CmdThreadPool::Init(int fast_thread, int slow_thread, std::string name) {
if (fast_thread <= 0) {
return pstd::Status::InvalidArgument("thread num must be positive");
}
name_ = std::move(name);
fastThreadNum_ = fastThread;
slowThreadNum_ = slowThread;
threads_.reserve(fastThreadNum_ + slowThreadNum_);
workers_.reserve(fastThreadNum_ + slowThreadNum_);
fast_thread_num_ = fast_thread;
slow_thread_num_ = slow_thread;
threads_.reserve(fast_thread_num_ + slow_thread_num_);
workers_.reserve(fast_thread_num_ + slow_thread_num_);
return pstd::Status::OK();
}

void CmdThreadPool::Start() {
for (int i = 0; i < fastThreadNum_; ++i) {
for (int i = 0; i < fast_thread_num_; ++i) {
auto fastWorker = std::make_shared<CmdFastWorker>(this, 2, "fast worker" + std::to_string(i));
std::thread thread(&CmdWorkThreadPoolWorker::Work, fastWorker);
threads_.emplace_back(std::move(thread));
workers_.emplace_back(fastWorker);
INFO("fast worker [{}] starting ...", i);
}
for (int i = 0; i < slowThreadNum_; ++i) {
for (int i = 0; i < slow_thread_num_; ++i) {
auto slowWorker = std::make_shared<CmdSlowWorker>(this, 2, "slow worker" + std::to_string(i));
std::thread thread(&CmdWorkThreadPoolWorker::Work, slowWorker);
threads_.emplace_back(std::move(thread));
Expand All @@ -47,15 +47,15 @@ void CmdThreadPool::Start() {
}

void CmdThreadPool::SubmitFast(const std::shared_ptr<CmdThreadPoolTask> &runner) {
std::unique_lock rl(fastMutex_);
fastTasks_.emplace_back(runner);
fastCondition_.notify_one();
std::unique_lock rl(fast_mutex_);
fast_tasks_.emplace_back(runner);
fast_condition_.notify_one();
}

void CmdThreadPool::SubmitSlow(const std::shared_ptr<CmdThreadPoolTask> &runner) {
std::unique_lock rl(slowMutex_);
slowTasks_.emplace_back(runner);
slowCondition_.notify_one();
std::unique_lock rl(slow_mutex_);
slow_tasks_.emplace_back(runner);
slow_condition_.notify_one();
}

void CmdThreadPool::Stop() { DoStop(); }
Expand All @@ -71,12 +71,12 @@ void CmdThreadPool::DoStop() {
}

{
std::unique_lock fl(fastMutex_);
fastCondition_.notify_all();
std::unique_lock fl(fast_mutex_);
fast_condition_.notify_all();
}
{
std::unique_lock sl(slowMutex_);
slowCondition_.notify_all();
std::unique_lock sl(slow_mutex_);
slow_condition_.notify_all();
}

for (auto &thread : threads_) {
Expand All @@ -86,10 +86,10 @@ void CmdThreadPool::DoStop() {
}
threads_.clear();
workers_.clear();
fastTasks_.clear();
slowTasks_.clear();
fast_tasks_.clear();
slow_tasks_.clear();
}

CmdThreadPool::~CmdThreadPool() { DoStop(); }

} // namespace pikiwidb
} // namespace pikiwidb
27 changes: 14 additions & 13 deletions src/cmd_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CmdThreadPool {

explicit CmdThreadPool(std::string name);

pstd::Status Init(int fastThread, int slowThread, std::string name);
pstd::Status Init(int fast_thread, int slow_thread, std::string name);

// start the thread pool
void Start();
Expand All @@ -63,32 +63,33 @@ class CmdThreadPool {
void SubmitSlow(const std::shared_ptr<CmdThreadPoolTask> &runner);

// get the fast thread num
inline int FastThreadNum() const { return fastThreadNum_; };
inline int FastThreadNum() const { return fast_thread_num_; };

// get the slow thread num
inline int SlowThreadNum() const { return slowThreadNum_; };
inline int SlowThreadNum() const { return slow_thread_num_; };

// get the thread pool size
inline int ThreadPollSize() const { return fastThreadNum_ + slowThreadNum_; };
inline int ThreadPollSize() const { return fast_thread_num_ + slow_thread_num_; };

~CmdThreadPool();

private:
void DoStop();

std::deque<std::shared_ptr<CmdThreadPoolTask>> fastTasks_; // fast task queue
std::deque<std::shared_ptr<CmdThreadPoolTask>> slowTasks_; // slow task queue
private:
std::deque<std::shared_ptr<CmdThreadPoolTask>> fast_tasks_; // fast task queue
std::deque<std::shared_ptr<CmdThreadPoolTask>> slow_tasks_; // slow task queue

std::vector<std::thread> threads_;
std::vector<std::shared_ptr<CmdWorkThreadPoolWorker>> workers_;
std::string name_; // thread pool name
int fastThreadNum_ = 0;
int slowThreadNum_ = 0;
std::mutex fastMutex_;
std::condition_variable fastCondition_;
std::mutex slowMutex_;
std::condition_variable slowCondition_;
int fast_thread_num_ = 0;
int slow_thread_num_ = 0;
std::mutex fast_mutex_;
std::condition_variable fast_condition_;
std::mutex slow_mutex_;
std::condition_variable slow_condition_;
std::atomic_bool stopped_ = false;
};

} // namespace pikiwidb
} // namespace pikiwidb
44 changes: 22 additions & 22 deletions src/cmd_thread_pool_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace pikiwidb {
void CmdWorkThreadPoolWorker::Work() {
while (running_) {
LoadWork();
for (const auto &task : selfTask_) {
for (const auto &task : self_task_) {
if (task->Client()->State() != ClientState::kOK) { // the client is closed
continue;
}
Expand All @@ -38,59 +38,59 @@ void CmdWorkThreadPoolWorker::Work() {
task->Run(cmdPtr);
g_pikiwidb->PushWriteTask(task->Client());
}
selfTask_.clear();
self_task_.clear();
}
INFO("worker [{}] goodbye...", name_);
}

void CmdWorkThreadPoolWorker::Stop() { running_ = false; }

void CmdFastWorker::LoadWork() {
std::unique_lock lock(pool_->fastMutex_);
while (pool_->fastTasks_.empty()) {
std::unique_lock lock(pool_->fast_mutex_);
while (pool_->fast_tasks_.empty()) {
if (!running_) {
return;
}
pool_->fastCondition_.wait(lock);
pool_->fast_condition_.wait(lock);
}

if (pool_->fastTasks_.empty()) {
if (pool_->fast_tasks_.empty()) {
return;
}
const auto num = std::min(static_cast<int>(pool_->fastTasks_.size()), onceTask_);
std::move(pool_->fastTasks_.begin(), pool_->fastTasks_.begin() + num, std::back_inserter(selfTask_));
pool_->fastTasks_.erase(pool_->fastTasks_.begin(), pool_->fastTasks_.begin() + num);
const auto num = std::min(static_cast<int>(pool_->fast_tasks_.size()), once_task_);
std::move(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num, std::back_inserter(self_task_));
pool_->fast_tasks_.erase(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num);
}

void CmdSlowWorker::LoadWork() {
{
std::unique_lock lock(pool_->slowMutex_);
while (pool_->slowTasks_.empty() && loopMore_) { // loopMore is used to get the fast worker
std::unique_lock lock(pool_->slow_mutex_);
while (pool_->slow_tasks_.empty() && loop_more_) { // loopMore is used to get the fast worker
if (!running_) {
return;
}
pool_->slowCondition_.wait_for(lock, std::chrono::milliseconds(waitTime_));
loopMore_ = false;
pool_->slow_condition_.wait_for(lock, std::chrono::milliseconds(wait_time_));
loop_more_ = false;
}

const auto num = std::min(static_cast<int>(pool_->slowTasks_.size()), onceTask_);
const auto num = std::min(static_cast<int>(pool_->slow_tasks_.size()), once_task_);
if (num > 0) {
std::move(pool_->slowTasks_.begin(), pool_->slowTasks_.begin() + num, std::back_inserter(selfTask_));
pool_->slowTasks_.erase(pool_->slowTasks_.begin(), pool_->slowTasks_.begin() + num);
std::move(pool_->slow_tasks_.begin(), pool_->slow_tasks_.begin() + num, std::back_inserter(self_task_));
pool_->slow_tasks_.erase(pool_->slow_tasks_.begin(), pool_->slow_tasks_.begin() + num);
return; // If the slow task is obtained, the fast task is no longer obtained
}
}

{
std::unique_lock lock(pool_->fastMutex_);
loopMore_ = true;
std::unique_lock lock(pool_->fast_mutex_);
loop_more_ = true;

const auto num = std::min(static_cast<int>(pool_->fastTasks_.size()), onceTask_);
const auto num = std::min(static_cast<int>(pool_->fast_tasks_.size()), once_task_);
if (num > 0) {
std::move(pool_->fastTasks_.begin(), pool_->fastTasks_.begin() + num, std::back_inserter(selfTask_));
pool_->fastTasks_.erase(pool_->fastTasks_.begin(), pool_->fastTasks_.begin() + num);
std::move(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num, std::back_inserter(self_task_));
pool_->fast_tasks_.erase(pool_->fast_tasks_.begin(), pool_->fast_tasks_.begin() + num);
}
}
}

} // namespace pikiwidb
} // namespace pikiwidb
14 changes: 7 additions & 7 deletions src/cmd_thread_pool_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace pikiwidb {
class CmdWorkThreadPoolWorker {
public:
explicit CmdWorkThreadPoolWorker(CmdThreadPool *pool, int onceTask, std::string name)
: pool_(pool), onceTask_(onceTask), name_(std::move(name)) {
: pool_(pool), once_task_(onceTask), name_(std::move(name)) {
cmd_table_manager_.InitCmdTable();
}

Expand All @@ -32,9 +32,9 @@ class CmdWorkThreadPoolWorker {
virtual ~CmdWorkThreadPoolWorker() = default;

protected:
std::vector<std::shared_ptr<CmdThreadPoolTask>> selfTask_; // the task that the worker get from the thread pool
CmdThreadPool *pool_;
const int onceTask_ = 0; // the max task num that the worker can get from the thread pool
std::vector<std::shared_ptr<CmdThreadPoolTask>> self_task_; // the task that the worker get from the thread pool
CmdThreadPool *pool_ = nullptr;
const int once_task_ = 0; // the max task num that the worker can get from the thread pool
const std::string name_;
bool running_ = true;

Expand All @@ -60,8 +60,8 @@ class CmdSlowWorker : public CmdWorkThreadPoolWorker {
void LoadWork() override;

private:
bool loopMore_ = false; // When the slow queue is empty, try to get the fast queue
int waitTime_ = 200; // When the slow queue is empty, wait 200 ms to check again
bool loop_more_ = false; // When the slow queue is empty, try to get the fast queue
int wait_time_ = 200; // When the slow queue is empty, wait 200 ms to check again
};

} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 1 addition & 1 deletion src/pikiwidb.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class PikiwiDB final {
void OnNewConnection(pikiwidb::TcpConnection* obj);

// pikiwidb::CmdTableManager& GetCmdTableManager();
uint32_t GetCmdId() { return ++cmdId_; };
uint32_t GetCmdID() { return ++cmdId_; };

void SubmitFast(const std::shared_ptr<pikiwidb::CmdThreadPoolTask>& runner) { cmd_threads_.SubmitFast(runner); }

Expand Down

0 comments on commit 4dfbde7

Please sign in to comment.