C++11 多线程入门

前言

本文不赘述操作系统中多线程的相关知识,只讲述了 C++11 中关于多线程库的一些使用和说明。

线程的创建及入口函数

线程的入口函数传入一共有三种方法:

  • 普通函数
  • 成员函数
  • 仿函数
#include <thread>
using namespace std;

// 1. 普通函数
void func(int arg){}

// 2. 成员函数
class Func{
public:
	void func(int arg){}
};

// 3. 仿函数
class Functor{
public:
	void operator()(int arg){}
};

int main(){
	int arg = 1;
	Func func;
	Functor functor;
	// 1. 普通函数传入函数地址和参数列表
	thread t1(func, arg);
	// 2. 成员函数除了传入函数地址,还需额外传入个类对象
	thread t2(&Func::func, &func, arg);
	// 3. 与 1 类似
	thread t3(functor, arg);

	return 0;
}

线程创建之后会自动开始执行入口函数,后续根据需要在主线程进行 joindetach

join 和 detach

join 是同步,会在当前线程阻塞,等待子线程执行完成再执行后续操作

detach 是异步,无需等待子线程执行完毕即可执行后续。但是需要注意,如果当前线程是主线程,主线程执行完毕会结束进程,此时子线程如果任务没执行完,会被强制结束。

detach 中的坑

由于 detach 之后,子线程和当前线程分离,所以如果传入参数是当前线程的局部变量,则需要小心,有可能当前线程已经结束,相关参数已经释放和销毁,子线程仍然使用则会产生错误。

线程入口函数的参数

当使用 thread 创建线程时,即使是传引用(参数需要是 const xx&)也会在创建线程时自动创建副本,但指针副本仍然是指向同一位置。所以创建线程传入指针需要慎重考虑,除非是同步。

另外,传参时不能隐式转换,此时对象构造在子线程开始时才会执行,此时可能当前线程用于构造对象的参数已经释放,行为未定义。

可能编译器不同,有不同的行为。经测试,在 gcc 8.1.0 中,创建线程时,构造参数会被复制一份,在线程入口函数开始时进行构造,即使此时父线程的参数已经被释放,仍然有复制后的参数,不影响程序行为。

当明确需要传入引用,避免复制时,可以使用 std::ref() 函数,明确是引用,将不会进行复制操作,安全由自己保证。

当传入智能指针时:

  • shared_ptr 会在创建线程时复制一份(即使父线程先结束,也不会影响这个副本,所以不用担心智能指针所指对象的析构)
  • unique_ptr 则需要使用 std::move(),并在父进程明确后续不会再使用这个指针

互斥锁

互斥锁是为了解决在多线程环境下对数据的读写问题,锁对性能多少会有影响,所以上锁的代码要尽可能少。

最简单的锁 mutex

管理成本高:需要手动 lockunlock,如果不成对,或者 lock 两次、忘记 unlock 都会造成问题。
当调用 lock 时,如果此时锁已经上锁,则线程阻塞在该句,直到锁被解开,是阻塞的。

#include <thread>
#include <iostream>
#include <mutex>
using namespace std;
mutex loc;
void test_thread(){
	loc.lock();
    cout << "1" << endl;
    cout << "2" << endl;
    cout << "3" << endl;
    loc.unlock();
}
int main(){
	thread t1(test_thread);
    thread t2(test_thread);
    t1.join();
    t2.join();

	return 0;
}

try_lock:和 lock 相比,多了个返回值且非阻塞,如果已经上过锁,则返回 false,否则返回 true 和正常 lock 行为一致。

自动加解锁 lock_guard

无需手动 lockunlock,其原理是构造时会执行一次 lock,析构时执行 unlock

mutex loc;
void test_thread(){
	lock_guard<mutex> lock(loc);
    cout << "1" << endl;
    cout << "2" << endl;
    cout << "3" << endl;
}

另外:当构造时加入参数 lock_guard<mutex> lock(loc, std::adopt_lock) 时,构造时不会对锁进行 lock 操作

