Skip to content

Commit

Permalink
Merge pull request #1039 from libcpr/threadpool_fix_high_cpu_load
Browse files Browse the repository at this point in the history
Thread Pool Fix High CPU Load When Paused
  • Loading branch information
COM8 authored Apr 13, 2024
2 parents 11e5954 + 5e48a27 commit 481c047
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 13 deletions.
4 changes: 3 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ Checks: '*,
-modernize-return-braced-init-list,
-cppcoreguidelines-avoid-magic-numbers,
-readability-magic-numbers,
-cppcoreguidelines-avoid-do-while
-cppcoreguidelines-avoid-do-while,
-llvmlibc-inline-function-decl,
-altera-struct-pack-align
'
WarningsAsErrors: '*'
HeaderFilterRegex: 'src/*.hpp'
Expand Down
15 changes: 12 additions & 3 deletions cpr/threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cpr/threadpool.h"
#include <chrono>
#include <cstddef>
#include <ctime>
#include <memory>
#include <mutex>
Expand All @@ -8,7 +9,7 @@

namespace cpr {

ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms), status(STOP), cur_thread_num(0), idle_thread_num(0) {}
ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {}

ThreadPool::~ThreadPool() {
Stop();
Expand All @@ -32,16 +33,21 @@ int ThreadPool::Start(size_t start_threads) {
}

int ThreadPool::Stop() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == STOP) {
return -1;
}

status = STOP;
status_wait_cond.notify_all();
task_cond.notify_all();

for (auto& i : threads) {
if (i.thread->joinable()) {
i.thread->join();
}
}

threads.clear();
cur_thread_num = 0;
idle_thread_num = 0;
Expand All @@ -56,8 +62,10 @@ int ThreadPool::Pause() {
}

int ThreadPool::Resume() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == PAUSE) {
status = RUNNING;
status_wait_cond.notify_all();
}
return 0;
}
Expand All @@ -79,8 +87,9 @@ bool ThreadPool::CreateThread() {
std::thread* thread = new std::thread([this] {
bool initialRun = true;
while (status != STOP) {
while (status == PAUSE) {
std::this_thread::yield();
{
std::unique_lock status_lock(status_wait_mutex);
status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; });
}

Task task;
Expand Down
34 changes: 25 additions & 9 deletions include/cpr/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#define CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM std::thread::hardware_concurrency()

constexpr size_t CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM = 1;
constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{60000};
constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{250};

namespace cpr {

Expand All @@ -25,27 +25,38 @@ class ThreadPool {
using Task = std::function<void()>;

explicit ThreadPool(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME);
ThreadPool(const ThreadPool& other) = delete;
ThreadPool(ThreadPool&& old) = delete;

virtual ~ThreadPool();

ThreadPool& operator=(const ThreadPool& other) = delete;
ThreadPool& operator=(ThreadPool&& old) = delete;

void SetMinThreadNum(size_t min_threads) {
min_thread_num = min_threads;
}

void SetMaxThreadNum(size_t max_threads) {
max_thread_num = max_threads;
}

void SetMaxIdleTime(std::chrono::milliseconds ms) {
max_idle_time = ms;
}

size_t GetCurrentThreadNum() {
return cur_thread_num;
}

size_t GetIdleThreadNum() {
return idle_thread_num;
}

bool IsStarted() {
return status != STOP;
}

bool IsStopped() {
return status == STOP;
}
Expand Down Expand Up @@ -107,14 +118,19 @@ class ThreadPool {
time_t stop_time;
};

std::atomic<Status> status;
std::atomic<size_t> cur_thread_num;
std::atomic<size_t> idle_thread_num;
std::list<ThreadData> threads;
std::mutex thread_mutex;
std::queue<Task> tasks;
std::mutex task_mutex;
std::condition_variable task_cond;
std::atomic<Status> status{Status::STOP};
std::condition_variable status_wait_cond{};
std::mutex status_wait_mutex{};

std::atomic<size_t> cur_thread_num{0};
std::atomic<size_t> idle_thread_num{0};

std::list<ThreadData> threads{};
std::mutex thread_mutex{};

std::queue<Task> tasks{};
std::mutex task_mutex{};
std::condition_variable task_cond{};
};

} // namespace cpr
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_cpr_test(interceptor_multi)
add_cpr_test(multiperform)
add_cpr_test(resolve)
add_cpr_test(multiasync)
add_cpr_test(threadpool)

if (ENABLE_SSL_TESTS)
add_cpr_test(ssl)
Expand Down
106 changes: 106 additions & 0 deletions test/threadpool_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include <atomic>
#include <cstddef>
#include <gtest/gtest.h>


#include "cpr/threadpool.h"

TEST(ThreadPoolTests, DISABLED_BasicWorkOneThread) {
std::atomic_uint32_t invCount{0};
uint32_t invCountExpected{100};

{
cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(1);
tp.Start(0);

for (size_t i = 0; i < invCountExpected; ++i) {
tp.Submit([&invCount]() -> void { invCount++; });
}

// Wait for the thread pool to finish its work
tp.Wait();
}

EXPECT_EQ(invCount, invCountExpected);
}

TEST(ThreadPoolTests, DISABLED_BasicWorkMultipleThreads) {
std::atomic_uint32_t invCount{0};
uint32_t invCountExpected{100};

{
cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(10);
tp.Start(0);

for (size_t i = 0; i < invCountExpected; ++i) {
tp.Submit([&invCount]() -> void { invCount++; });
}

// Wait for the thread pool to finish its work
tp.Wait();
}

EXPECT_EQ(invCount, invCountExpected);
}

TEST(ThreadPoolTests, DISABLED_PauseResumeSingleThread) {
std::atomic_uint32_t invCount{0};

uint32_t repCount{100};
uint32_t invBunchSize{20};

cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(10);
tp.Start(0);

for (size_t i = 0; i < repCount; ++i) {
tp.Pause();
EXPECT_EQ(invCount, i * invBunchSize);

for (size_t e = 0; e < invBunchSize; ++e) {
tp.Submit([&invCount]() -> void { invCount++; });
}
tp.Resume();
// Wait for the thread pool to finish its work
tp.Wait();

EXPECT_EQ(invCount, (i + 1) * invBunchSize);
}
}

TEST(ThreadPoolTests, DISABLED_PauseResumeMultipleThreads) {
std::atomic_uint32_t invCount{0};

uint32_t repCount{100};
uint32_t invBunchSize{20};

cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(10);
tp.Start(0);

for (size_t i = 0; i < repCount; ++i) {
tp.Pause();
EXPECT_EQ(invCount, i * invBunchSize);

for (size_t e = 0; e < invBunchSize; ++e) {
tp.Submit([&invCount]() -> void { invCount++; });
}
tp.Resume();
// Wait for the thread pool to finish its work
tp.Wait();

EXPECT_EQ(invCount, (i + 1) * invBunchSize);
}
}


int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

0 comments on commit 481c047

Please sign in to comment.