CPP11 线程

在 C++ 11 之前, C++ 没有对线程提供语言级别的支持各种操作系统和编译器实现线程的方法不一样。

C++ 11 增加了线程以及线程相关的类,统一编程风格,简单易用,跨平台。

创建线程

#include <thread>

std::thread // 线程类

thread() noexcept;
// 默认构造函数,构造一个线程对象,不执行任何任务(不会创建/启动子线程)

template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
// 创建线程对象,在线程中执行任务函数 fx 中的代码, args 是要传递给任务函数 fx 的参数。
// 任务函数 fx 可以是 普通函数、类的非静态成员函数,类的静态成员函数,匿名函数,仿函数。

thread(const thread&) = delete;
// 删除拷贝构造函数,不允许线程对象之间的拷贝

thread(thread&& other) noexcept;
// 移动构造函数,将线程 other 的资源所有权转移给新创建的线程对象。

// 赋值函数:
thread& operator= (thread&& other) noexcept;
thread& operator=(const other&) = delete;
// 线程的资源不能被复制,如果 other 是右值,会进行资源所有权的转移,如果 other 是左值,禁止拷贝。
  • 示例程序
// 线程使用

#include <thread>
#include <iostream>
#include <unistd.h>
#include <string>

using namespace std;

// 普通函数
void func(int n, const string &str) {
    int times = 10;
    string flag = "(普通函数)";
    
    for (int i = 0; i < times; i++) {
        cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
        sleep(1);
    }
}

// 用 lambda 函数创建线程
auto f =[](int n, const string &str) {
    int times = 10;
    string flag = "(匿名函数)";
    
    for (int i = 0; i < times; i++) {
        cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
        sleep(1);
    }
};

// 用仿函数创建线程
class myThread1 {
public:
    void operator()(int n, const string &str) {
        int times = 10;
        string flag = "(仿函数)";
        
        for (int i = 0; i < times; i++) {
            cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
            sleep(1);
        }
    }
};

// 用类的静态成员函数创建线程
class myThread2 {
    public:
    static void func(int n, const string &str) {
        int times = 10;
        string flag = "(类的静态成员函数)";
        
        for (int i = 0; i < times; i++) {
            cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
            sleep(1);
        }
    }
};

// 用类的普通成员函数创建线程
class myThread3 {
    public:
        void func(int n, const string &str) {
        int times = 10;
        string flag = "(类的普通成员函数)";
        
        for (int i = 0; i < times; i++) {
            cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
            sleep(1);
        }
    }
};

int main() {

    // 用普通函数创建线程
    thread t1(func, 1, "t1");
    thread t2(func, 2, "t2");
    thread t3(f, 3, "t3");
    thread t4(myThread1(), 4, "t4");
    thread t5(myThread2::func, 5, "t5");
    
    // 必须先创建类的对象,必须保证对象的生命周期比子线程要长。
    myThread3 my_th;
    
    // 第二个参数必须填对象的 this 指针
    thread t6(&myThread3::func, &my_th, 6, "t6");
    
    // 回收资源
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();
    
    return 0;
}

回收线程资源

  • 先创建的子线程不一定跑得最快(程序的运行速度有很大的偶然性)。
  • 线程的任务函数返回后,子线程将终止。
  • 如果主程序(主线程)退出,(不论是正常退出还是意外终止),全部的子线程将强行北终止。

虽然同一个进程的多个线程共享进程的栈空间,但是,每个子线程在这个栈中拥有自己私有的栈空间。所以,线程结束时需要回收资源。

回收子线程资源有两种方法:

  • 在主程序中,调用 join() 成员函数等待子线程退出,回收它的资源。如果子线程已退出, join() 函数立即返回,否则会产生阻塞,直到子线程退出。
  • 在主线程中,调用 detach() 成员函数分离子线程,子线程退出时,系统将自动回收资源。分离后的线程不可 join()。 用 joinable() 函数可以判断子线程的分离状态,函数返回布尔类型。

示例程序:

#include <thread>
#include <iostream>
#include <unistd.h>
#include <string>

using namespace std;

// 普通函数
void func(int n, const string &str) {
    int times = 10;
    string flag = "(普通函数)";

    for (int i = 0; i < times; i++) {
        cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
        sleep(1);
    }
}

int main() {
    // 用普通函数创建线程
    thread t1(func, 1, "t1");
    t1.detach();

    sleep(12); // 主程序不能退出
    return 0;
}

线程的 (this_thread) 的全局函数