多个互斥量的加解锁

在互斥量存在多个时,会存在死锁问题,标准库提供了 std::lock() 函数解决死锁问题,我们不用操心上锁的顺序问题,只有当所有锁都是解锁状态时,std::lock() 才会对所有锁上锁。(参数至少要有两个锁)

mutex loc1, loc2;
void test_thread(){
	lock(loc1, loc2);
	lock_guard<mutex> lock1(loc1, adopt_lock);
	lock_guard<mutex> lock2(loc2, adopt_lock);
    cout << "1" << endl;
    cout << "2" << endl;
    cout << "3" << endl;
}

unique_lock

功能更灵活的锁,但是性能差一些,构造时有四种选择:

  • 默认,和 lock_guard 行为一致
  • std::adopt_lock:此时和 lock_guard: adopt_lock 表现一致
  • std::try_to_lock:非阻塞式加锁,如果没锁上就加锁,锁上则没有行为,不阻塞,与 owns_lock() 配合使用
  • std::defer_lock:完全手动,不进行任何自动操作

功能函数:

  • lock()unlock()try_lock()mutex 都一致
  • release():释放当前锁的控制权,之后对 unique_lock 的任何操作都不会影响到原来所绑定的那个锁。

unique_ptr 一样,可以通过 std::move 转移控制权

mutex loc;
// 如果获得锁就打印,否则无行为
void test_thread(){
    unique_lock<mutex> ul(loc, try_to_lock);
    if(ul.owns_lock()){
        cout << "1" << endl;
        cout << "2" << endl;
        cout << "3" << endl;
    }
}

需要注意,在任何模式下,获得锁之后进行 try_lock 或 lock 都会导致死锁,所以在 lock 和 try_lock 之前,可以使用 owns_lock 判断是否取得锁。如果没有取得锁,可以使用 try_lock 非阻塞式获取锁,也可以使用 lock 阻塞获取锁。

超时锁 timed_mutex

try_lock_for(<time>):在等待 time 时间内拿到锁返回 true 否则返回 false
try_lock_until(<time>):直到时间拿到锁返回 true 否则返回 false

timed_mutex tloc;
void test_thread(){
    if(tloc.try_lock_for(chrono::seconds(2))){
        cout << "1" << endl;
        cout << "2" << endl;
        cout << "3" << endl;
        tloc.unlock();
    }
}

递归锁 recursive_lock

和普通锁不同的是可以在同线程中多次 lock,但需要保证 unlock 次数匹配

rucursive_timed_locktimed_mutex 类似,多了同线程可以多次 lock

多线程同步

函数只会调用一次 call_once

需要一个标记量,保证在多线程环境下,该函数只会调用一次,例如多线程下的单例模式的初始化。

once_flag flag;
void oncefunc(int arg){
    cout << "call once" << endl;
}
void test_thread(){
    call_once(flag, oncefunc, 0);
}

条件变量 condition_variable

由于不断地轮询互斥量会占用计算资源,条件变量可以配合互斥量使用,可以在线程之间实现更复杂的同步关系,也能节约计算资源,其构造参数只能接受 unique_lock

当执行 wait 时(此时锁必须是加锁状态),有两种情况

  1. wait 只有一个参数,则执行 wait 时会解开锁,并阻塞自身,等待其他线程调用 notify_one,并在此之后等待锁解开,锁解开则继续后续流程。
  2. wait 有传入调用对象参数(必须返回 bool),则会执行调用对象
    1. 如果调用对象返回 true,则继续控制锁,并执行后续流程
    2. 如果调用对象返回 false,则阻塞并释放锁,并等待其他线程调用 notify_one,当其他线程调用 notify_one 时,再执行调用对象
      1. 若返回 false 则继续阻塞等待 notify_one
      2. 若返回 true 则阻塞等待锁解锁,如果锁已经解锁则加锁并继续执行后续流程

