C++ Concurrency in Action

Managing threads

Basic thread

A std::thread can be created from a function or from any callable object

code
#include <iostream>
#include <thread>
#include <mutex>

using namespace std;
mutex print_lock;

void hello_func()
{
    lock_guard<mutex> lg{print_lock};
    cout << "hello function: " << std::this_thread::get_id() << endl;
}

class hello_class
{
public:
    void operator()()
    {
        lock_guard<mutex> lg{print_lock};
        cout << "hello callable: " << std::this_thread::get_id() << endl;
    }
};

int main()
{
    std::thread thread_func(hello_func);
    hello_class c;
    std::thread thread_class(c);
    std::thread thread_lambda([] {
        lock_guard<mutex> lg{print_lock};
        cout << "hello lambda: " << std::this_thread::get_id() << endl;
    });
    {
        lock_guard<mutex> lg{print_lock};
        cout << "main: " << std::this_thread::get_id() << endl;
    }
    thread_func.join();
    thread_class.join();
    thread_lambda.join();
    return 0;
}

Use detach() to separate a thread from the current one, and join() to wait for a thread to finish. Calling join() on a thread that is not joinable throws an exception (std::system_error), so check joinable() first

Managing threads with **RAII (Resource Acquisition Is Initialization)**

The constructor acquires the resource and the destructor releases it, relying on the guarantee that a C++ object's destructor always runs when the object goes out of scope

code
#include <thread>
#include <chrono>
#include <iostream>

//using namespace std;
using namespace std::chrono_literals;

class thread_guard
{
    std::thread &t;
public:
    explicit thread_guard(std::thread &t_) : t(t_) {}
    ~thread_guard() {
        if (t.joinable()) {
            t.join();
        }
    }
    thread_guard(thread_guard const &) = delete;
    thread_guard &operator=(thread_guard const &) = delete;
};

void func(int n) {
    for (int i = 0; i < n; ++i) {
        std::this_thread::sleep_for(1s);
        std::cout << "sleep for 1s" << std::endl;
    }
}

int main()
{
    std::thread t1(func, 3);
    thread_guard guard(t1);
    return 0;
}

Once a std::thread is detached, it runs in the background: there is no way to obtain a reference to it, it can no longer be joined, and ownership passes to the C++ runtime library. On Unix, detached threads are called daemon threads, by analogy with daemon processes, which run in the background with no visible user interface. Such background threads are typically used to monitor the filesystem, purge stale cache entries, or optimize data structures. They also suit fire-and-forget tasks

detach() may only be called while joinable() returns true

Example:

code
void edit_document(std::string const &filename)
{
    open_document_and_display_gui(filename);
    while (!done_editing()) {
        user_command cmd = get_user_input();
        if (cmd.type == open_new_document) {
            std::string const new_name = get_filename_from_user();
            std::thread t(edit_document, new_name);
            t.detach();
        }
        else {
            process_user_input(cmd);
        }
    }
}

A new thread is created whenever a new document is opened, handling editing commands in the background — an instance of the command pattern

Passing arguments to a thread function

When arguments passed into a thread are references or pointers, the lifetime of the referenced variables must be considered; but sometimes the thread needs to update data, so a reference has to be passed in

Even when the function declares a reference parameter, the std::thread constructor copies its arguments. (Strictly conforming compilers reject this example for exactly that reason — the copy is an rvalue and cannot bind to a non-const lvalue reference — but some older compilers accept it, and the copy behavior is what you then observe.) Example:

code
#include <thread>
#include <string>
#include <iostream>

using namespace std;

struct people
{
    int age;
    string name;
};

void happy_birthday(people &p)
{
    cout << p.name << " is " << p.age++
         << " years old, next year will be " << p.age << endl;
}

int main()
{
    people p{18, "Bob"};
    std::thread t(happy_birthday, p);
    t.join();
    cout << "age :" << p.age << endl;
}

// output
Bob is 18 years old, next year will be 19
age :18

Passing a reference

To actually pass a reference, the argument must be wrapped: std::ref(p)

Binding a member function