C++ 11 提供了命名空间 this_thread 来表示当前线程,该命名空间有四个函数,get_id(),sleep_for(),sleep_until(),yield()

#include <thread>

// 该函数用于获取线程 ID,thread 类也有同名的成员函数。
thread::id get_id() noexcept;

// 该函数让线程休眠一段实践。
template< class Rep, class Period >
void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );

// 该函数让线程休眠到指定时间点。可实现定时任务。
template< class Clock, class Duration >
void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );

// 该函数让线程主动让出自己已经抢到的 CPU 时间片。
void yield() noexcept;

// 交换两个线程对象
void swap(std::thread& other);

// 返回硬件线程上下文的数量
static unsigned int hardware_concurrency() noexcept;

示例程序

// 线程使用
#include <thread>
#include <iostream>
#include <unistd.h>
#include <string>

using namespace std;

// 普通函数
void func(int n, const string &str) {
    int times = 3;
    string flag = "(普通函数)";
    cout << str << " thread's id: " << this_thread::get_id() << endl;
    
    for (int i = 0; i < times; i++) {
        cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
        // sleep(1);
        this_thread::sleep_for(chrono::seconds(1));
    }
}


int main() {
    // 用普通函数创建线程
    thread t1(func, 1, "t1");
    thread t2(func, 2, "t2");
    
    cout << "main thread id: " << this_thread::get_id() << endl;
    cout << "t1 thread id: " << t1.get_id() << endl;
    cout << "t2 thread id: " << t2.get_id() << endl;
    
    t1.swap(t2);
    cout << "(after swap) t1 thread id: " << t1.get_id() << endl;
    cout << "(after swap) t2 thread id: " << t2.get_id() << endl;
    
    thread t3 = std::move(t2);
    
    t1.join();
    // t2.join();
    t3.join();
    return 0;
}

线程 call_once 函数

在多线程环境中,某些函数只能被调用一次,例如:初始化某个对象,而这个对象只能被初始化一次。

在线程的任务函数中,可以用 std::call_once() 来保证某个函数只能被调用一次。

#include <mutex>
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
// once_flag 本质是取值为 0 和 1 的锁。

示例程序

#include <thread>
#include <iostream>
#include <unistd.h>
#include <string>
#include <mutex>

using namespace std;

once_flag onceFlag;

void once_func(const int bh, const string &str) {
    cout << "once_func() bh = " << bh << " ,str = " << str << endl;
}

// 普通函数
void func(int n, const string &str) {
    call_once(onceFlag, once_func, 3, "once");

    int times = 2;
    string flag = "(普通函数)";

    for (int i = 0; i < times; i++) {
        cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
        sleep(1);
    }
}

int main() {

    // 用普通函数创建线程
    thread t1(func, 1, "t1");
    thread t2(func, 2, "t2");

    t1.join();
    t2.join();
    return 0;
}

线程 native_handle 函数

C++ 定义了线程标准,不同的平台和编译器在实现的时候,本质上都是对操作系统的线程库进行封装,会损失一部分功能。

为了弥补 C++ 线程库的不足,thread 类提供了 native_handle() 成员函数,用于获取与操作系统相关的原生线程句柄, 操作系统原生的线程库就可以用原生线程句柄操作线程。

#include <thread>
#include <iostream>
#include <unistd.h>
#include <string>
#include <pthread.h>
using namespace std;

// 普通函数
void func(int n, const string &str) {
    int times = 10;
    string flag = "(普通函数)";

    for (int i = 0; i < times; i++) {
        cout << str << " " << n << " - " << (i + 1) << "/" << times << " " << flag << endl;
        sleep(1);
    }
}

int main() {

    // 用普通函数创建线程
    thread t1(func, 1, "t1");

    this_thread::sleep_for(chrono::seconds(5));
    pthread_t tid = t1.native_handle(); // 获取 linux 操作系统原生的线程句柄

    pthread_cancel(tid); // 取消线程

    t1.join();
    return 0;
}

线程安全

#include <thread>
#include <iostream>

using namespace std;

// int a = 0;
volatile int a = 0;

void func() {
    for (int i = 0; i < 1e6; i++)
        ++a;
}

int main() {
//    func();
//    func();  // a = 2000000

    thread t1(func);
    thread t2(func); // a = 10352
    
    t1.join();
    t2.join();
    cout << "a = " << a << endl;
    return 0;
}
  • 同一进程中的多个线程共享该进程中的全部系统资源。
  • 多个线程访问同一共享资源的时候会产生冲突。
  • 顺序性、可见性、原子性。

