C++ 多线程编程(四):生产者消费者模型
1. 生产者消费者模型简介
生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品;当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
简单来说,这个模型是由两类线程构成:
- 生产者线程:“生产”产品,并把产品放到一个队列里;
- 消费者线程:“消费”产品,具体是从队列中取出产品。
为什么使用生产者消费者模型?
在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据,否则新生产的数据也没有地方存放;同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者,不然没有数据消费者也没办法消费。
为了达到生产者生产数据和消费者消费数据之间的平衡,就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式。
2. 实现生产者消费者模型
生产者消费者模型可以分为:单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型。我这里主要介绍单生产者-单消费者模型,其他模型可以参考下面几个网址:
顾名思义,单生产者-单消费者模型就是只有一个生产者、一个消费者的情况,这是最简单的生产者消费者模型。实现代码如下所示:
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void function_1()
{
int count = 10;
while (count > 0)
{
std::unique_lock<std::mutex> locker(mu);
q.push_front(count);
locker.unlock();
cond.notify_one(); // Notify one waiting thread, if there is one.
count--;
}
}
void function_2()
{
int data = 0;
while (data != 1)
{
std::unique_lock<std::mutex> locker(mu);
while (q.empty())
cond.wait(locker); // Unlock mu and wait to be notified
data = q.back();
q.pop_back();
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
int main()
{
std::thread t1(function_1);
std::thread t2(function_2);
t1.join();
t2.join();
return 0;
}
上面的代码有三个注意事项:
在function_2中,在判断队列是否为空的时候,使用的是
while(q.empty())
,而不是if(q.empty())
.这是因为wait()从阻塞到返回,不一定就是由于notify_one()函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒,如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续wait()阻塞。
在管理互斥锁的时候,使用的是
std::unique_lock
而不是std::lock_guard
,而且事实上也不能使用std::lock_guard
。这需要先解释下wait()函数所做的事情。可以看到,在wait()函数之前,使用互斥锁保护了,如果wait的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。而lock_guard没有lock和unlock接口,而unique_lock提供了。这就是必须使用unique_lock的原因。
使用细粒度锁,尽量减小锁的范围。
在notify_one()的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁unlock()。还可以将
cond.wait(locker);
换一种写法,wait()的第二个参数可以传入一个函数表示检查条件,这里使用lambda函数最为简单,如果这个函数返回的是true,wait()函数不会阻塞会直接返回,如果这个函数返回的是false,wait()函数就会阻塞着等待唤醒,如果被伪唤醒,会继续判断函数返回值。
3. 实际应用生产者消费者模型
这里介绍一个个人认为比较好用的编程方法。
在我们的实际开发中,比如在SLAM中,可能是定位一个线程,建图一个线程,定位线程会计算出目前的位置,而建图线程需要根据这个位置来建立地图。
如果采用上面这样的方式,代码就不是很简洁,并且你需要保证一些变量是全局的,两个线程都能够接触到,这样零零碎碎的变量比较多,管理起来很麻烦。
这里我推荐构建一个生产者消费者模型的类,并且最好是使用单例模式的模板类。这样的话就可以非常简单的在两个线程中通信,并且后续如果要对通信方式进行修改,也不需要修改到定位和建图的代码,封装性更好!
下面是一个简单的例子:
#ifndef THREAD_TEST_DATATRANS_H
#define THREAD_TEST_DATATRANS_H
#include <list>
#include <mutex>
#include <condition_variable>
#define Trans DataTrans<int>::Instance()
template<typename T>
class DataTrans
{
private:
std::list<T> m_queue;//队列
std::mutex m_mutex;//全局互斥锁
std::condition_variable_any m_notEmpty;//全局条件变量(不为空)
std::condition_variable_any m_notFull;//全局条件变量(不为满)
int m_maxSize{};//队列最大容量
private:
//队列为空
bool isEmpty() const
{
return m_queue.empty();
}
//队列已满
bool isFull() const
{
return m_queue.size() == m_maxSize;
}
//构造函数
DataTrans()
{
this->m_maxSize = 30;
}
public:
// 获取单实例对象
static DataTrans &Instance()
{
/**
* 局部静态特性的方式实现单实例。
* 静态局部变量只在当前函数内有效,其他函数无法访问。
* 静态局部变量只在第一次被调用的时候初始化,也存储在静态存储区,生命周期从第一次被初始化起至程序结束止。
*/
static DataTrans instance;
return instance;
}
void product(const T &v)
{
std::unique_lock<std::mutex> locker(m_mutex);
while (isFull())
{
//生产者等待"产品队列缓冲区不为满"这一条件发生.
m_notFull.wait(m_mutex);
}
m_queue.push_back(v);
locker.unlock();
m_notEmpty.notify_one();
}
void consumption(T &v)
{
std::unique_lock<std::mutex> locker(m_mutex);
while (isEmpty())
{
// 消费者等待"产品队列缓冲区不为空"这一条件发生.
m_notEmpty.wait(m_mutex);
}
//在队列里面消费一个元素,同时通知队列不满这个信号量
v = m_queue.front();
m_queue.pop_front();
locker.unlock();
m_notFull.notify_one();
}
};
#endif //THREAD_TEST_DATATRANS_H
使用上面这个类的简单示例:
#include <thread>
#include <iostream>
#include "DataTrans.h"
void function_1()
{
int count = 300;
while (count > 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Trans.product(count);
std::cout << "product:\t" << count << std::endl;
count--;
}
}
void function_2()
{
int data = 0;
while (data != 1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
Trans.consumption(data);
std::cout << data << "\tis consumed!" << std::endl;
}
}
int main()
{
std::thread t1(function_1);
std::thread t2(function_2);
t1.join();
t2.join();
return 0;
}