struct cat
{
    void sleep() {}
};
cat c;
std::thread t(&cat::sleep, &c);

The first argument is the pointer to the member function; the argument after it is a pointer to the object on which it is invoked

Transferring ownership of a thread

Passing an rvalue (moving a resource in)

void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object, std::move(p));

std::thread is movable but not copyable, like std::unique_ptr and std::ifstream; ownership is transferred with std::move

code
#include <thread>
#include <iostream>
#include <chrono>

using namespace std;
using namespace std::chrono_literals;

void some_function1()
{
    for (int i = 0; i < 100; ++i) {
        this_thread::sleep_for(1s);
        cout << "func 1, this id: " << this_thread::get_id() << endl;
    }
}

void some_function2()
{
    for (int i = 0; i < 100; ++i) {
        this_thread::sleep_for(1s);
        cout << "func 2, this id: " << this_thread::get_id() << endl;
    }
}

int main()
{
    thread t1(some_function1);
    this_thread::sleep_for(3s);
    thread t2 = std::move(t1);
    this_thread::sleep_for(3s);
    t1 = thread(some_function2);
    this_thread::sleep_for(3s);
    thread t3 = std::move(t2);
    this_thread::sleep_for(3s);
    t1 = std::move(t3);     // t1 still owns a running thread: std::terminate() is called
}

Assigning to a std::thread object that still owns a running (joinable) thread calls std::terminate. Moving a thread does not change its thread id

Choosing the number of threads at runtime

std::thread::hardware_concurrency() returns the number of threads the hardware can truly run concurrently (or 0 if this cannot be determined)

std::accumulate sums a range; std::distance gives the number of elements between two iterators

Identifying threads

Threads are identified by std::thread::id; the id of the current thread is obtained with std::this_thread::get_id()

Sharing data between threads

This covers sharing data between threads: protecting shared data with locks, and other protection mechanisms

Problems with sharing data between threads

Removing a node from a doubly linked list: first update the pointers on both sides, then delete the node. Updating the pointers is itself two steps (left and right); if another thread reads the list in between, it sees a broken structure

There are three broad approaches:

  • Protect the data structure with a locking mechanism
  • Lock-free programming (requires careful memory-model design)
  • Software transactional memory (STM)

Protecting shared data with mutexes

Prefer std::lock_guard over locking and unlocking std::mutex directly

The mutex can live inside the struct/class, but make sure it actually protects all the data that needs protecting

Rule: don't pass pointers or references to protected data outside the scope of the lock — whether by returning them from a function, storing them in externally visible memory, or passing them as arguments to user-supplied functions

Interface design problems:

For data that must be protected as a whole, such as a doubly linked list, locking only the node being deleted is not enough; its neighbors on both sides must be locked too

For stack-like structures, empty(), top(), and size() are each correct at the instant they are called, but once they return, another thread may modify the stack and invalidate the result. Adding an internal mutex to prevent concurrent reads and writes doesn't help — the race is inherent in the interface

For elements that carry a lot of data, e.g. stack<vector<int>>: if pop() removed the element and then failed to allocate memory while copying it out, the data would be lost permanently. This is why std::stack's interface separates top and pop — copy first, then remove

Option 1:

Pass a reference into pop(), so that any allocation and copying happens inside pop() before the element is removed:

std::vector<int> result;
some_stack.pop(result);

Drawback: the caller must construct a value up front, and some types can't do this — they have no default constructor, only copy construction or construction with immediate initialization

Option 2:

Require a no-throw move (or copy) constructor. Drawback: for some user-defined types, constructors that may throw are unavoidable

Option 3:

Return a pointer to the popped object. Drawbacks: for small types like int a pointer is overkill, and raw pointers risk leaks — std::shared_ptr can manage the memory instead

Implementation:

code
#include <exception>
#include <memory>
#include <mutex>
#include <stack>

class empty_stack : public std::exception
{
public:
    const char *what() const noexcept override
    { return "empty stack"; }
};

