C++11 提供了两种条件变量:
- condition_variable: 配合
std::unique_lock<std::mutex>
进行 wait 操作。 - condition_variable_any: 和任意带有 lock, unlock 语义的 mutex 搭配使用,比较灵活,但效率比 condition_variable 差一些。
有时,一个 thread 需要等待某种外部事件,如另一个 thread 完成了一个任务或是已经过去了一段时间。
通过外部事件实现线程间通信的基本方法是使用 condition_variable,它提供了一种机制,允许一个 thread 等待另一个 thread。特别地,它允许一个 thread 等待某个条件(通常称为一个事件)发生,这种条件通常是其他 thread 完成工作产生的结果。
条件变量需要和互斥量配合起来使用。
考虑两个 thread 通过一个 queue 传递消息来通信的经典例子。
template <typename T> struct MessageQueue
{
public:
template <typename U> void push(U &&arg) {
std::unique_lock<std::mutex> guard(mtx);
q.push_back(std::forward<U>(arg));
cv.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [=]() { return !q.empty(); });
T out = std::move(q.front());
q.pop_front();
return out;
}
bool tryPop(T &out, std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mtx);
if (!cv.wait_for(lock, timeout, [=]() { return !q.empty(); }))
return false;
out = std::move(q.front());
q.pop_front();
return true;
}
private:
std::deque<T> q;
std::mutex mtx;
std::condition_variable cv;
};
- push() 方法用于将消息入队等待处理。在入队一个消息之前,必须保证获得对应于此队列的锁,入队之后通知其他线程并析构释放该锁。
- pop() 方法读取并处理消息。通过一个 mutex 上的 unique_lock 显式保护对 queue 和 condition_variable 的操作。线程在 condition_variable 上等待时会释放已持有的锁,直到被唤醒后(此时队列非空)重新获取锁。
std::mutex responseLock;
std::vector<std::thread> workers;
std::condition_variable cv;
std::queue<std::function<void()>> jobs;
std::mutex jobLock;
std::atomic<size_t> jobCounter(0);
constexpr size_t N_WORKERS = 5;
constexpr size_t JOB_LIMIT = SET_REPEATS * N_LETTERS;
for (size_t j = 0; j < N_WORKERS; ++j) {
workers.push_back(std::thread([&jobCounter, &cv, &jobLock, &jobs]() {
while (jobCounter < JOB_LIMIT) {
std::unique_lock<std::mutex> l(jobLock);
cv.wait(l, [&jobCounter, &jobs] {
return jobs.size() || !(jobCounter < JOB_LIMIT);
});
if (!jobs.empty()) {
auto f = std::move(jobs.front());
jobs.pop();
l.unlock();
f();
++jobCounter;
l.lock();
}
}
cv.notify_all();
}));
}
auto stream = response.stream(Http::Code::Ok);
const char letter = 'A';
for (size_t s = 0; s < SET_REPEATS; ++s) {
for (size_t i = 0; i < N_LETTERS; ++i) {
auto job = [&stream, &responseLock, i]() -> void {
constexpr size_t nchunks = 10;
constexpr size_t chunk_size = LETTER_REPEATS / nchunks;
const std::string payload(chunk_size, letter + i);
{
std::lock_guard<std::mutex> guard(responseLock);
for (size_t chunk = 0; chunk < nchunks; ++chunk) {
stream.write(payload.c_str(), chunk_size);
stream.flush();
}
}
};
std::unique_lock<std::mutex> l(jobLock);
jobs.push(std::move(job));
l.unlock();
cv.notify_all();
}
}
for (auto &w : workers) {
w.join();
}
stream.ends();
不必多言。
这里简单补充一下这二者的使用场景区别,以作备忘。
以生产消费模型为例。如果有多个消费者。生产者锁保护生产一条消息后,通过 notify_one 通知一条消费者线程进行消费。
如果所有的线程都等待退出,结束线程通过 notify_all 通知所有线程可以退出。如果这时通过 notify_one 通知,则只能随机退出一条线程。