C++ Concurrency in Action
Managing threads
Basic thread
std::thread
可以使用函数和callable对象创建
code
1#include <iostream>
2#include <thread>
3#include <mutex>
4
5using namespace std;
6mutex print_lock;
7
8void hello_func()
9{
10 lock_guard<mutex> lg{print_lock};
11 cout << "hello function: " << std::this_thread::get_id() << endl;
12}
13
14class hello_class
15{
16public:
17 void operator()()
18 {
19 lock_guard<mutex> lg{print_lock};
20 cout << "hello callable: " << std::this_thread::get_id() << endl;
21 }
22};
23
24int main()
25{
26 std::thread thread_func(hello_func);
27 hello_class c;
28 std::thread thread_class(c);
29 std::thread thread_lambda([] {
30 lock_guard<mutex> lg{print_lock};
31 cout << "hello lambda: " << std::this_thread::get_id() << endl;
32 });
33 {
34 lock_guard<mutex> lg{print_lock};
35 cout << "main: " << std::this_thread::get_id() << endl;
36 }
37 thread_func.join();
38 thread_class.join();
39 thread_lambda.join();
40 return 0;
41}
使用detach
分离线程和当前线程, 使用join
等待线程完成, 如果在调用join
时线程是非joinable
的就会出现异常, 首先要判断是否joinable
**RAII(Resource Acquisition Is Initialization)**方式管理线程
C++构造函数获取资源, 析构函数释放资源, 利用C++对象必定会析构的特征
code
1#include <thread>
2#include <chrono>
3#include <iostream>
4
5//using namespace std;
6using namespace std::chrono_literals;
7
8class thread_guard
9{
10 std::thread &t;
11public:
12 explicit thread_guard(std::thread &t_) : t(t_) {}
13 ~thread_guard() {
14 if (t.joinable()) {
15 t.join();
16 }
17 }
18 thread_guard(thread_guard const &) = delete;
19 thread_guard &operator=(thread_guard const &)=delete;
20};
21
22void func(int n) {
23 for (int i = 0; i < n; ++i) {
24 std::this_thread::sleep_for(1s);
25 std::cout << "sleep for 1s" << std::endl;
26 }
27}
28
29int main()
30{
31 std::thread t1(func, 3);
32 thread_guard guard(t1);
33 return 0;
34}
std::thread
对象在detach
后就进入后台运行, 没有任何手段获取该对象的引用, 也不能join
, 该对象从此由C++ Runtime Library管理. 在Unix中分离的线程成为daemon threads, 同样的进程称为daemon process, 代表在后台运行没有显示的用户界面. 通常这种后台线程用于监视文件系统, 清理缓存中无用的记录, 优化数据结构. 同时也用于验证在fire and forget(收发隔离系统)任务中
只有在joinable()
为真时才能detach()
例子:
code
1void edit_document(std::string const& filename)
2{
3 open_document_and_display_gui(filename);
4 while(!done_editing())
5 {
6 user_command cmd=get_user_input();
7 if(cmd.type==open_new_document)
8 {
9 std::string const new_name=get_filename_from_user();
10 std::thread t(edit_document,new_name);
11 t.detach();
12 }
13 else
14 {
15 process_user_input(cmd);
16 }
17 }
18}
在新建文件时创建新的线程, 后台处理文本编辑命令, command pattern
Passing arguments to a thread function
传入线程的参数是引用和指针时要考虑变量的生存周期, 但有时候线程需要更新数据就要传输引用进去
就算函数声明的是引用, 在构造线程传参时依旧会变成拷贝, 例子:
code
1#include <thread>
2#include <string>
3#include <iostream>
4
5using namespace std;
6
7struct people
8{
9 int age;
10 string name;
11};
12
13void happy_birthday(people &p)
14{
15 cout << p.name << " is " << p.age++
16 << " years old, next year will be " << p.age << endl;
17}
18
19int main()
20{
21 people p{18, "Bob"};
22 std::thread t(happy_birthday, p);
23 t.join();
24 cout << "age :" << p.age << endl;
25}
26
27//结果
28Bob is 18 years old, next year will be 19
29age :18
Transferring ownership of a thread
传递引用
为了传引用必须使用std::ref(p)
才能成功
绑定成员函数
1struct cat
2{ void sleep() {}; }
3cat c;
4std::thread t(&cat::sleep, &c);
第一个参数是类的指针
传递右值(变量转移)
1void process_big_object(std::unique_ptr<big_object>);
2std::unique_ptr<big_object> p(new big_object);
3p->prepare_data(42);
4std::thread t(process_big_object,std::move(p));
thread是moveable的, unique_ptr, 和ifstream也是, 可以用std::move, 可以用thread::move
code
1#include <thread>
2#include <iostream>
3#include <chrono>
4
5using namespace std;
6using namespace std::chrono_literals;
7
8void some_function1()
9{
10 for (int i = 0; i < 100; ++i) {
11 this_thread::sleep_for(1s);
12 cout << "func 1, this id: " << this_thread::get_id() << endl;
13 }
14}
15
16void some_function2()
17{
18 for (int i = 0; i < 100; ++i) {
19 this_thread::sleep_for(1s);
20 cout << "func 2, this id: " << this_thread::get_id() << endl;
21 }
22}
23
24int main()
25{
26 thread t1(some_function1);
27 this_thread::sleep_for(3s);
28 thread t2 = std::move(t1);
29 this_thread::sleep_for(3s);
30 t1 = thread(some_function2);
31 this_thread::sleep_for(3s);
32 thread t3 = std::move(t2);
33 this_thread::sleep_for(3s);
34 t1 = std::move(t3); // std::terminate()会发生
35}
在已经运行函数的线程中用另外一个线程赋值会崩溃, move线程不会改变线程id
Choosing the number of threads at runtime
使用std::thread::hardware_concurrency()
可以得到cpu并发能力
std::accumulate
累加算法, std::distance
迭代器直接的距离
Identifying threads
使用std::thread::id
来标识线程, 对于当前线程使用std::this_thread::get_id()
Sharing data between threads
涉及线程间共享数据, 用锁保护数据, 其他保护共享数据的方法
Problems with sharing data between threads
双向链表的移除, 先修改两边的指针, 然后再删除, 然而修改指针分两步, 左边和右边, 与此同时如果线程进行读取将会出问题
解决办法有两种:
- 采用数据结构保护机制
- 采用无锁编程, memory model需要设计
- 采用STM(software transactional memory), 参考事务内存
Protecting shared data with mutexes
不推荐直接使用std::mutex
, 而是使用std::lock_guard
可以把mutex写在结构体里面, 但是要确定结构体里面的正确的数据被保护起来了
原则: 不要在mutex保护范围外传递被保护数据的引用和指针, 或者通过函数返回, 或者在可见的额外内存存储, 或者把他们当变量传递给其他用户提供的函数
设计接口问题:
对于总体需要进行保护的数据, 比如双向链表, 单保护被删除的对象不行, 其左右两边的对象也需要加锁;
对于类似stack这种数据, 其empty()
,top()
和size()
方法虽然调用的时候是正确的, 但是在实际使用的时候, 一旦返回完这两个函数的结果, 另一个线程对数据进行改变, 那么这两个结果可能就失去了正确性, 就算在数据结构内部增加mutex防止读写问题也没用
对于大量数据的操作, 比如stack<vector<int>>
, 如果stack在pop的时候由于申请新的内存失败, 然后没能把数据拷贝出去就删除了数据, 那么就会造成数据的永久丢失, 所以stack的接口设计了top
和pop
, 把数据拷贝和数据删除分开, 防止出现该问题
解决方案1:
传递一个引用到pop里面去, 确保内存申请操作在pop函数里面得到确认
1std::vector<int> result;
2some_stack.pop(result);
缺点是需要额外的构造变量, 而且有些对象没有默认构造只有拷贝构造或者构造的时候就初始化, 不支持默认构造
解决方案2:
使用移动构造, 或者无异常构造, 缺点就是有些用户定义的对象不可避免在构造函数抛出异常
解决方案3:
返回被pop
的对象的指针, 缺点就是对于int这种数据返回指针不划算, 同时返回的指针容易产生内存泄露, 也可以用共享指针来管理内存
实现代码:
code
1class empty_stack : public std::exception
2{
3public:
4 empty_stack() noexcept
5 : exception("empty stack", 1)
6 {}
7};
8
9template<typename T>
10class threadsafe_stack
11{
12private:
13 std::stack<T> data;
14 mutable std::mutex m;
15public:
16 threadsafe_stack() = default;
17
18 threadsafe_stack(const threadsafe_stack &other)
19 {
20 std::lock_guard<std::mutex> lock(other.m);
21 data = other.data;
22 }
23
24 void push(T new_value)
25 {
26 std::lock_guard<std::mutex> lock(m);
27 data.push(new_value);
28 }
29
30 std::shared_ptr<T> pop()
31 {
32 std::lock_guard<std::mutex> lock(m);
33 if (data.empty()) throw empty_stack();
34 std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
35 data.pop();
36 return res;
37 }
38
39 void pop(T &value)
40 {
41 std::lock_guard<std::mutex> lock(m);
42 if (data.empty()) throw empty_stack();
43 value = data.top();
44 data.pop();
45 }
46
47 bool empty()
48 {
49 std::lock_guard<std::mutex> lock(m);
50 return data.empty();
51 }
52};
fine-grained locking scheme: 细粒度锁方案
如果采用细粒度锁会增加复杂度, 而且多个锁会有可能造成死锁
死锁的解决办法是每次都按照相同的顺序给两个mutex上锁, C++标准库中有std::lock
可以同时对两个锁加锁而不会产生死锁
code
1class some_big_object;
2void swap(some_big_object &lhs, some_big_object &rhs);
3
4class X
5{
6private:
7 some_big_object some_details;
8 std::mutex m;
9public:
10 X(some_big_object const &d) : some_detail(sd) {}
11 friend void swap(X &lhs, X&rhs)
12 {
13 if (&lhs == &rhs) return;
14 std::lock (lhs.m, rhs.m);
15 std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
16 std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
17 swap(lhs.some_detail, rhs.some_detail);
18 }
19};
函数std::adopt_lock
的含义是表示构造函数第一个参数中的锁已经锁上了, 再声明std::lock_guard
则是程序结束后释放锁
死锁问题还存在于两个线程互相调用join
, 都在等待对方执行完毕, 所以在对方线程有几率等待己方线程时不要等待对方线程
TIPS:
AVOID NESTED LOCKS(避免嵌套锁)
在已经保持了一个锁之后不要再请求一个锁
AVOID CALLING USER-SUPPLIED CODE WHILE HOLDING A LOCK(有锁时避免调用用户提供的代码)
由于不知道用户代码会做什么, 很可能导致死锁, 不要传闭包或者函数指针进去
ACQUIRE LOCKS IN A FIXED ORDER(以固定顺序获取锁)
对于两个锁或者多个锁, 每个线程中以固定顺序获取可以防止死锁
对于类似链表这种数据结构, 考虑每个节点上锁, 以逐节向上锁定的方式可以允许多线程访问链表, 但是必须以相同的顺序访问, 先获取后一个的锁然后释放前一个的锁, 对于删除则需要获取删除元素和删除元素两边的锁, 同样需要按照顺序锁定三个元素, 需要定义遍历顺序
USE A LOCK HIERARCHY(使用锁层次)
将应用程序分层, 并确认所有能够在任意层级被上锁的互斥元
code
1#include <mutex>
2
3class hierarchical_mutex
4{
5 std::mutex internal_mutex;
6 unsigned long const hierarchy_value;
7 unsigned long previous_hierarchy_value;
8 static thread_local unsigned long this_thread_hierarchy_value;
9
10 void check_for_heirarchy_violation() const
11 {
12 if (this_thread_hierarchy_value <= hierarchy_value) {
13 throw std::logic_error("mutex hierarchy violated");
14 }
15 }
16
17 void update_hierarchy_value()
18 {
19 previous_hierarchy_value = this_thread_hierarchy_value;
20 this_thread_hierarchy_value = hierarchy_value;
21 }
22
23public:
24 explicit hierarchical_mutex(unsigned long value) : hierarchy_value(value), previous_hierarchy_value(0)
25 {}
26
27 void lock()
28 {
29 check_for_heirarchy_violation();
30 internal_mutex.lock();
31 update_hierarchy_value();
32 }
33
34 void unlock()
35 {
36 this_thread_hierarchy_value = previous_hierarchy_value;
37 internal_mutex.unlock();
38 }
39
40 bool try_lock()
41 {
42 check_for_heirarchy_violation();
43 if (!internal_mutex.try_lock())
44 return false;
45 update_hierarchy_value();
46 return true;
47 }
48};
49
50// thread_local表示每个线程都会有一个该变量的副本
51thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
52
53hierarchical_mutex high_level_mutex(10000);
54hierarchical_mutex low_level_mutex(5000);
55
56int do_low_level_stuff();
57
58int low_level_func()
59{
60 std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
61 return do_low_level_stuff();
62}
63
64void high_level_stuff(int some_param);
65
66void high_level_func()
67{
68 std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
69 high_level_stuff(low_level_func());
70}
71
72void thread_a()
73{
74 high_level_func();
75}
76
77hierarchical_mutex other_mutex(100);
78void do_other_stuff();
79
80void other_stuff()
81{
82 high_level_func();
83 do_other_stuff();
84}
85
86void thread_b()
87{
88 std::lock_guard<hierarchical_mutex> lk(other_mutex);
89 other_stuff();
90}
线程a遵守规则, b不遵守. a先调用高层次API, 然后锁定了高层次锁, 给自己设置了高层次值(10000), 才能通过低层次的锁的验证, 从而获取低层次的锁; 而b先锁住了低层次的锁(100), 转而调用高层次API, 由于层次值低于(10000)被判定不能获取锁
层次锁的好处就是从上至下设置好层次值后, 通过多高的层次值调用会依次给下层设置更高的层次值从而锁定, 比如:
stateDiagram-v2 HIGH_API(1000)-->MID_API(100) HIGH1_API(1000)-->MID_API(100) MID_API(100)-->LOW_API(10)
在this_thread_hierarchy_value
为10000初始值下, 从上至下调用后锁变为
stateDiagram-v2 HIGH_API(10000)-->MID_API(1000) HIGH1_API(1000)-->MID_API(1000) MID_API(1000)-->LOW_API(100)
从而能够防止其他API比如HIGH1_API来调用MID_API, 退出后值又会返回原来的值
EXTENDING THESE GUIDELINES BEYOND LOCKS(把这些原则扩展到锁以为的地方去)
死锁不光会在有锁的地方发生, 也可能发生在循环等待(互相等待)中
使用std::unique_lock
在std::unique_lock
的第二个参数可以传入std::defer_lock
和std::adopt_lock
, 前者必须要求std::lock_guard
在构造时锁没锁上
1unique_lock(_Mutex& _Mtx, adopt_lock_t)
2 : _Pmtx(_STD addressof(_Mtx)), _Owns(true) {} // construct and assume already locked
3
4unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
5 : _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock
code
1#include <mutex>
2
3class some_big_object {};
4void swap(some_big_object &lhs, some_big_object &rhs);
5class X
6{
7private:
8 some_big_object some_detail;
9 std::mutex m;
10public:
11 X(some_big_object const &sd) : some_detail(sd) {}
12 friend void swap(X &lhs, X &rhs) {
13 if (&lhs == &rhs)
14 return;
15 std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
16 std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
17 std::lock(lock_a, lock_b);
18 swap(lhs.some_detail, rhs.some_detail);
19 }
20};
std::unique_lock
可移动不可复制, 允许函数返回std::unique_lock
给调用者
std::unique_lock
支持手动unlock
而不需要等到析构, 可以提升应用程序性能, 而std::lock_guard
仅在作用域范围内生效, 可以把std::unique_lock
看作类似std::unique_ptr
类似的功能
锁粒度在恰当的粒度
如果可能, 仅在实际访问共享数据的时候锁定互斥元, 尝试在锁的外面做任意数据处理, 否则由于多个线程同时等待同一个互斥元会耗费大量的时间
1void get_and_process_data()
2{
3 std::unique_lock<std::mutex> lk(mutex);
4 data d = get_next_data();
5 lk.unlock(); //解锁
6 result r = process(d);
7 lk.lock(); //重新上锁
8 write_result(r);
9}
在持有锁时, 避免诸如文件I/O等操作
一般情况下, 只应该以执行要求的操作所需的最小可能时间而去持有锁
反例:
code
1class Y
2{
3private:
4 int some_detail;
5 mutable std::mutex m;
6
7 int get_detail() const
8 {
9 std::lock_guard<std::mutex> lk(m);
10 return some_detail;
11 }
12
13public:
14 explicit Y(int sd) : some_detail(sd)
15 {}
16
17 friend bool operator==(Y const &lhs, Y const &rhs)
18 {
19 if (&lhs == &rhs) return true;
20 int const lhs_value = lhs.get_detail();
21 int const rhs_value = rhs.get_detail();
22 return lhs == rhs;
23 }
24};
在这段代码中, 当比较的结果返回时, 由于细粒度锁, 可能此时lhs的值已经改变了, 返回的结果也不是正确的, 这要看对当前比较函数的定义
Alternative facilities for protecting shared data
考察那种只有在初始化时才需要进行保护以后都是只读的数据
lazy initialization
1std::shared_ptr<some_resource> resource_ptr;
2void foo()
3{
4 if (!resource_ptr) {
5 resource_ptr = std::make_shared<some_resource>();
6 }
7 resource_ptr->do_something();
8}
通常的做法:
1std::shared_ptr<some_resource> resource_ptr;
2std::mutex resource_mutex;
3void foo()
4{
5 std::unique_lock<std::mutex> lk(resource_mutex);
6 if (!resource_ptr) {
7 resource_ptr = std::make_shared<some_resource>();
8 }
9 lk.unlock();
10 resource_ptr->do_something();
11}
如果改成:
1std::shared_ptr<some_resource> resource_ptr;
2std::mutex resource_mutex;
3void foo()
4{
5 if (!resource_ptr) {
6 std::unique_lock<std::mutex> lk(resource_mutex);
7 if (!resource_ptr) {
8 resource_ptr = std::make_shared<some_resource>();
9 }
10 }
11 resource_ptr->do_something();
12}
这种做法会导致数据竞争, 锁的外部调用resource_ptr和锁内部进行写入不同步
标准C++库提供std::call_once
保证函数只会调用一次
code
1#include <iostream>
2#include <thread>
3#include <mutex>
4
5std::once_flag flag1, flag2;
6
7void simple_do_once()
8{
9 std::call_once(flag1, []() {
10 std::cout << "Simple example: called once: "
11 << std::this_thread::get_id() << std::endl;
12 });
13}
14
15void may_throw_function(bool do_throw)
16{
17 if (do_throw) {
18 std::cout << "throw: call_once will retry\n"; // this may appear more than once
19 throw std::exception();
20 }
21 std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
22}
23
24void do_once(bool do_throw)
25{
26 try {
27 std::call_once(flag2, may_throw_function, do_throw);
28 }
29 catch (...) {
30 }
31}
32
33int main()
34{
35 std::thread st1(simple_do_once);
36 std::thread st2(simple_do_once);
37 std::thread st3(simple_do_once);
38 std::thread st4(simple_do_once);
39 st1.join();
40 st2.join();
41 st3.join();
42 st4.join();
43
44 std::thread t1(do_once, true);
45 std::thread t2(do_once, true);
46 std::thread t3(do_once, false);
47 std::thread t4(do_once, true);
48 t1.join();
49 t2.join();
50 t3.join();
51 t4.join();
52}
结果
Simple example: called once: 13464
throw: call_once will retry
throw: call_once will retry
throw: call_once will retry
Didn't throw, call_once will not attempt again
std::call_once
很容易用于类成员延迟初始化
静态变量也可以避免初始化的竞争
1class my_class;
2my_class &get_instance()
3{
4 static my_class instance;
5 return instance;
6}
读写锁, 多个可以同时读取, 但是写入和读取互斥, 多个同时写入也是互斥
std::shared_mutex
C++17标准加入, 之前在boost里面, std::shared_mutex
用std::shared_lock
去锁可以做到多个读取不会互相竞争, 而写用std::lock_guard
和读取竞争即可
code
1#include <thread>
2#include <shared_mutex>
3#include <chrono>
4#include <iostream>
5
6using namespace std::chrono_literals;
7
8std::shared_mutex count_mutex;
9int count = 0;
10
11void increase_count()
12{
13 std::lock_guard<std::shared_mutex> lk(count_mutex);
14 count +=1;
15}
16
17int get_count(std::chrono::duration<long long int, std::ratio<1>> sec)
18{
19 std::shared_lock<std::shared_mutex> lk(count_mutex);
20 std::this_thread::sleep_for(sec);
21 return count;
22}
23
24void increase_thread(int c)
25{
26 for (int i = 0; i < c; ++i) {
27 increase_count();
28 std::this_thread::sleep_for(2.2s);
29 std::cout << "thread " << std::this_thread::get_id() << " increase" << std::endl;
30 }
31}
32
33void get_thread(int c)
34{
35 for (int i = 0; i < c; ++i) {
36 int n = get_count(1s);
37 std::cout << "thread " << std::this_thread::get_id() << " get " << n << std::endl;
38 }
39}
40
41
42int main()
43{
44 std::thread t1(increase_thread, 10);
45 std::thread t2(get_thread, 10);
46 std::thread t3(get_thread, 10);
47 t1.join();
48 t2.join();
49 t3.join();
50 return 0;
51}
递归锁std::recursive_mutex
, 可以在一个线程里面多次锁, 但是也要多次释放, 可以用std::lock_guard
进行处理
Synchronizing concurrent operations
任务同步, 条件变量(condition variables), 期值(future)
Waiting for an event or other condition
等待任务完成
- 不断查询状态
- 每次查询状态进行延时
- 采用条件变量
条件变量std::condition_variable
和std::condition_variable_any
code
1#include<mutex>
2#include<queue>
3
4struct data_chunk
5{
6};
7
8std::mutex mut;
9std::queue<data_chunk> data_queue;
10std::condition_variable data_cond;
11
12bool more_data_to_prepare()
13{ return true; }
14
15data_chunk prepare_data()
16{ return {}; }
17
18void process(data_chunk &data)
19{}
20
21bool is_last_chunk()
22{ return false; }
23
24void data_preparation_thread()
25{
26 while (!more_data_to_prepare()) {
27 data_chunk const data = prepare_data();
28 std::lock_guard<std::mutex> lk(mut);
29 data_queue.push(data);
30 data_cond.notify_one();
31 }
32}
33
34void data_processing_thread()
35{
36 while (true) {
37 std::unique_lock<std::mutex> lk(mut);
38 data_cond.wait(lk, [] { return !data_queue.empty(); });
39 data_chunk data = data_queue.front();
40 data_queue.pop();
41 lk.unlock();
42 process(data);
43 if (is_last_chunk())
44 break;
45 }
46}
1template< class Predicate >
2void wait( std::unique_lock<std::mutex>& lock, Predicate pred )
等价于
1while (!pred()) {
2 wait(lock);
3}
安全队列
code
1#include <memory>
2#include <queue>
3#include <condition_variable>
4
5class data_chunk {};
6
7template<typename T>
8class threadsafe_queue
9{
10private:
11 mutable std::mutex mut;
12 std::queue<T> data_queue;
13 std::condition_variable data_cond;
14public:
15 threadsafe_queue() = default;
16 threadsafe_queue(const threadsafe_queue &other)
17 {
18 std::lock_guard<std::mutex> lk(mut);
19 data_queue = other.data_queue;
20 }
21 threadsafe_queue &operator=(const threadsafe_queue &) = delete;
22
23 void push(T new_value)
24 {
25 std::lock_guard<std::mutex> lk(mut);
26 data_queue.push(new_value);
27 data_cond.notify_one();
28 }
29
30 bool try_pop(T &value)
31 {
32 std::lock_guard<std::mutex> lk(mut);
33 if (data_queue.empty()) {
34 return false;
35 }
36 value = data_queue.front();
37 data_queue.pop();
38 return true;
39 }
40 std::shared_ptr<T> try_pop()
41 {
42 std::lock_guard<std::mutex> lk(mut);
43 if (data_queue.empty()) {
44 return std::shared_ptr<T>();
45 }
46 std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
47 return res;
48 }
49
50 void wait_and_pop(T &value)
51 {
52 std::unique_lock<std::mutex> lk(mut);
53 data_cond.wait(lk, [this] { return !data_queue.empty();});
54 value = data_queue.front();
55 data_queue.pop();
56 }
57
58 std::shared_ptr<T> wait_and_pop()
59 {
60 std::unique_lock<std::mutex> lk(mut);
61 data_cond.wait(lk, [this] { return !data_queue.empty(); });
62 std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
63 return res;
64 }
65
66 bool empty() const
67 {
68 std::lock_guard<std::mutex> lk(mut);
69 return data_queue.empty();
70 }
71};
Waiting for one-off events with futures
使用future等待一次性事件