顺序性

  • 程序按照代码的先后顺序执行。
  • CPU 为了提高程序整体的执行效率,编译器可能会对代码进行优化,按更高校的顺序执行。
  • CPU 虽然不保证完全按照代码的顺序执行,但它会保证最终的结果和按代码顺序执行时的结果一致。

可见性

  • 线程操作共享变量时,会将该变量从内存加载到 CPU 缓存中,修改该变量后,CPU 会立即更新缓存,但不一定会立即将它写回内存。
  • 这个时候如果其他线程访问该变量,从内存中读到的是旧数据,而非第一个线程操作后的数据。
  • 当多个线程并发访问共享变量时,一个线程对共享变量的修改,其他线程能够立即看到。

原子性

  • CPU 执行指令:读取指令、读取内存、执行指令、写回内存。
  • i++, 1) 从内存中读取 i 的值、 2) 把 i+1、 3) 把结果写回内存。
  • 一个操作(有可能包含有多个步骤)要么全部执行(生效),要么全部都不执行(都不生效)。

如何保证线程安全?

  • volatile 关键字: 保证内存变量的可见性,禁止代码优化(重排序)。
  • 原子操作(原子类型)
  • 线程同步(锁)。

线程同步

线程同步是指多个线程协同工作,协商如何使用共享资源。

  • 互斥锁
  • 条件变量
  • 生产者、消费者模型

互斥锁

  • 加锁和解锁,确保同一时间只有一个线程访问共享资源。
  • 访问共享资源之前加锁,访问完成后释放锁。
  • 如果某线程只有锁,其它线程形成等待队列。

C++ 11 提供四种互斥锁。

#include <mutex>
// 四个类
mutex // 互斥锁
timed_mutex // 带超时机制的互斥锁
recursive_mutex // 递归互斥锁
recursive_timed_mutex // 带超时机制的递归互斥锁
  • mutex
  1. lock() 加锁。 互斥锁有锁定和未锁定两种状态。如果互斥锁是未锁定状态,调用 lock() 函数的线程会得到互斥锁的所有权,并将其上锁。 如果互斥锁是锁定状态,调用 lock() 函数的线程就会阻塞等待,知道互斥锁编程未锁定状态。

  2. unlock() 解锁。 只有持有锁的线程才能解锁

  3. trylock() 尝试加锁。 如果互斥锁是未锁定状态,则加锁成功,函数返回 true。如果互斥锁是锁定状态,则加锁失败,函数立即返回 false。 线程不会阻塞等待。例如有多个共享资源。

#include <thread>
#include <iostream>
#include <mutex>

using namespace std;

int a = 0;
mutex mtx;

void func() {
    for (int i = 0; i < 1000000; i++){
        mtx.lock();
        a++;
        mtx.unlock();
    }
}

int main() {
    thread t1(func);
    thread t2(func); // a = 2000000

    t1.join();
    t2.join();
    cout << "a = " << a << endl;
    return 0;
}
  • time_mutex 类(带超时机制的互斥锁)

增加了两个成员函数

bool try_lock_for(时间长度);
bool try_lock_until(时间点);
  • recursive_mutex 类
#include <iostream>
#include <mutex>
#include <thread>

using namespace std;

class Demo {
private:
    // std::mutex m_mtx;
    std::recursive_mutex m_mtx;
public:
    void func1() {
        m_mtx.lock();
        cout << "call func1." << endl;
        m_mtx.unlock();
    }

    void func2() {
        m_mtx.lock();
        func1();
        cout << "call func2." << endl;
        m_mtx.unlock();
    }
};

int main() {
    Demo demo;

    // demo.func1(); // call func1.
    demo.func2(); // wait
    // call func1.
    // call func2.
    return 0;
}

递归互斥锁允许同一线程多次获得互斥锁,可以解决同一线程多次加锁造成的死锁问题。

  • lock_guard 类

lock_guard 是模板类,可以简化互斥锁的使用,也更安全。

template< class Mutex >
class lock_guard{
    explicit lock_guard(Mutex& mutex);
};

lock_guard 在构造函数中加锁,在析构函数中解锁。 lock_guard 采用了 RAII 思想,(在类构造函数中分配资源,在析构函数中释放资源,保证资源在离开作用域时自动释放)。

#include <thread>
#include <iostream>
#include <mutex>

using namespace std;

int a = 0;
mutex mtx;