假如当其他线程执行 notify_one 时,当前线程并不处在 wait 等待唤醒的状态,则 notify_one 无效。wait 中的函数调用并不是原子执行的,需要注意。

所有情况的例子如下:

// 所有例子中 main 都如下
int main(){
	thread t1(test_thread1);
    thread t2(test_thread2);
    t1.join();
    t2.join();

	return 0;
}

condition_variable cv;
mutex loc, print;

// 1. 只有一个参数
// 输出:
// test_thread2
// test_thread1
void test_thread1(){
    unique_lock<mutex> ul(loc);
    // 2. 先获得锁,但 wait 会释放锁,并等待 notify
    cv.wait(ul);
    cout << "test_thread1" << endl;
}
void test_thread2(){
    // 1. 先 sleep 保证 thread1 先执行
    this_thread::sleep_for(chrono::seconds(1));
    // 3. 获得锁并 notify,但此时并没有释放锁,所以 thread1 仍然会继续阻塞
    unique_lock<mutex> ul(loc);
    cv.notify_one();
    // 即使 sleep thread1 也不会执行,因为获得不到锁
    this_thread::sleep_for(chrono::seconds(1)); 
    cout << "test_thread2" << endl;
    // 4. 释放锁,thread1 获得锁,继续后续流程
}

// 2.1 返回 true
// 输出:
// test_thread1
// test_thread2
void test_thread1(){
    unique_lock<mutex> ul(loc);
    // 2. 先获得锁,wait 返回 true 继续占有锁
    cv.wait(ul, []{ return true; });
    this_thread::sleep_for(chrono::seconds(1));
    cout << "test_thread1" << endl;
}
void test_thread2(){
    // 1. 先 sleep 保证 thread1 先执行
    this_thread::sleep_for(chrono::seconds(1));
    unique_lock<mutex> ul(loc);
    cv.notify_one();    // 该例中实际上没什么用
    cout << "test_thread2" << endl;
}

// 2.2.1 返回 false 后还是 false
// 输出:
// test_thread2
// 卡死
void test_thread1(){
    unique_lock<mutex> ul(loc);
    // 2. 先获得锁,但 wait 会释放锁,并等待 notify
    // 4. 由于仍然返回 false,所以该流程无法走下去,程序卡死在这
    cv.wait(ul, []{ return false; });
    cout << "test_thread1" << endl;
}
void test_thread2(){
    // 1. 先 sleep 保证 thread1 先执行
    this_thread::sleep_for(chrono::seconds(1));
    unique_lock<mutex> ul(loc);
    // 3. notify
    cv.notify_one();
    cout << "test_thread2" << endl;
}

// 2.2.2 返回 false 后返回 true
// 输出:
// test_thread2
// test_thread1
bool temp = false;
void test_thread1(){
    unique_lock<mutex> ul(loc);
    // 2. 先获得锁,但 wait 会释放锁,并等待 notify
    // 5. 虽然返回了 true,但仍需等待锁解锁,在 thread2 释放锁之后继续流程
    cv.wait(ul, []{ return temp; });
    cout << "test_thread1" << endl;
}
void test_thread2(){
    // 1. 先 sleep 保证 thread1 先执行
    this_thread::sleep_for(chrono::seconds(1));
    unique_lock<mutex> ul(loc);
    // 3. 将条件值设为 true
    temp = true;
    cv.notify_one();
    this_thread::sleep_for(chrono::seconds(1));
    // 4. 由于没有释放锁,thread2 仍然继续流程
    cout << "test_thread2" << endl;
}

另外:notify_all 功能在有多个线程在 wait 的时候有用,在多个线程状态下,notify_one 是随机唤醒一个线程。(这里唤醒其实并不准确,真正是否唤醒还要取决于函数调用结果和锁的情况)

异步任务

异步任务和线程有一定区别,可以指定创建新线程执行或以同步方式执行

async 和 future

async 创建一个异步任务,需要传入一个入口函数(和创建线程类似),而 futureasync 的返回值,包装着函数的返回值。

