diff --git a/.clang-tidy b/.clang-tidy index 7d9297732..8bfa5da0f 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -34,7 +34,9 @@ Checks: '*, -modernize-return-braced-init-list, -cppcoreguidelines-avoid-magic-numbers, -readability-magic-numbers, --cppcoreguidelines-avoid-do-while +-cppcoreguidelines-avoid-do-while, +-llvmlibc-inline-function-decl, +-altera-struct-pack-align ' WarningsAsErrors: '*' HeaderFilterRegex: 'src/*.hpp' diff --git a/cpr/threadpool.cpp b/cpr/threadpool.cpp index bd03d2ab7..0edfa1e4d 100644 --- a/cpr/threadpool.cpp +++ b/cpr/threadpool.cpp @@ -1,5 +1,6 @@ #include "cpr/threadpool.h" #include +#include #include #include #include @@ -8,7 +9,7 @@ namespace cpr { -ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms), status(STOP), cur_thread_num(0), idle_thread_num(0) {} +ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {} ThreadPool::~ThreadPool() { Stop(); @@ -32,16 +33,21 @@ int ThreadPool::Start(size_t start_threads) { } int ThreadPool::Stop() { + const std::unique_lock status_lock(status_wait_mutex); if (status == STOP) { return -1; } + status = STOP; + status_wait_cond.notify_all(); task_cond.notify_all(); + for (auto& i : threads) { if (i.thread->joinable()) { i.thread->join(); } } + threads.clear(); cur_thread_num = 0; idle_thread_num = 0; @@ -56,8 +62,10 @@ int ThreadPool::Pause() { } int ThreadPool::Resume() { + const std::unique_lock status_lock(status_wait_mutex); if (status == PAUSE) { status = RUNNING; + status_wait_cond.notify_all(); } return 0; } @@ -79,8 +87,9 @@ bool ThreadPool::CreateThread() { std::thread* thread = new std::thread([this] { bool initialRun = true; while (status != STOP) { - while (status == PAUSE) { - std::this_thread::yield(); + { + std::unique_lock status_lock(status_wait_mutex); + status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; }); } Task task; diff --git a/include/cpr/threadpool.h b/include/cpr/threadpool.h index bb7e7f214..1d727c1b9 100644 --- a/include/cpr/threadpool.h +++ b/include/cpr/threadpool.h @@ -16,7 +16,7 @@ #define CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM std::thread::hardware_concurrency() constexpr size_t CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM = 1; -constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{60000}; +constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{250}; namespace cpr { @@ -25,27 +25,38 @@ class ThreadPool { using Task = std::function; explicit ThreadPool(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME); + ThreadPool(const ThreadPool& other) = delete; + ThreadPool(ThreadPool&& old) = delete; virtual ~ThreadPool(); + ThreadPool& operator=(const ThreadPool& other) = delete; + ThreadPool& operator=(ThreadPool&& old) = delete; + void SetMinThreadNum(size_t min_threads) { min_thread_num = min_threads; } + void SetMaxThreadNum(size_t max_threads) { max_thread_num = max_threads; } + void SetMaxIdleTime(std::chrono::milliseconds ms) { max_idle_time = ms; } + size_t GetCurrentThreadNum() { return cur_thread_num; } + size_t GetIdleThreadNum() { return idle_thread_num; } + bool IsStarted() { return status != STOP; } + bool IsStopped() { return status == STOP; } @@ -107,14 +118,19 @@ class ThreadPool { time_t stop_time; }; - std::atomic status; - std::atomic cur_thread_num; - std::atomic idle_thread_num; - std::list threads; - std::mutex thread_mutex; - std::queue tasks; - std::mutex task_mutex; - std::condition_variable task_cond; + std::atomic status{Status::STOP}; + std::condition_variable status_wait_cond{}; + std::mutex status_wait_mutex{}; + + std::atomic cur_thread_num{0}; + std::atomic idle_thread_num{0}; + + std::list threads{}; + std::mutex thread_mutex{}; + + std::queue tasks{}; + std::mutex task_mutex{}; + std::condition_variable task_cond{}; }; } // namespace cpr diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d4aa16ef9..49bca5f57 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -64,6 +64,7 @@ add_cpr_test(interceptor_multi) add_cpr_test(multiperform) add_cpr_test(resolve) add_cpr_test(multiasync) +add_cpr_test(threadpool) if (ENABLE_SSL_TESTS) add_cpr_test(ssl) diff --git a/test/threadpool_tests.cpp b/test/threadpool_tests.cpp new file mode 100644 index 000000000..66b42c1d5 --- /dev/null +++ b/test/threadpool_tests.cpp @@ -0,0 +1,106 @@ +#include +#include +#include + + +#include "cpr/threadpool.h" + +TEST(ThreadPoolTests, DISABLED_BasicWorkOneThread) { + std::atomic_uint32_t invCount{0}; + uint32_t invCountExpected{100}; + + { + cpr::ThreadPool tp; + tp.SetMinThreadNum(1); + tp.SetMaxThreadNum(1); + tp.Start(0); + + for (size_t i = 0; i < invCountExpected; ++i) { + tp.Submit([&invCount]() -> void { invCount++; }); + } + + // Wait for the thread pool to finish its work + tp.Wait(); + } + + EXPECT_EQ(invCount, invCountExpected); +} + +TEST(ThreadPoolTests, DISABLED_BasicWorkMultipleThreads) { + std::atomic_uint32_t invCount{0}; + uint32_t invCountExpected{100}; + + { + cpr::ThreadPool tp; + tp.SetMinThreadNum(1); + tp.SetMaxThreadNum(10); + tp.Start(0); + + for (size_t i = 0; i < invCountExpected; ++i) { + tp.Submit([&invCount]() -> void { invCount++; }); + } + + // Wait for the thread pool to finish its work + tp.Wait(); + } + + EXPECT_EQ(invCount, invCountExpected); +} + +TEST(ThreadPoolTests, DISABLED_PauseResumeSingleThread) { + std::atomic_uint32_t invCount{0}; + + uint32_t repCount{100}; + uint32_t invBunchSize{20}; + + cpr::ThreadPool tp; + tp.SetMinThreadNum(1); + tp.SetMaxThreadNum(10); + tp.Start(0); + + for (size_t i = 0; i < repCount; ++i) { + tp.Pause(); + EXPECT_EQ(invCount, i * invBunchSize); + + for (size_t e = 0; e < invBunchSize; ++e) { + tp.Submit([&invCount]() -> void { invCount++; }); + } + tp.Resume(); + // Wait for the thread pool to finish its work + tp.Wait(); + + EXPECT_EQ(invCount, (i + 1) * invBunchSize); + } +} + +TEST(ThreadPoolTests, DISABLED_PauseResumeMultipleThreads) { + std::atomic_uint32_t invCount{0}; + + uint32_t repCount{100}; + uint32_t invBunchSize{20}; + + cpr::ThreadPool tp; + tp.SetMinThreadNum(1); + tp.SetMaxThreadNum(10); + tp.Start(0); + + for (size_t i = 0; i < repCount; ++i) { + tp.Pause(); + EXPECT_EQ(invCount, i * invBunchSize); + + for (size_t e = 0; e < invBunchSize; ++e) { + tp.Submit([&invCount]() -> void { invCount++; }); + } + tp.Resume(); + // Wait for the thread pool to finish its work + tp.Wait(); + + EXPECT_EQ(invCount, (i + 1) * invBunchSize); + } +} + + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}