void func() {
    for (int i = 0; i < 1000000; i++){
        lock_guard<mutex> lock(mtx);
        a++;
    }
}

int main() {
    thread t1(func);
    thread t2(func); // a = 2000000

    t1.join();
    t2.join();
    cout << "a = " << a << endl;
    return 0;
}

条件变量与生产消费者模型

条件变量

  • 当条件不满足时,相关线程被抑制阻塞,直到某种条件出现,这些线程才会被唤醒。
  • 为了保护共享资源,条件变量需要和互斥锁结合一起使用。
  • 生产/消费者模型(高速缓存队列)。

条件变量是一种线程同步机制。当条件不满足时,相关线程一直被阻塞,直到某种条件出现,这些线程才会被唤醒。

C++ 11 的条件变量提供了两个类。

#include <condition_variable>
  • condition_variable: 只支持与普通 mutex 搭配,效率高。
  • condition_variable_any: 是一种通用的条件变量,可以与任意 mutex 搭配,(包括用户自定义的锁类型)。
// condition_variable 类
condition_variable() // 默认构造函数
condition_variable(const condition_variable &)=delete; // 禁止拷贝
condition_variable& condition_variable::operator=(const condition_variable &)=delete; // 禁止赋值
notify_one(); // 通知一个等待的线程
notify_all(); //通知全部等待的线程
wait(unique_lock<mutex> lock); // 阻塞当前线程,知道通知到达。
wait(unique_lock<mutex> lock, Pred pred); // 循环的阻塞当前线程,知道通知到达且谓词满足
wait_for(unique_lock<mutex> lock, 时间长度);
wait_for(unique_lock<mutex> lock, 时间长度, Pred pred);
wait_until(unique_lock<mutex> lock, 时间点);
wait_until(unique_lock<mutex> lock, 时间点, Pred pred);

条件变量的 wait(mutex) 函数作用:

  1. 把互斥锁解锁
  2. 阻塞,等待北唤醒
  3. 给互斥锁加锁

unique_lock 类

tmplate<class Mutex> class unique_lock 是模板类,模板参数为互斥锁类型。 unique_lock 和 lock_guard 都是管理锁的辅助类,都是 RAII 风格(在构造时获得锁,在析构时释放锁)。 他们的区别在于:为了配合 condition_variable, unique_lock 还有 lock()unlock() 成员函数。

//
// Created by larry on 23-4-10.
//

#include <iostream>
#include<string>
#include <thread>
#include <mutex>
#include <deque>
#include <queue>
#include <condition_variable>

using namespace std;

class Demo {
    // timed_mutex m_mutex;
    // condition_variable_any m_cond;

    mutex m_mutex; // 互斥锁
    condition_variable m_cond; // 条件变量
    queue<string, deque<string>> m_q; // 缓存队列,底层容器用 deque

public:
    void inCache(int num) {  // 生产数据,num 指定数据的个数
        lock_guard<mutex> lock(m_mutex);
        for (int i = 0; i < num; i++) {
            static int number = 1; // 编号
            auto message = std::to_string(number++) + "号";
            m_q.push(message); // 把生产出来的数据入队

        }
        // m_cond.notify_one(); // 唤醒一个被当前条件变量阻塞的线程。
        m_cond.notify_all(); // 唤醒全部被当前条件变量阻塞的线程。
    }

    void outCache() { // 消费者线程任务函数
        string message; // 存放出队的数据
        while (true) {
//            cout<<"thread: "<<this_thread::get_id()<<", 申请加锁..."<<endl;
            // 把互斥锁转换称 unique_lock<mutex> ,并申请加锁
            unique_lock<mutex> lock(m_mutex);
//            cout<<"thread: "<<this_thread::get_id()<<", 加锁成功..."<<endl;

            // this_thread::sleep_for(chrono::hours(1)); // 让线程休眠一小时

//            // 条件变量虚假唤醒:消费者线程被唤醒后,缓存队列中没有数据。
//            while (m_q.empty()) { // 如果队列非空,不进入循环,直接处理数据。必须用循环,不能用 if
//                // 等待生产者的唤醒信号
//                m_cond.wait(lock); // 1)把互斥锁解开,2)阻塞,等待被唤醒,3)给互斥锁加锁。
//                // cout << "线程:" << this_thread::get_id() << " 被唤醒了..." << endl;
//            }

            m_cond.wait(lock, [this] { return !m_q.empty(); });

            message = m_q.front();
            m_q.pop();
            cout << "get thread: " << this_thread::get_id() << ", " << message << endl;
            lock.unlock(); // 手动解锁,或者用作用域解锁

            // 处理出对的数据(把数据消费掉)。
            this_thread::sleep_for(chrono::microseconds(1)); // 假设处理数据需要 1 ms。
        }
    }
};