template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack() = default;

    threadsafe_stack(const threadsafe_stack &other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(new_value);
    }

    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
    }

    void pop(T &value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = data.top();
        data.pop();
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

fine-grained locking scheme: many locks, each protecting a small piece of data

Fine-grained locking adds complexity, and holding multiple locks risks deadlock

The classic deadlock-avoidance rule is to always lock the two mutexes in the same order. The C++ standard library provides std::lock, which locks two (or more) mutexes together without risking deadlock

code
class some_big_object {};
void swap(some_big_object &lhs, some_big_object &rhs);

class X
{
private:
    some_big_object some_detail;
    std::mutex m;
public:
    X(some_big_object const &sd) : some_detail(sd) {}
    friend void swap(X &lhs, X &rhs)
    {
        if (&lhs == &rhs) return;
        std::lock(lhs.m, rhs.m);
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

The std::adopt_lock tag tells the std::lock_guard constructor that the mutex passed as its first argument is already locked; the guard merely adopts ownership and releases the lock when it is destroyed

Deadlock also occurs when two threads call join() on each other, each waiting for the other to finish. Don't wait for a thread that has any chance of being waiting for you

TIPS:

AVOID NESTED LOCKS

Don't acquire another lock while already holding one

AVOID CALLING USER-SUPPLIED CODE WHILE HOLDING A LOCK

You don't know what user code will do — it may acquire locks of its own and deadlock. Avoid passing closures or function pointers into code that runs under your lock

ACQUIRE LOCKS IN A FIXED ORDER

When two or more locks are needed, acquiring them in the same fixed order in every thread prevents deadlock

For structures like linked lists, consider one lock per node, acquired hand-over-hand: lock the next node, then release the previous one. This allows concurrent traversal, but every thread must traverse in the same order. Deletion requires locking the node being removed and both of its neighbors, again acquired in the defined traversal order

USE A LOCK HIERARCHY

Divide the application into layers, and assign every mutex that can be locked to a layer

code
#include <climits>
#include <mutex>
#include <stdexcept>

class hierarchical_mutex
{
    std::mutex internal_mutex;
    unsigned long const hierarchy_value;
    unsigned long previous_hierarchy_value;
    static thread_local unsigned long this_thread_hierarchy_value;

    void check_for_hierarchy_violation() const
    {
        if (this_thread_hierarchy_value <= hierarchy_value) {
            throw std::logic_error("mutex hierarchy violated");
        }
    }

    void update_hierarchy_value()
    {
        previous_hierarchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_value;
    }

public:
    explicit hierarchical_mutex(unsigned long value) : hierarchy_value(value), previous_hierarchy_value(0)
    {}

    void lock()
    {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        update_hierarchy_value();
    }

    void unlock()
    {
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();
    }

    bool try_lock()
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        update_hierarchy_value();
        return true;
    }
};

// thread_local: every thread gets its own copy of this variable
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);

int do_low_level_stuff();

int low_level_func()
{
    std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
    return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func()
{
    std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
    high_level_stuff(low_level_func());
}

void thread_a()
{
    high_level_func();
}

hierarchical_mutex other_mutex(100);
void do_other_stuff();

void other_stuff()
{
    high_level_func();
    do_other_stuff();
}

void thread_b()
{
    std::lock_guard<hierarchical_mutex> lk(other_mutex);
    other_stuff();
}

Thread a follows the rules; thread b does not. a first calls the high-level API, locking the high-level mutex and setting its thread-local value to 10000, so it then passes the check on the low-level mutex (5000) and can acquire it. b first locks the low-level other_mutex (100) and then calls the high-level API; because its thread-local value (100) is below 10000, the hierarchy check fails and the lock is refused

The benefit of a lock hierarchy: once layer values are assigned top-down, any call path through the layers is forced to acquire locks in descending hierarchy order. For example:

stateDiagram-v2

HIGH_API(1000)-->MID_API(100)
HIGH1_API(1000)-->MID_API(100)
MID_API(100)-->LOW_API(10)

Starting from the initial this_thread_hierarchy_value of ULONG_MAX, a thread calling down through the layers lowers its thread-local value at each step (to 1000, then 100, then 10). While it holds MID_API's lock (value 100) it cannot acquire HIGH1_API's lock (1000), so cycles between layers are impossible; when each lock is released, the previous value is restored

EXTENDING THESE GUIDELINES BEYOND LOCKS (apply the same guidelines outside of locks)

Deadlock is not limited to locks — it can arise from any wait cycle, where threads wait on each other

Using std::unique_lock

The second parameter of std::unique_lock can be std::defer_lock or std::adopt_lock: std::defer_lock leaves the mutex unlocked at construction, to be locked later (e.g. by std::lock), while std::adopt_lock assumes the mutex is already locked

unique_lock(_Mutex& _Mtx, adopt_lock_t)
        : _Pmtx(_STD addressof(_Mtx)), _Owns(true) {} // construct and assume already locked

unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
        : _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock
code
#include <mutex>

class some_big_object {};
void swap(some_big_object &lhs, some_big_object &rhs);
class X
{
private:
    some_big_object some_detail;
    std::mutex m;
public:
    X(some_big_object const &sd) : some_detail(sd) {}
    friend void swap(X &lhs, X &rhs) {
        if (&lhs == &rhs)
            return;
        std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
        std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
        std::lock(lock_a, lock_b);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

std::unique_lock is movable but not copyable, which allows a function to return a std::unique_lock to its caller

std::unique_lock supports manual unlock() before destruction, which can improve performance, whereas std::lock_guard only releases at the end of its scope. Think of std::unique_lock as being to locks what std::unique_ptr is to pointers

Lock at an appropriate granularity

If possible, hold the mutex only while actually accessing the shared data, and do any processing of the data outside the lock; otherwise many threads waste time queued on the same mutex

void get_and_process_data()
{
    std::unique_lock<std::mutex> lk(mutex);
    data d = get_next_data();
    lk.unlock();	// unlock while processing
    result r = process(d);
    lk.lock();		// re-lock to write the result
    write_result(r);
}

Avoid expensive operations such as file I/O while holding a lock

In general, hold a lock only for the minimum time needed to perform the required operation

Counter-example:

code
class Y
{
private:
    int some_detail;
    mutable std::mutex m;

    int get_detail() const
    {
        std::lock_guard<std::mutex> lk(m);
        return some_detail;
    }

public:
    explicit Y(int sd) : some_detail(sd)
    {}

    friend bool operator==(Y const &lhs, Y const &rhs)
    {
        if (&lhs == &rhs) return true;
        int const lhs_value = lhs.get_detail();
        int const rhs_value = rhs.get_detail();
        return lhs_value == rhs_value;
    }
};

Here, because of the fine-grained locking, lhs may already have changed by the time the comparison returns — the values were equal at two separate instants, not at the same instant. Whether this matters depends on the intended semantics of the comparison

Alternative facilities for protecting shared data

Consider data that needs protection only during initialization and is read-only afterwards

lazy initialization

std::shared_ptr<some_resource> resource_ptr;
void foo()
{
    if (!resource_ptr) {
        resource_ptr = std::make_shared<some_resource>();
    }
    resource_ptr->do_something();
}

The straightforward thread-safe approach:

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
    std::unique_lock<std::mutex> lk(resource_mutex);
    if (!resource_ptr) {
        resource_ptr = std::make_shared<some_resource>();
    }
    lk.unlock();
    resource_ptr->do_something();
}

A tempting change (double-checked locking):

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
    if (!resource_ptr) {
        std::unique_lock<std::mutex> lk(resource_mutex);
        if (!resource_ptr) {
            resource_ptr = std::make_shared<some_resource>();
        }
    }
    resource_ptr->do_something();
}

This is a data race: the read of resource_ptr outside the lock is not synchronized with the write inside it, which is undefined behavior

The standard library provides std::call_once (with std::once_flag) to guarantee a function runs exactly once

code
#include <iostream>
#include <thread>
#include <mutex>

std::once_flag flag1, flag2;

void simple_do_once()
{
    std::call_once(flag1, []() {
        std::cout << "Simple example: called once: "
                  << std::this_thread::get_id() << std::endl;
    });
}

void may_throw_function(bool do_throw)
{
    if (do_throw) {
        std::cout << "throw: call_once will retry\n"; // this may appear more than once
        throw std::exception();
    }
    std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}

void do_once(bool do_throw)
{
    try {
        std::call_once(flag2, may_throw_function, do_throw);
    }
    catch (...) {
    }
}

int main()
{
    std::thread st1(simple_do_once);
    std::thread st2(simple_do_once);
    std::thread st3(simple_do_once);
    std::thread st4(simple_do_once);
    st1.join();
    st2.join();
    st3.join();
    st4.join();

    std::thread t1(do_once, true);
    std::thread t2(do_once, true);
    std::thread t3(do_once, false);
    std::thread t4(do_once, true);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}

Result

Simple example: called once: 13464
throw: call_once will retry
throw: call_once will retry
throw: call_once will retry
Didn't throw, call_once will not attempt again

std::call_once is also convenient for lazy initialization of class members

A local static variable also avoids the initialization race (since C++11, its initialization is guaranteed to be thread-safe):

class my_class {};
my_class &get_instance()
{
    static my_class instance;
    return instance;
}

Read-write locks: multiple readers may hold the lock simultaneously, but writing excludes both readers and other writers

std::shared_mutex was added in C++17 (previously available in Boost). Readers lock a std::shared_mutex with std::shared_lock, so reads don't contend with each other; writers take exclusive ownership with std::lock_guard, contending with the readers

code
#include <thread>
#include <shared_mutex>
#include <chrono>
#include <iostream>

using namespace std::chrono_literals;

std::shared_mutex count_mutex;
int count = 0;

void increase_count()
{
    std::lock_guard<std::shared_mutex> lk(count_mutex);
    count += 1;
}

int get_count(std::chrono::duration<long long int, std::ratio<1>> sec)
{
    std::shared_lock<std::shared_mutex> lk(count_mutex);
    std::this_thread::sleep_for(sec);
    return count;
}

void increase_thread(int c)
{
    for (int i = 0; i < c; ++i) {
        increase_count();
        std::this_thread::sleep_for(2.2s);
        std::cout << "thread " << std::this_thread::get_id() << " increase" << std::endl;
    }
}

void get_thread(int c)
{
    for (int i = 0; i < c; ++i) {
        int n = get_count(1s);
        std::cout << "thread " << std::this_thread::get_id() << " get " << n << std::endl;
    }
}

int main()
{
    std::thread t1(increase_thread, 10);
    std::thread t2(get_thread, 10);
    std::thread t3(get_thread, 10);
    t1.join();
    t2.join();
    t3.join();
    return 0;
}

std::recursive_mutex can be locked multiple times by the same thread, but must be unlocked the same number of times; std::lock_guard handles the pairing automatically

Synchronizing concurrent operations

Synchronizing tasks: condition variables and futures

Waiting for an event or other condition

Ways to wait for a task to finish:

  1. Keep polling a status flag
  2. Poll the status, sleeping between checks
  3. Use a condition variable

Condition variables: std::condition_variable and std::condition_variable_any

code
#include <condition_variable>
#include <mutex>
#include <queue>

struct data_chunk
{
};

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

bool more_data_to_prepare()
{ return true; }

data_chunk prepare_data()
{ return {}; }

void process(data_chunk &data)
{}

bool is_last_chunk()
{ return false; }

void data_preparation_thread()
{
    while (more_data_to_prepare()) {
        data_chunk const data = prepare_data();
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
}

void data_processing_thread()
{
    while (true) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [] { return !data_queue.empty(); });
        data_chunk data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        process(data);
        if (is_last_chunk())
            break;
    }
}
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

is equivalent to

while (!pred()) {
    wait(lock);
}

A thread-safe queue

code
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

class data_chunk {};

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue() = default;
    threadsafe_queue(const threadsafe_queue &other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) {
            return false;
        }
        value = data_queue.front();
        data_queue.pop();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

Waiting for one-off events with futures

Use a future to wait for a one-off event