C++ 多线程编程(四):生产者消费者模型


C++ 多线程编程(四):生产者消费者模型

1. 生产者消费者模型简介

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品;当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

简单来说,这个模型是由两类线程构成:

  • 生产者线程:“生产”产品,并把产品放到一个队列里;
  • 消费者线程:“消费”产品,具体是从队列中取出产品。

为什么使用生产者消费者模型?

在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据,否则新生产的数据也没有地方存放;同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者,不然没有数据消费者也没办法消费。

为了达到生产者生产数据和消费者消费数据之间的平衡,就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式。

2. 实现生产者消费者模型

生产者消费者模型可以分为:单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型。我这里主要介绍单生产者-单消费者模型,其他模型可以参考下面几个网址:

顾名思义,单生产者-单消费者模型就是只有一个生产者、一个消费者的情况,这是最简单的生产者消费者模型。实现代码如下所示:

#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>

std::deque<int> q;
std::mutex mu;
std::condition_variable cond;

void function_1()
{
    int count = 10;
    while (count > 0)
    {
        std::unique_lock<std::mutex> locker(mu);
        q.push_front(count);
        locker.unlock();
        cond.notify_one();  // Notify one waiting thread, if there is one.
        count--;
    }
}

void function_2()
{
    int data = 0;
    while (data != 1)
    {
        std::unique_lock<std::mutex> locker(mu);
        while (q.empty())
            cond.wait(locker); // Unlock mu and wait to be notified
        data = q.back();
        q.pop_back();
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}

int main()
{
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

上面的代码有三个注意事项:

  1. 在function_2中,在判断队列是否为空的时候,使用的是while(q.empty()),而不是if(q.empty()).

    这是因为wait()从阻塞到返回,不一定就是由于notify_one()函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒,如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续wait()阻塞。

  2. 在管理互斥锁的时候,使用的是std::unique_lock而不是std::lock_guard,而且事实上也不能使用std::lock_guard

    这需要先解释下wait()函数所做的事情。可以看到,在wait()函数之前,使用互斥锁保护了,如果wait的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。而lock_guard没有lock和unlock接口,而unique_lock提供了。这就是必须使用unique_lock的原因。

  3. 使用细粒度锁,尽量减小锁的范围。

    在notify_one()的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁unlock()。还可以将cond.wait(locker);换一种写法,wait()的第二个参数可以传入一个函数表示检查条件,这里使用lambda函数最为简单,如果这个函数返回的是true,wait()函数不会阻塞会直接返回,如果这个函数返回的是false,wait()函数就会阻塞着等待唤醒,如果被伪唤醒,会继续判断函数返回值。

3. 实际应用生产者消费者模型

这里介绍一个个人认为比较好用的编程方法。

在我们的实际开发中,比如在SLAM中,可能是定位一个线程,建图一个线程,定位线程会计算出目前的位置,而建图线程需要根据这个位置来建立地图。

如果采用上面这样的方式,代码就不是很简洁,并且你需要保证一些变量是全局的,两个线程都能够接触到,这样零零碎碎的变量比较多,管理起来很麻烦。

这里我推荐构建一个生产者消费者模型的类,并且最好是使用单例模式的模板类。这样的话就可以非常简单的在两个线程中通信,并且后续如果要对通信方式进行修改,也不需要修改到定位和建图的代码,封装性更好!

下面是一个简单的例子:

#ifndef THREAD_TEST_DATATRANS_H
#define THREAD_TEST_DATATRANS_H

#include <list>
#include <mutex>
#include <condition_variable>

#define Trans DataTrans<int>::Instance()

template<typename T>
class DataTrans
{
private:
    std::list<T> m_queue;//队列
    std::mutex m_mutex;//全局互斥锁
    std::condition_variable_any m_notEmpty;//全局条件变量(不为空)
    std::condition_variable_any m_notFull;//全局条件变量(不为满)
    int m_maxSize{};//队列最大容量

private:
    //队列为空
    bool isEmpty() const
    {
        return m_queue.empty();
    }

    //队列已满
    bool isFull() const
    {
        return m_queue.size() == m_maxSize;
    }

    //构造函数
    DataTrans()
    {
        this->m_maxSize = 30;
    }

public:

    // 获取单实例对象
    static DataTrans &Instance()
    {
        /**
         * 局部静态特性的方式实现单实例。
         * 静态局部变量只在当前函数内有效,其他函数无法访问。
         * 静态局部变量只在第一次被调用的时候初始化,也存储在静态存储区,生命周期从第一次被初始化起至程序结束止。
         */
        static DataTrans instance;
        return instance;
    }

    void product(const T &v)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        while (isFull())
        {
            //生产者等待"产品队列缓冲区不为满"这一条件发生.
            m_notFull.wait(m_mutex);
        }
        m_queue.push_back(v);
        locker.unlock();
        m_notEmpty.notify_one();
    }

    void consumption(T &v)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        while (isEmpty())
        {
            // 消费者等待"产品队列缓冲区不为空"这一条件发生.
            m_notEmpty.wait(m_mutex);
        }
        //在队列里面消费一个元素,同时通知队列不满这个信号量
        v = m_queue.front();
        m_queue.pop_front();
        locker.unlock();
        m_notFull.notify_one();
    }

};

#endif //THREAD_TEST_DATATRANS_H

使用上面这个类的简单示例:

#include <thread>
#include <iostream>
#include "DataTrans.h"

void function_1()
{
    int count = 300;
    while (count > 0)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        Trans.product(count);
        std::cout << "product:\t" << count << std::endl;
        count--;
    }
}

void function_2()
{
    int data = 0;
    while (data != 1)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        Trans.consumption(data);
        std::cout << data << "\tis consumed!" << std::endl;
    }
}

int main()
{
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

文章作者: Immortalqx
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Immortalqx !
评论
  目录