int main() {

    Demo demo;

    thread t1(&Demo::outCache, &demo); // 创建消费者线程
    thread t2(&Demo::outCache, &demo);
    thread t3(&Demo::outCache, &demo);

    this_thread::sleep_for(chrono::seconds(2));
    // demo.inCache(3); // 生产三个数据。
    demo.inCache(2); // 生产两个数据。

    this_thread::sleep_for(chrono::seconds(3));
    // demo.inCache(5);

    t1.join();
    t2.join();
    t3.join();
    return 0;
}

原子类型 atomic

C++ 11 提供论 atomic<T> 模板类(结构体),用于支持原子类型, 模板参数可以是 boolcharintlong long指针类型(不支持浮点类型和自定义数据类型)。

原子操作由 CPU 指令提供支持,它的性能比锁和消息传递更高,并且,不需要程序员处理加锁和释放锁的问题, 支持修改、读取、交换、比较并交换等操作。

#include <atomic>
#include <thread>
#include <iostream>

using namespace std;

atomic<int> a(0);

void func() {
    for (int i = 0; i < 1e6; i++)
        ++a;
}

int main() {
//    func();
//    func();  // a = 2000000

    thread t1(func);
    thread t2(func); //

    t1.join();
    t2.join();
    cout << "a = " << a << endl; // a = 2000000
    return 0;
}
atomic() noexcept = default; // 默认构造函数
atomic(T val) noexcept; // 转换函数
atomic(const atomic&) = delete; // 禁用拷贝构造函数
atomic& operator=(const atomic&) = delete; // 禁用赋值构造函数
void store(T val) oexcept; // 把 val 的值存入原子变量
T load() noexcept; // 读取原子变量中的值
T fetch_add(T val) noexcept; // 把原子变量个值和 val 相加,返回原值。
T fetch_sub(T val) noexcept; // 把原子变量的值减 val,返回原值。
T exchange(T val) noexcept; // 把 val 的值存入原子变量,返回原值。

// 比较原子变量的值和预期值 except,如果两个值相等,把 val 存储到原子变量中,函数返回 true,
// 如果两个值不相等,用原子变量的值更新预期值,函数返回 false。 CAS 指令。
T compare_exchagne_strong(T &expect, T val) noexcept;
bool is_lock_free(); // 查询某原子类型的操作是直接用 CPU 指令(返回 true),还是编译器内部的锁 返回 false 。
  • atomic 模板类重载论整数操作的各种运算符。
  • atomic 模板类的模板参数支持指针,但不代表它所指向的对象是原子类型。
  • 原子整型可以用作计数器,布尔型可以用作开关。
  • CAS 指令是实现无锁队列的基础
//
// Created by larry on 23-4-10.
//
#include <iostream>
#include <atomic>
#include <thread>

using namespace std;

int n = 0;
atomic<int *> aa(&n);

void func() {
    for (int i = 0; i < 1e6; i++)
        (*aa)++;
}

int main() {

    atomic<int> a(3);

    cout << "a = " << a.load() << endl; // a = 3
    a.store(8);
    cout << "a = " << a.load() << endl; // a = 8

    int old;
    old = a.fetch_add(5);
    cout << "old = " << old << " , a = " << a.load() << endl; // old = 8 , a = 13

    old = a.fetch_sub(2);
    cout << "old = " << old << " , a = " << a.load() << endl; // old = 13 , a = 11

    atomic<int> ii(3); // 原子变量
    int expect = 3; // 期待值
    int val = 5; // 打算存入原子变量的值
    // 比较原子变量的值和预期值 except,
    // 如果两个值相等,把 val 存储到原子变量中
    // 如果两个值不相等,用原子变量的值更新预期值
    // 执行存储操作时返回 true,否则返回 false。
    bool bret = ii.compare_exchange_strong(expect, val);
    cout << "bret = " << bret << endl; // bret = 1
    cout << "ii = " << ii << endl; // ii = 5
    cout << "expect = " << expect << endl; // expect = 3


    thread t1(func);
    thread t2(func);
    t1.join();
    t2.join();
    cout << "aa = " << *aa << endl; // aa = 1025351
    return 0;
}

参考连接

该笔记是看视频时做的笔记。