当使用 future::get() 时,会对当前线程进行阻塞,直到对应 async 的函数返回了一个值

get 使用的是移动语义,所以不能多次使用,如果想要多次使用,可以使用 shared_future,可以多次调用 get,此时是复制语义。

#include <thread>
#include <future>

int test_func(int i){
    this_thread::sleep_for(chrono::seconds(1));
    cout << "test_func" << endl;
    return i;
}
int main(){
    cout << "start" << endl;
    // async(launch::async, ..) 开启一个新线程执行异步任务
    // async(launch::deferred, ..) 延迟至 future::get() 或 future::wait() 时执行,变成同步
    future<int> result = async(test_func, 1); // 默认模式,具体实现由具体库决定
    // 延迟一秒后获得数据
    cout << result.get() << endl;

	return 0;
}

另外:future 还有 wait 函数,除了没有返回值,和 get 一样

  • wait_for:等待一定的时间,之后返回函数执行状态
  • wait_until:等待直到该时间点,返回函数执行状态
返回常量 含义
future_status::deferred 函数仍未启动
future_status::ready 正常结束
future_status::timeout 已经开始执行但未结束

packaged_task

将可调用对象包装起来,便于进行各种异步执行,是一种仿函数(重载了 () 运算符),类中可以返回一个储存返回值的 future 对象,所以可以通过 packaged_task 获得线程入口函数的返回值。其本身也可以像普通函数一样直接调用,此时是同步的。

int test_func(int i){
    this_thread::sleep_for(chrono::seconds(1));
    cout << "test_func" << endl;
    return i;
}
int main(){
    packaged_task<int(int)> pt(test_func);
    thread t1(std::ref(pt), 1);
    t1.detach();
    // 即使是 detach,由于有 future::get 的存在,也会阻塞等待线程入口函数执行完毕
    future<int> result = pt.get_future();
    cout << result.get() << endl;

	return 0;
}

promise

除了线程入口函数返回值获取数据,可不可以在函数过程中也获得数据呢,使用 promise 就可以很方便地获得其他线程执行过程中的数据。

流程:在一个线程中向 promise 中写入值(promise::set_value()),在另一个线程中从 promise 中读出值(promise::get_future() 获得 future,通过 future 获取数据 )

void print_int(std::future<int> &fut) {
	// 阻塞等待 prom.set_value()
	int x = fut.get();
	std::cout << "value: " << x << '\n';
}

int main() {
	std::promise<int> prom;
	std::future<int> fut = prom.get_future();
	std::thread th1(print_int, std::ref(fut));
	prom.set_value(10);
	th1.join();
	return 0;
}

其他方法:

  • set_exception:向所关联的 future 抛出一个异常,停止阻塞
  • set_exception_at_thread_exit:当当前线程结束时才向 future 抛出异常
  • set_value_at_thread_exit:当当前线程结束时才向 future 设置值

promise 在使用 get_future 之后会关联一个 future,并与之共享状态,当 promise 设置相关值时,共享状态就会变成 ready,从而判断 future::get 是否需要阻塞,以及阻塞后何时恢复。所以,promise 和 future 是一一对应的关系,任何一个都禁止复制。

原子操作 atomic

原子操作也可以通过锁来实现,但是效率会慢很多,原子操作提供一种更高效的解决方式(无锁方式)

原子操作的对象是一个变量,可以是基础变量或是自定义对象,但自定义对象有一定的限制要求。

原子操作在重载运算符中有效,复杂运算可能失败,例如:atom = atom + 1,可能就没法实现原子操作。

// 稳定输出 2000000,如果非 atomic,其结果可能非预期
atomic<int> count;

void test_func(){
    for(int i = 0; i < 1000000; i++){
        count++;
    }
}

int main(){
    thread t1(test_func);
    thread t2(test_func);
    t1.join();
    t2.join();
    cout << count << endl;

    return 0;
}

其他:均是原子操作

  • 读数据:load
  • 写数据:store
上一篇 下一篇

评论 | 0条评论