本文地址: https://www.cnblogs.com/TssiNG-Z/p/16930283.html

通过源码理解C++11中condition_variable的内部逻辑

在<<C++并发编程实战>>中有提到std::condition_variable仅限于与std::mutex一起使用,
其原因在于std::condition_variable的工作原理类似下述代码

bool flag;
std::mutex m;
void wait_for_flag()
    std::unique_lock<std::mutex> lk(m);
    while (!flag)
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock();

通过上述代码我们可以了解到, 条件变量的实际工作原理大致为 :
对一个互斥量加锁后, 检查条件是否成立, 如果不成立, 则对互斥量解锁并休眠一定时间,
在休眠结束后, 再次对互斥量加锁, 并重复上述步骤.

在上述逻辑中, 通过对互斥量加锁保证了条件变化的同步操作, 下面看下condition_variable的实际应用例子

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
    while (more_data_to_prepare())
        data_chunk const data = prepare_data();
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(data);
        data_cond.notify_one();
void data_processing_thread()
    while (true)
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [] {return !data_queue.empty();});
        data_chunk data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        process(data);
        if (is_last_chunk(data)) break;

上例包含了两个线程函数, 一个用于生产数据, 一个用于消费数据, 两线程间通过条件变量和互斥量来进行同步操作,
具体过程为:

  • 生产线程data_preparation_thread在对互斥量加锁后, 向队列推入数据并触发一个条件变量信号, 此处我们看一下MSVC19中notifiy_one的声明与实现
  • void notify_one() noexcept { // wake up one waiter
        _Cnd_signal(_Mycnd());
    

    下面再看下cnd_signal在threads.h中的释义:

    Unblocks one thread that currently waits on condition variable pointed to by cond.
    If no threads are blocked, does nothing and returns thrd_success. 
    

    可以了解到, notify_one只会唤醒当前等待对应条件变量的某一个线程, 具体是哪个线程, 我用下述代码做了个验证

    std::condition_variable g_cond;
    std::mutex g_mtx;
    bool flag = false;
    void foo(int idx)
        while (true)
            std::unique_lock<std::mutex> lk(g_mtx);
            g_cond.wait(lk, [] { return flag; });
            flag = false;
            printf("thread %d awake\n", idx);
    int main()
        std::future<void> a1 = std::async(foo, 1);
        std::future<void> a2 = std::async(foo, 2);
        std::future<void> a3 = std::async(foo, 3);
        while (true)
                std::lock_guard<std::mutex> lk(g_mtx);
                flag = true;
            g_cond.notify_one();
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 0;
    

    结果输出如下:

    thread 1 awake
    thread 1 awake
    thread 2 awake
    thread 3 awake
    thread 1 awake
    thread 2 awake
    thread 3 awake
    thread 1 awake
    thread 2 awake
    thread 3 awake
    thread 1 awake
    thread 2 awake
    thread 3 awake
    thread 1 awake
    thread 2 awake
    thread 3 awake
    thread 1 awake
    thread 2 awake
    thread 3 awake
    

    后面的输出也全都一致, 除开第一次唤醒, 后面所有的唤醒都是顺序的, 这里或许跟我设置的休眠时间有关, 跟平台也应该有关系.
    那么参考上面这种方式, 就可以实现一个低cpu占用且带统一启停的线程池, 本文暂不介绍, 不过这里简单介绍一下我的思路:

    线程池中所有线程通过等待条件变量来获取任务队列中的任务, 任务派发则通过条件变量notify_one来进行, 当线程池释放时, 通过notify_all来唤醒所有线程并设置结束标志, 之后循环join所有线程等待线程退出即可.

  • 接上文, 继续对样例代码进行解析, 在消费线程data_processing_thread中, 首先声明了一个unique_lock, 这里要注意, unique_lock在初始化时就会对互斥量上锁, 这个上锁动作在没有手动解开时, 会一直持续到生命周期结束; 而紧接着, 条件变量就带着这把锁进入了等待状态, 而对标while循环中条件的flag则是一个匿名函数, 其中以队列是否为空作为条件是否成立的标准(防止假醒, 即没有人为notify的情况下, wait到了不干净的东西); 在这之后, 则是对队列的出队列操作以及数据的处理, 这里需要注意的是, 通过封装, 锁的释放动作被隐藏在wait里了, 在wait成功时, unique_lock仍处在上锁状态, 所以在操作完互斥资源后, 手动调用了unlock来释放互斥量, 提高了效率, 那么此处我们再次引用MSVC19的源码实现, 看下wait中发生了什么:
  • void wait(unique_lock<mutex>& _Lck) { // wait for signal
        // Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
        _Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
    template <class _Predicate>
    void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test predicate
        while (!_Pred()) {
            wait(_Lck);
    

    可以看到, wait内部的实现和本文开始的描述是一致的, 在循环判断条件是否成立的情况下, 通过cnd_wait等待信号, 这里我们看下threads.h中cnd_wait的解释:

    Atomically unlocks the mutex pointed to by mutex and blocks on the
    condition variable pointed to by cond until the thread is signalled
    by cnd_signal or cnd_broadcast, or until a spurious wake-up occurs.
    The mutex is locked again before the function returns.
    The behavior is undefined if the mutex is not already locked by the calling thread. 
    

    在cnd_wait函数入口, 互斥量会被释放, 然后持续等待信号, 当信号接收到之后, 互斥量被重新加锁, 函数返回; 此处强调了互斥量必须处在已被加锁的状态.

    PS:我还查阅了一下带超时的wait_for和wait_until, wait_for内部是对wait_until的封装, wait_until内部则是调用的cnd_timedwait, 在函数返回前, 互斥量也同wait一样被还原为上锁状态了.

    综合上述内容, 在实际应用中, 需要注意两点:

  • 在条件变量等待前后, 注意互斥量的上锁状态, 因为实际应用场景下, 互斥资源可能要比例子中要复杂.
  • 在条件变量等待期间, 一定要将对互斥资源实际状况的检查加入到判断条件中, 以此来避免假醒和错过信号.
  • 参考文献:

  • <<C++并发编程实战>>
  • Microsoft Visual Studio/2019/Community/VC/Tools/MSVC/14.29.30133/include/mutex
  • 以上, 如有错误疏漏或疑问, 欢迎指正讨论, 转载请注明.