站点建设网站网奇seo培训官网
目录
1.模型前提
2.阻塞队列(消费场所)
3. 实验
4.有关效率
1.模型前提
以单生产者对单消费者为例子:
前提一:有一个缓冲区作为消费场所。
前提二:有两种功能不同的线程分别具有消费与生产的能力。
前提三:生产者与生产者之间有互斥的关系,消费者与消费者之间有互斥的关系,生产者与消费者之间同时具有互斥与同步的关系。
2.阻塞队列(消费场所)
阻塞队列与普通队列最大的不同是:当队列满时生产者不能继续生产,当队列空时消费者不能继续消费。
3. 实验
来展示一份多线程工作遵循生产者消费者模型的代码。
部分1:设置消费的模板(task.hpp)
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;//c++11特性打包器class Task
{public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}int operator ()() //仿函数{return func_(x_, y_);}
public:int x_;int y_;func_t func_;
};
部分2:采用raii的方式设置锁的处置方式(lockGuard.hpp)
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public://构造时加锁//析构时解锁lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_; //锁
};
这里解释一下为什么要加锁,因为要维护前提三,也就是要维护互斥的关系。
部分3:阻塞队列这个数据结构
#pragma once#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"const int gDefaultCap = 5;template <class T>
class BlockQueue
{
private://判断队列的空与满bool isQueueEmpty(){return bq_.size() == 0;}bool isQueueFull(){return bq_.size() == capacity_;}public:BlockQueue(int capacity = gDefaultCap) : capacity_(capacity){pthread_mutex_init(&mtx_, nullptr);pthread_cond_init(&Empty_, nullptr);pthread_cond_init(&Full_, nullptr);}void push(const T &in) // 生产者{lockGuard lockgrard(&mtx_); // 自动调用构造函数//不停的检查直到队列非满while (isQueueFull())pthread_cond_wait(&Full_, &mtx_); //等待时会自动解锁,消费者可以消费bq_.push(in); //非满时可以插入任务(数据)pthread_cond_signal(&Empty_); //既然插入的数据,那么队列为空这个条件变量就可以解除了。} // 自动调用lockgrard 析构函数void pop(T *out){lockGuard lockguard(&mtx_);while (isQueueEmpty())pthread_cond_wait(&Empty_, &mtx_);*out = bq_.front();bq_.pop();pthread_cond_signal(&Full_);}~BlockQueue(){pthread_mutex_destroy(&mtx_);pthread_cond_destroy(&Empty_);pthread_cond_destroy(&Full_);}private:std::queue<T> bq_; // 阻塞队列int capacity_; // 容量上限pthread_mutex_t mtx_; // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};
部分4:主程序部分,创建线程。
#include "BlockQueue.hpp"
#include "Task.hpp"#include <pthread.h>
#include <unistd.h>
#include <ctime>int myAdd(int x, int y) //设置了消费方式
{return x + y;
}void* consumer(void *args) //消费者
{BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;while(true){// 获取任务Task t;bqueue->pop(&t);// 完成任务std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;// sleep(1);}return nullptr;
}void* productor(void *args) //生产者
{BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;while(true){// 制作任务 -- 不一定是从生产者来的int x = rand()%10 + 1;usleep(rand()%1000);int y = rand()%5 + 1;Task t(x, y, myAdd);// 生产任务bqueue->push(t);// 输出消息std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;sleep(1);}return nullptr;
}int main()
{//生成随机数种子srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);BlockQueue<Task> *bqueue = new BlockQueue<Task>();pthread_t c[2],p[2];//创建线程pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);//销毁线程pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}
结果:
没有问题,生产者生产一次消费者就消费一次,并且在这之间存在了加解锁的过程。
4.有关效率
有人可能会疑问“明明还是加锁了,那不是没有提升效率嘛”。
由于生产者与消费之间存在消费场所,就可以做到生产者生产的同时消费者从消费场所拿走数据进行消费。因此提升的不是生产者与消费者之间传递数据的速度,而是提升了生产者生产数据的效率与消费者消费数据的效率。