网站开发的背景和意义,站牛网,门户网站建设专业,网站怎么做微信扫描登录网站目录
一、阻塞队列的简单介绍
二、生产者消费者模型
1、举个栗子#xff1a;
2、引入生产者消费者模型的意义#xff1a;
#xff08;1#xff09;解耦合
#xff08;2#xff09;削峰填谷
三、模拟实现阻塞队列
1、阻塞队列的简单介绍
2、实现阻塞队列
#…目录
一、阻塞队列的简单介绍
二、生产者消费者模型
1、举个栗子
2、引入生产者消费者模型的意义
1解耦合
2削峰填谷
三、模拟实现阻塞队列
1、阻塞队列的简单介绍
2、实现阻塞队列
1实现普通队列
2加上线程安全
3加上阻塞功能
3、运用阻塞队列的生产者消费者模型
都看到这了点个赞再走吧谢谢谢谢谢 一、阻塞队列的简单介绍
首先我们都知道队列是先进先出的一种数据结构而阻塞队列是基于队列做了一些扩展在多线程有就非常有意义了
阻塞队列的特性
1是线程安全的
2具有阻塞的特性 ①当队列满了这时不能往队列里放数据就会阻塞等待等队列的数据出队列后这时队列没满才能放数据。 ②当队列空了这时不能拿队列里的数据就会阻塞等待等有数据如队列了这时队列不为空才能拿数据。
这里阻塞队列的用处非常大基于阻塞队列的功能就可以实现 “生产者消费者模型”。 二、生产者消费者模型
生产者消费者模型是一种很朴素的概念描述的是一种多线程编程的方法。
1、举个栗子
一家人包饺子首先得和面和完面就要开始擀饺子皮了然后才开始包饺子这里因为一般家里也只有一个擀面杖所以只能一个人擀饺子皮其余的家人就会帮忙包饺子假设一个人擀饺子皮2个人包饺子那么擀饺子皮的人就是生产者包饺子的人就是消费者这也就是消费者生产者模型。
一个桌子能发饺子皮的数量是有限的当擀饺子皮的人比较快桌子放满饺子皮后就要等包饺子的人消耗一些饺子皮来包饺子才能继续擀饺子皮而这也类似阻塞队列中队里满的情况。当包饺子的人包的比较快桌子上的饺子皮都没了就要等擀饺子皮的人擀了饺子皮后才能继续包饺子而这也类似阻塞队列空的情况。不同的人分工不同也类似多线程各自干各自的事情。
2、引入生产者消费者模型的意义
1解耦合 引入生产者消费者模型就能更好的做到解耦合把代码的耦合程度从高降低就称为解耦合 在实际开发中会涉及到 “分布式系统” 服务器的整个功能不是又一个服务器完成实现的而是由多个服务器各自实现各自的一部分功能再通过网络通信把这些服务器联系起来最终完成整个服务器的功能。粗糙流程如图 而这时入口服务器与A、B服务器的联系是密切相关的请求要经过入口服务器才能传达给A、B服务器再A、B服务器拿到想要的数据再返回给入口服务器通过入口服务器再把响应传给客户端。如果是这样那如果请求突然骤升这时超过入口服务器接收请求的峰值这时入口服务器就挂了入口服务器挂了后A、B服务器拿不到请求也会挂掉这就体现了入口服务器和A、B服务器的耦合性比较高。 当我们在入口服务器和A、B服务器之间引入阻塞队列时如图 这时如果入口服务器挂了但是阻塞队列中还有请求的数据至少不会因入口服务器挂了A、B服务器也挂了这样入口服务器和A、B服务器的耦合性也就降低了。 2削峰填谷 如图当客户端这边的请求突然骤增时入口服务器都是比较能抗压的但是也是有极限的这时我们引入阻塞队列就可以把这些请求数据都放进阻塞队列中形成一个缓冲区这样即使外面的请求达到了峰值也是由阻塞队列来承担这样就形成了削峰填谷的效果。 注意这时的阻塞队列是基于阻塞队列这一数据结构而实现的服务器程序所以也叫消息队列 三、模拟实现阻塞队列
1、阻塞队列的简单介绍 在java标准库中提供了现成的阻塞队列这一数据结构如图 是基于队列扩展而来的队列有的它也有我们知道入队列时可以用offer方法出队列时可以用poll方法在阻塞队列中也有这两个方法但是这两个方法是不带阻塞功能的其中在阻塞队列中put是在阻塞功能的入队列take方法是带阻塞功能的出队列。 代码案例 public class TestDemo1 {public static void main(String[] args) throws InterruptedException {BlockingQueueString blockingQueue new ArrayBlockingQueue(10);blockingQueue.put(aaa);String s1 blockingQueue.take();System.out.println(第一个打印s1 s1);s1 blockingQueue.take();System.out.println(第二个打印s1 s1);}
}执行结果 线程卡住不动了原因是想要第二次出队列时队列是空的所以要等队列有元素入队列时才能出队列也就是说这是带有阻塞功能的。 2、实现阻塞队列
阻塞队列是通过循环队列实现的而队列是依靠数组来实现的这里的阻塞队列我们只模拟实现其中的put和take方法。
1实现普通队列 代码 // 为了简单, 不写作泛型的形式. 考虑存储的元素就是单纯的 String
class MyBlockingQueue {private String elems[] null;private int head 0;//记录头结点private int tail 0;//记录尾结点private int size 0;//队列元素个数//构造方法定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems new String[capacity];}//入队列public void put(String elem) {//判断容量满了没满了就不能入队列要阻塞等待if(size this.elems.length) {//阻塞等待先不写先实现普通功能的队列return;}//入队列elems[tail] elem;tail;//因为是循环队列所以要判断尾巴有没有超过容量大小下标超过了就要从0开始了if(tail elems.length) {tail 0;}//队列元素要size;}//出队列public String take() {String elem null;//要判断队列是不是空的空就不能出队列了要阻塞等待if(size 0) {//阻塞等待因为是先实现普通队列的功能所以后面再补充return null;}elem elems[head];head;//因为是循环队列所以要判断头结点有没有超过容量大小下标超过了就要0开始了if(head elems.length) {head 0;}//出队列后队列元素要--size--;return elem;}
} 每个步骤说明代码中有注释。 测试一下可不可以用如图 是可以用的这样普通的队列就已经搞好了 2加上线程安全 我们想想put和take里面要给哪里上锁首先写操作肯定是要加锁的因为多线程同时执行写操作肯定是线程不安全的也就是下面这段代码 接下来我们讨论一下这两个代码要不要加锁以take为例如图 我们画一下图会比较好理解 如果size -1是不符合我们预期的size最小也只可能是0不可能是-1所以我要上面的判断条件也要加锁。 而put也一样判断条件也要加锁。 代码 class MyBlockingQueue {Object locker new Object();private String elems[] null;private int head 0;//记录头结点private int tail 0;//记录尾结点private int size 0;//队列元素个数//构造方法定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems new String[capacity];}//入队列public void put(String elem) {synchronized (locker) {//判断容量满了没满了就不能入队列要阻塞等待if(size this.elems.length) {//阻塞等待先不写先实现普通功能的队列return;}//因为这些都是写操作也有读操作多线程并发执行时写操作是线程不安全的要把这些打包成一个原子加锁synchronized (locker) {//入队列elems[tail] elem;tail;//因为是循环队列所以要判断尾巴有没有超过容量大小下标超过了就要从0开始了if(tail elems.length) {tail 0;}//队列元素要size;}}}//出队列public String take() {String elem null;//因为这些都是写操作也有读操作多线程并发执行时写操作是线程不安全的要把这些打包成一个原子加锁synchronized (locker) {//要判断队列是不是空的空就不能出队列了要阻塞等待if(size 0) {//阻塞等待因为是先实现普通队列的功能所以后面再补充return null;}elem elems[head];head;//因为是循环队列所以要判断头结点有没有超过容量大小下标超过了就要0开始了if(head elems.length) {head 0;}//出队列后队列元素要--size--;return elem;}}
} 3加上阻塞功能 我们要加上阻塞功能就要在这两条件判断上加上wait我们用locker的对象给他wait而且wait必须要在synchronized内使用这里的locker正好能对应上当这个队列满时就阻塞等待等take方法拿走一个数据时才给他唤醒。 加上阻塞功能后的代码如下不是最终代码里面还是存在线程安全问题 class MyBlockingQueue {Object locker new Object();private String elems[] null;private int head 0;//记录头结点private int tail 0;//记录尾结点private int size 0;//队列元素个数//构造方法定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems new String[capacity];}//入队列public void put(String elem) throws InterruptedException {synchronized (locker) {//判断容量满了没满了就不能入队列要阻塞等待if (size this.elems.length) {//阻塞等待先不写先实现普通功能的队列synchronized (locker) {locker.wait();}}//因为这些都是写操作也有读操作多线程并发执行时写操作是线程不安全的要把这些打包成一个原子加锁synchronized (locker) {//入队列elems[tail] elem;tail;//因为是循环队列所以要判断尾巴有没有超过容量大小下标超过了就要从0开始了if(tail elems.length) {tail 0;}//队列元素要size;locker.notify();}}}//出队列public String take() throws InterruptedException {String elem null;//因为这些都是写操作也有读操作多线程并发执行时写操作是线程不安全的要把这些打包成一个原子加锁synchronized (locker) {//要判断队列是不是空的空就不能出队列了要阻塞等待if (size 0) {//阻塞等待因为是先实现普通队列的功能所以后面再补充synchronized (locker) {locker.wait();}}elem elems[head];head;//因为是循环队列所以要判断头结点有没有超过容量大小下标超过了就要0开始了if(head elems.length) {head 0;}//出队列后队列元素要--size--;locker.notify();return elem;}}
} 加上阻塞功能的判断语句 与其对应的一定要记住put和take后要notify不然就会一直阻塞下去导致程序动不了如图 现在进行代码分析 当put时队列满了就要阻塞等待等take这队列后就会唤醒put操作接着put就能入队列了如果是take就相反这是符合我们预期的。如果不满也不空时每次put和take都会notify一次这时会有影响吗答案肯定是否定的不会有影响因为就算没有其他线程在等待唤醒也没有事不会对程序造成啥影响。而且我们现在的代码一定是要么满要么空要么不满也不空。 但是如果有两个线程同时put现在队列是满的A线程先阻塞B线程也阻塞这时有第三个线程take一次把A线程的wait唤醒了等A执行到下面的notifyA线程里put的notify就会唤醒B线程里的wait但是因为A线程put了和第三个线程的take一取一放抵消了此时队列还是满的因为A线程里的put把B线程里的wait唤醒了这时已经是满了的队列还往里放元素就造成了线程安全问题。 解决方案把条件判断if换成while循环语句不是只判断一次当有其他线程把wait唤醒后还要再判断一次这个队列是不是满的或者是空的如果不是满的或者不是空的才释放这个wait不然就要继续wait这样问题也就解决了。 最终代码 class MyBlockingQueue {Object locker new Object();private String elems[] null;private int head 0;//记录头结点private int tail 0;//记录尾结点private int size 0;//队列元素个数//构造方法定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems new String[capacity];}//入队列public void put(String elem) throws InterruptedException {synchronized (locker) {//判断容量满了没满了就不能入队列要阻塞等待while (size this.elems.length) {//阻塞等待先不写先实现普通功能的队列synchronized (locker) {locker.wait();}}//因为这些都是写操作也有读操作多线程并发执行时写操作是线程不安全的要把这些打包成一个原子加锁synchronized (locker) {//入队列elems[tail] elem;tail;//因为是循环队列所以要判断尾巴有没有超过容量大小下标超过了就要从0开始了if(tail elems.length) {tail 0;}//队列元素要size;locker.notify();}}}//出队列public String take() throws InterruptedException {String elem null;//因为这些都是写操作也有读操作多线程并发执行时写操作是线程不安全的要把这些打包成一个原子加锁synchronized (locker) {//要判断队列是不是空的空就不能出队列了要阻塞等待while (size 0) {//阻塞等待因为是先实现普通队列的功能所以后面再补充synchronized (locker) {locker.wait();}}elem elems[head];head;//因为是循环队列所以要判断头结点有没有超过容量大小下标超过了就要0开始了if(head elems.length) {head 0;}//出队列后队列元素要--size--;locker.notify();return elem;}}
} 我们看看wait内部可以看到它内部也是会有说明wait可能会提前被唤醒就要我们多加一次判断这样用while会比用if更好如图 在实际开发中生产者消费者模型往往是多个生产者多个消费者这里的生产者和消费者往往不仅仅是一个线程也可能是一个独立的服务器甚至是一组服务器程序。
但生产者消费者模型最核心的部分还是阻塞队列可以使用synchronized和wait / notify 达到线程安全与阻塞。
3、运用阻塞队列的生产者消费者模型 简单的生产者消费者模型代码 public class Test {public static void main(String[] args) {BlockingQueueInteger queue new ArrayBlockingQueue(10);//生产者Thread t1 new Thread(() - {int n 1;while (true) {try {queue.put(n);System.out.println(生产者元素 n);n;Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});//消费者Thread t2 new Thread(() - {while (true) {try {int n queue.take();System.out.println(消费者元素 n);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}执行结果 可以看到生产者生产一个消费者就消费一个继续等待生产者生产元素后再消费。 都看到这了点个赞再走吧谢谢谢谢谢