当前位置: 首页 > news >正文

网站建设子栏目文案保定市做网站

网站建设子栏目文案,保定市做网站,比特币网站怎么做,高德地图实况街景怎么打开文章目录基础概念生产者消费者概念JUC阻塞队列的存取方法ArrayBlockingQueueArrayBlockingQueue的基本使用生产者方法实现原理ArrayBlockingQueue的常见属性add方法实现offer方法实现offer(time,unit)方法put方法消费者方法实现原理remove方法poll方法poll(time,unit)方法take方… 文章目录基础概念生产者消费者概念JUC阻塞队列的存取方法ArrayBlockingQueueArrayBlockingQueue的基本使用生产者方法实现原理ArrayBlockingQueue的常见属性add方法实现offer方法实现offer(time,unit)方法put方法消费者方法实现原理remove方法poll方法poll(time,unit)方法take方法虚假唤醒LinkedBlockingQueueLinkedBlockingQueue的底层实现生产者方法实现原理add方法offer方法offer(time,unit)方法put方法消费者方法实现原理remove方法poll方法poll(time,unit)方法take方法PriorityBlockingQueue概念PriorityBlockingQueue介绍二叉堆结构介绍PriorityBlockingQueue核心属性PriorityBlockingQueue的写入操作offer基本流程offer扩容操作offer扩容操作PriorityBlockingQueue的读取操作查看获取方法流程查看dequeue获取数据下移做平衡操作DelayQueueDelayQueue介绍应用DelayQueue核心属性DelayQueue写入流程分析DelayQueue读取流程分析remove方法poll方法poll(time,unit)方法take方法SynchronousQueueSynchronousQueue介绍SynchronousQueue核心属性SynchronousQueue的TransferQueue源码QNode源码信息transfer方法实现基础概念 生产者消费者概念 生产者消费者是设计模式的一种。让生产者和消费者基于一个容器来解决强耦合问题。 生产者消费者彼此之间不会直接通讯的而是通过一个容器(队列)进行通讯。 所以生产者生产完数据后扔到容器中不通用等待消费者来处理。 消费者不需要去找生产者要数据直接从容器中获取即可。 而这种容器最常用的结构就是队列。 JUC阻塞队列的存取方法 常用的存取方法都是来自于JUC包下的BlockingQueue。 生产者存储方法 boolean add(E e); // 添加数据到队列如果队列满了无法存储抛出异常 boolean offer(E e); // 添加数据到队列如果队列满了返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 添加数据到队列如果队列满了阻塞timeout时间如果阻塞一段时间依然没添加进入返回false void put(E e) throws InterruptedException; // 添加数据到队列如果队列满了挂起线程等到队列中有位置再扔数据进去死等消费者取数据方法 boolean remove(Object o);// 从队列中移除数据如果队列为空抛出异常 E poll();// 从队列中移除数据如果队列为空返回null么的数据 E poll(long timeout, TimeUnit unit) throws InterruptedException;// 从队列中移除数据如果队列为空挂起线程timeout时间等生产者扔数据再获取 E take() throws InterruptedException;// 从队列中移除数据如果队列为空线程挂起一直等到生产者扔数据再获取ArrayBlockingQueue ArrayBlockingQueue的基本使用 ArrayBlockingQueue在初始化的时候必须指定当前队列的长度。 因为ArrayBlockingQueue是基于数组实现的队列结构数组长度不可变必须提前设置数组长度信息。 public static void main(String[] args) throws InterruptedException {// 必须设置队列的长度ArrayBlockingQueue queue new ArrayBlockingQueue(4);// 生产者扔数据queue.add(1);queue.offer(2);queue.offer(3, 2, TimeUnit.SECONDS);queue.put(2);// 消费者取数据System.out.println(queue.remove());System.out.println(queue.poll());System.out.println(queue.poll(2, TimeUnit.SECONDS));System.out.println(queue.take()); }生产者方法实现原理 生产者添加数据到队列的方法比较多需要一个一个查看。 ArrayBlockingQueue的常见属性 ArrayBlockingQueue中的成员变量。 final ReentrantLock lock; //就是一个ReentrantLock int count;//就是当前数组中元素的个数 final Object[] items; //就是数组本身 //# 基于putIndex和takeIndex将数组结构实现为了队列结构 int putIndex;//存储数据时的下标 int takeIndex;//取储数据时的下标 private final Condition notEmpty;//消费者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify) private final Condition notFull;//生产者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify)add方法实现 add方法本身就是调用了offer方法如果offer方法返回false直接抛出异常。 public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException(Queue full); }offer方法实现 public boolean offer(E e) {// 要求存储的数据不允许为null为null就抛出空指针checkNotNull(e);// 当前阻塞队列的lock锁final ReentrantLock lock this.lock;// 为了保证线程安全加锁lock.lock();try {// 如果队列中的元素已经存满了if (count items.length)return false;else {// 队列没满执行enqueue将元素添加到队列中enqueue(e);return true;}} finally {// 操作完释放锁lock.unlock();} }private void enqueue(E x) {// 拿到数组的引用final Object[] items this.items;// 将元素放到指定位置items[putIndex] x;// 对inputIndex进行操作并且判断是否已经等于数组长度需要归位if (putIndex items.length)// 将索引设置为0putIndex 0;// 元素添加成功进行操作。count;// 将一个Condition中阻塞的线程唤醒。notEmpty.signal(); }offer(time,unit)方法 生产者在添加数据时如果队列已经满了阻塞一会。 阻塞到消费者消费了消息然后唤醒当前阻塞线程。阻塞到了time时间再次判断是否可以添加不能直接告辞。 // 如果线程在挂起的时候如果对当前阻塞线程的中断标记位进行设置此时会抛出异常直接结束 public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {// 非空检验checkNotNull(e);// 将时间单位转换为纳秒long nanos unit.toNanos(timeout);// 加锁final ReentrantLock lock this.lock;// 允许线程中断并排除异常的加锁方式lock.lockInterruptibly();try {// 为什么是while(虚假唤醒)// 如果元素个数和数组长度一致队列慢了while (count items.length) {// 判断等待的时间是否还充裕if (nanos 0)// 不充裕直接添加失败return false;// 挂起等待会同时释放锁资源(对标sync的wait方法)// awaitNanos会挂起线程并且返回剩余的阻塞时间// 恢复执行时需要重新获取锁资源nanos notFull.awaitNanos(nanos);}// 说明队列有空间了enqueue将数据扔到阻塞队列中enqueue(e);return true;} finally {// 释放锁资源lock.unlock();} }put方法 如果队列是满的 就一直挂起直到被唤醒或者被中断。 public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock this.lock;lock.lockInterruptibly();try {while (count items.length)// await方法一直阻塞直到被唤醒或者中断标记位notFull.await();enqueue(e);} finally {lock.unlock();} }消费者方法实现原理 remove方法 poll方法 // 拉取数据 public E poll() {// 加锁操作final ReentrantLock lock this.lock;lock.lock();try {// 如果没有数据直接返回null如果有数据执行dequeue取出数据并返回return (count 0) ? null : dequeue();} finally {lock.unlock();} }// 取出数据 private E dequeue() {// 将成员变量引用到局部变量final Object[] items this.items;// 直接获取指定索引位置的数据E x (E) items[takeIndex];// 将数组上指定索引位置设置为nullitems[takeIndex] null;// 设置下次取数据时的索引位置if (takeIndex items.length)takeIndex 0;// 对count进行--操作count--;// 迭代器内容先跳过if (itrs ! null)itrs.elementDequeued();// signal方法会唤醒当前Condition中排队的一个Node。// signalAll方法会将Condition中所有的Node全都唤醒notFull.signal();// 返回数据。return x; }poll(time,unit)方法 public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos unit.toNanos(timeout);// 竞争锁final ReentrantLock lock this.lock;lock.lockInterruptibly();try {// 如果没有数据while (count 0) {if (nanos 0)// 没数据也无法阻塞了返回nullreturn null;// 没数据挂起消费者线程nanos notEmpty.awaitNanos(nanos);}// 取数据return dequeue();} finally {lock.unlock();} }take方法 public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try {// 虚假唤醒while (count 0)notEmpty.await();return dequeue();} finally {lock.unlock();} }虚假唤醒 阻塞队列中如果需要线程挂起操作判断有无数据的位置采用的是while循环 为什么不能换成if肯定是不能换成if逻辑判断。 线程A线程B线程E线程C。 其中ABE生产者C属于消费者。 假如线程的队列是满的。 // E拿到锁资源还没有走while判断 while (count items.length) // A醒了 // B挂起 notFull.await(); enqueue(e);C此时消费一条数据执行notFull.signal()唤醒一个线程A线程被唤醒。 E走判断发现有空余位置可以添加数据到队列E添加数据走enqueue。 如果判断是ifA在E释放锁资源后拿到锁资源直接走enqueue方法。 此时A线程就是在putIndex的位置覆盖掉之前的数据造成数据安全问题。 LinkedBlockingQueue LinkedBlockingQueue的底层实现 查看LinkedBlockingQueue是如何存储数据并且实现链表结构的。 // Node对象就是存储数据的单位 static class NodeE {// 存储的数据E item;// 指向下一个数据的指针NodeE next;// 有参构造Node(E x) { item x; } }查看LinkedBlockingQueue的有参构造 // 可以手动指定LinkedBlockingQueue的长度如果没有指定默认为Integer.MAX_VALUE public LinkedBlockingQueue(int capacity) {if (capacity 0) throw new IllegalArgumentException();this.capacity capacity;// 在初始化时构建一个item为null的节点作为head和last// 这种node可以成为哨兵Node// 如果没有哨兵节点那么在获取数据时需要判断head是否为null才能找next // 如果没有哨兵节点那么在添加数据时需要判断last是否为null才能找nextlast head new NodeE(null); }查看LinkedBlockingQueue的其他属性 // 因为是链表没有想数组的length属性基于AtomicInteger来记录长度 private final AtomicInteger count new AtomicInteger(); // 链表的头取 transient NodeE head; // 链表的尾存 private transient NodeE last; // 消费者的锁 private final ReentrantLock takeLock new ReentrantLock(); // 消费者的挂起操作以及唤醒用的condition private final Condition notEmpty takeLock.newCondition(); // 生产者的锁 private final ReentrantLock putLock new ReentrantLock(); // 生产者的挂起操作以及唤醒用的condition private final Condition notFull putLock.newCondition();生产者方法实现原理 add方法 public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException(Queue full); }offer方法 public boolean offer(E e) {// 非空校验if (e null) throw new NullPointerException();// 拿到存储数据条数的countfinal AtomicInteger count this.count;// 查看当前数据条数是否等于队列限制长度达到了这个长度直接返回falseif (count.get() capacity)return false;// 声明c作为标记存在int c -1;// 将存储的数据封装为Node对象NodeE node new NodeE(e);// 获取生产者的锁。final ReentrantLock putLock this.putLock;// 竞争锁资源putLock.lock();try {// 再次做一个判断查看是否还有空间if (count.get() capacity) {// enqueue扔数据enqueue(node);// 将数据个数 1c count.getAndIncrement();// 拿到count的值 小于 长度限制// 有生产者在基于await挂起这里添加完数据后发现还有空间可以存储数据 // 唤醒前面可能已经挂起的生产者// 因为这里生产者和消费者不是互斥的写操作进行的同时可能也有消费者在消费数据。if (c 1 capacity)// 唤醒生产者notFull.signal();}} finally {// 释放锁资源putLock.unlock();}// 如果c 0代表添加数据之前队列元素个数是0个。// 如果有消费者在队列没有数据的时候来消费此时消费者一定会挂起线程if (c 0)// 唤醒消费者signalNotEmpty();// 添加成功返回true失败返回-1return c 0; }private void enqueue(NodeE node) {// 将当前Node设置为last的next并且再将当前Node作为lastlast last.next node; }private void signalNotEmpty() {// 获取读锁final ReentrantLock takeLock this.takeLock;takeLock.lock();try {// 唤醒notEmpty.signal();} finally {takeLock.unlock();} }offer(time,unit)方法 public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {// 非空检验if (e null) throw new NullPointerException();// 将时间转换为纳秒long nanos unit.toNanos(timeout);// 标记int c -1;// 写锁数据条数final ReentrantLock putLock this.putLock;final AtomicInteger count this.count;// 允许中断的加锁方式putLock.lockInterruptibly();try {// 如果元素个数和限制个数一致直接准备挂起while (count.get() capacity) {// 挂起的时间是不是已经没了if (nanos 0)// 添加失败返回falsereturn false;// 挂起线程nanos notFull.awaitNanos(nanos);}// 有空余位置enqueue添加数据enqueue(new NodeE(e));// 元素个数 1c count.getAndIncrement();// 当前添加完数据还有位置可以添加数据唤醒可能阻塞的生产者if (c 1 capacity)notFull.signal();} finally {// 释放锁putLock.unlock();}// 如果之前元素个数是0唤醒可能等待的消费者if (c 0)signalNotEmpty();return true; }put方法 public void put(E e) throws InterruptedException {if (e null) throw new NullPointerException();int c -1;NodeE node new NodeE(e);final ReentrantLock putLock this.putLock;final AtomicInteger count this.count;putLock.lockInterruptibly();try {while (count.get() capacity) {// 一直挂起线程等待被唤醒notFull.await();}enqueue(node);c count.getAndIncrement();if (c 1 capacity)notFull.signal();} finally {putLock.unlock();}if (c 0)signalNotEmpty(); }消费者方法实现原理 从remove方法开始查看消费者获取数据的方式。 remove方法 public E remove() { E x poll();if (x ! null)return x;elsethrow new NoSuchElementException(); }poll方法 public E poll() {// 拿到队列数据个数的计数器final AtomicInteger count this.count;// 当前队列中数据是否0if (count.get() 0)// 说明队列没数据直接返回null即可return null;// 声明返回结果E x null;// 标记int c -1;// 获取消费者的takeLockfinal ReentrantLock takeLock this.takeLock;// 加锁takeLock.lock();try {// 基于DCL确保当前队列中依然有元素if (count.get() 0) {// 从队列中移除数据x dequeue();// 将之前的元素个数获取并--c count.getAndDecrement();if (c 1)// 如果依然有数据继续唤醒await的消费者。notEmpty.signal();}} finally {// 释放锁资源takeLock.unlock();}// 如果之前的元素个数为当前队列的限制长度// 现在消费者消费了一个数据多了一个空位可以添加if (c capacity)// 唤醒阻塞的生产者signalNotFull();return x; }private E dequeue() {// 拿到队列的head位置数据NodeE h head;NodeE first h.next;// 将之前的哨兵Node.next置位null。help GC。h.next h; // help GC// 将first置位新的headhead first;// 拿到返回结果first节点的item数据也就是之前head.next.itemE x first.item;// 将first数据置位null作为新的headfirst.item null;// 返回数据return x; }private void signalNotFull() {final ReentrantLock putLock this.putLock;putLock.lock();try {// 唤醒生产者。notFull.signal();} finally {putLock.unlock();} }poll(time,unit)方法 public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 返回结果E x null;// 标识int c -1;// 将挂起实现设置为纳秒级别long nanos unit.toNanos(timeout);// 拿到计数器final AtomicInteger count this.count;// take锁加锁final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();try {// 如果没数据进到whilewhile (count.get() 0) {if (nanos 0)return null;// 挂起当前线程nanos notEmpty.awaitNanos(nanos);}// 剩下内容和之前一样。x dequeue();c count.getAndDecrement();if (c 1)notEmpty.signal();} finally {takeLock.unlock();}if (c capacity)signalNotFull();return x; }take方法 public E take() throws InterruptedException {E x;int c -1;final AtomicInteger count this.count;final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();try {// 相比poll(time,unit)方法这里的出口只有一个就是中断标记位抛出异常否则一直等待while (count.get() 0) {notEmpty.await();}x dequeue();c count.getAndDecrement();if (c 1)notEmpty.signal();} finally {takeLock.unlock();}if (c capacity)signalNotFull();return x; }PriorityBlockingQueue概念 PriorityBlockingQueue介绍 首先PriorityBlockingQueue是一个优先级队列他不满足先进先出的概念。 会将查询的数据进行排序排序的方式就是基于插入数据值的本身。 如果是自定义对象必须要实现Comparable接口才可以添加到优先级队列。 排序的方式是基于二叉堆实现的底层是采用数据结构实现的二叉堆。 public static void main(String[] args) {PriorityBlockingQueue queue new PriorityBlockingQueue();queue.add(234);queue.add(123);queue.add(456);queue.add(345);System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());/* 输出顺序依次为123、234、345、456 */ }二叉堆结构介绍 优先级队列PriorityBlockingQueue基于二叉堆实现的。 private transient Object[] queue;PriorityBlockingQueue是基于数组实现的二叉堆。 二叉堆是什么? 二叉堆就是一个完整的二叉树。任意一个节点大于父节点或者小于父节点。基于同步的方式可以定义出小顶堆和大顶堆。 小顶堆以及小顶堆基于数据实现的方式。 PriorityBlockingQueue核心属性 // 数组的初始长度 private static final int DEFAULT_INITIAL_CAPACITY 11; // 数组的最大长度 // -8的目的是为了适配各个版本的虚拟机 // 默认当前使用的hotspot虚拟机最大支持Integer.MAX_VALUE - 2但是其他版本的虚拟机不一定。 private static final int MAX_ARRAY_SIZE Integer.MAX_VALUE - 8; // 存储数据的数组也是基于这个数组实现的二叉堆。 private transient Object[] queue; // size记录当前阻塞队列中元素的个数 private transient int size; // 要求使用的对象要实现Comparable比较器。基于comparator做对象之间的比较 private transient Comparator? super E comparator; // 实现阻塞队列的lock锁 private final ReentrantLock lock; // 挂起线程操作。 private final Condition notEmpty; // 因为PriorityBlockingQueue的底层是基于二叉堆的而二叉堆又是基于数组实现的数组长度是固定的如果需要扩容需 private transient volatile int allocationSpinLock;// 阻塞队列中用到的原理其实就是普通的优先级队列。 private PriorityQueueE q;PriorityBlockingQueue的写入操作 毕竟是阻塞队列添加数据的操作咱们是很了解无法还是addofferoffer(time,unit)put。 但是因为优先级队列中数组是可以扩容的虽然有长度限制但是依然属于无界队列的概念所以生产者不会阻塞所以只有offer方法可以查看。 这次核心的内容并不是添加数据的区别。主要关注的是如何保证二叉堆中小顶堆的结构的并且还要查看数组扩容的一个过程是怎样的。 offer基本流程 因为add方法依然调用的是offer方法直接查看offer方法即可。 public boolean offer(E e) {// 非空判断。if (e null)throw new NullPointerException();// 拿到锁直接上锁final ReentrantLock lock this.lock;lock.lock();// n:size元素的个数// cap:当前数组的长度// array:就是存储数据的数组int n, cap;Object[] array;while ((n size) (cap (array queue).length))// 如果元素个数大于等于数组的长度需要尝试扩容。tryGrow(array, cap);try {// 拿到了比较器Comparator? super E cmp comparator;// 比较数据大小存储数据是否需要做上移操作保证平衡的if (cmp null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size n 1;// 如果有挂起的线程需要去唤醒挂起的消费者。notEmpty.signal();} finally {// 释放锁lock.unlock();}return true; }offer扩容操作 在添加数据之前会采用while循环的方式来判断当前元素个数是否大于等于数组长度。如果满足需要执行tryGrow方法对数组进行扩容。 如果两个线程同时执行tryGrow只会有一个线程在扩容另一个线程可能多次走while循环多次走tryGrow方法但是依然需要等待前面的线程扩容完毕。 private void tryGrow(Object[] array, int oldCap) {// 释放锁资源。lock.unlock(); // must release and then re-acquire main lock// 声明新数组。Object[] newArray null;// 如果allocationSpinLock属性值为0说明当前没有线程正在扩容的。if (allocationSpinLock 0 // 基于CAS的方式将allocationSpinLock从0修改为1代表当前线程可以开始扩容UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {// 计算新数组长度int newCap oldCap ((oldCap 64) ?// 如果数组长度比较小这里加快扩容长度速度。(oldCap 2) : // grow faster if small// 如果长度大于等于64了每次扩容到1.5倍即可。(oldCap 1));// 如果新数组长度大于MAX_ARRAY_SIZE需要做点事了。if (newCap - MAX_ARRAY_SIZE 0) { // possible overflow// 声明minCap长度为老数组 1int minCap oldCap 1;// 老数组1变为负数或者老数组长度已经大于MAX_ARRAY_SIZE了无法扩容了。if (minCap 0 || minCap MAX_ARRAY_SIZE)throw new OutOfMemoryError();// 如果没有超过限制直接设置为最大长度即可newCap MAX_ARRAY_SIZE;}// 新数组长度得大于老数组长度// 第二个判断确保没有并发扩容的出现。if (newCap oldCap queue array)// 构建出新数组newArray new Object[newCap];} finally {// 新数组有了标记位归0~~allocationSpinLock 0;}}// 如果到了这newArray依然为null说明这个线程没有进到if方法中去构建新数组if (newArray null) // back off if another thread is allocatingThread.yield();// 拿锁资源lock.lock();// 拿到锁资源后确认是构建了新数组的线程这里就需要将新数组复制给queue并且导入数据if (newArray ! null queue array) {// 将新数组赋值给queuequeue newArray;// 将老数组的数据全部导入到新数组中。System.arraycopy(array, 0, newArray, 0, oldCap);} }offer扩容操作 这里是数据如何放到数组上并且如何保证的二叉堆结构。 // k:当前元素的个数(其实就是要放的索引位置) // x:需要添加的数据 // array:数组 private static T void siftUpComparable(int k, T x, Object[] array) {// 将插入的元素直接强转为Comparable(com.aiz.User cannot be cast to java.lang.Comparable) // 这行强转会导致添加没有实现Comparable的元素直接报错。Comparable? super T key (Comparable? super T) x;// k大于0走while逻辑。(原来有数据)while (k 0) {// 获取父节点的索引位置。int parent (k - 1) 1;// 拿到父节点的元素。Object e array[parent];// 用子节点compareTo父节点如果 0说明当前son节点比parent要大。if (key.compareTo((T) e) 0)// 直接break完事break;// 将son节点的位置设置上之前的parent节点array[k] e;// 重新设置x节点需要放置的位置。k parent;}// k 0当前元素是第一个元素直接插入进去。array[k] key; }PriorityBlockingQueue的读取操作 读取操作是存储现在挂起的情况的因为如果数组中元素个数为0当前线程如果执行了take方法 必然需要挂起。 其次获取数据因为是优先级队列所以需要从二叉堆栈顶拿数据直接拿索引为0的数据即可但是拿完之后需要保持二叉堆结构所以会有下移操作。 查看获取方法流程 poll public E poll() {final ReentrantLock lock this.lock;lock.lock();try {// 拿到返回数据没拿到返回nullreturn dequeue();} finally {lock.unlock();}}poll(time,unit) public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 将挂起的时间转换为纳秒long nanos unit.toNanos(timeout);final ReentrantLock lock this.lock;// 允许线程中断抛异常的加锁lock.lockInterruptibly();// 声明结果E result;try {// dequeue是去拿数据的可能会出现拿到的数据为null如果为null同时挂起时间还有剩余这边就直接通过notEmpwhile ( (result dequeue()) null nanos 0)nanos notEmpty.awaitNanos(nanos);} finally {lock.unlock();}// 有数据正常返回没数据告辞~return result; }take public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();E result;try {while ( (result dequeue()) null)// 无线等要么有数据要么中断线程notEmpty.await();} finally {lock.unlock();}return result; }查看dequeue获取数据 获取数据主要就是从数组中拿到0索引位置数据然后保持二叉堆结构。 private E dequeue() {// 将元素个数-1拿到了索引位置。int n size - 1;// 判断是不是木有数据了没数据直接返回null即可if (n 0)return null;// 说明有数据else {// 拿到数组arrayObject[] array queue;、// 拿到0索引位置的数据E result (E) array[0];// 拿到最后一个数据E x (E) array[n];// 将最后一个位置置位nullarray[n] null;Comparator? super E cmp comparator;if (cmp null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);// 元素个数-1赋值sizesize n;// 返回resultreturn result;} }下移做平衡操作 一定要以局部的方式去查看树结构的变化他是从跟节点往下找较小的一个子节点将较小的子节点挪动到父节点位置再将循环往下走如果一来整个二叉堆的结构就可以保证了。 // k:默认进来是0 // x:代表二叉堆的最后一个数据 // array:数组 // n:最后一个索引 private static T void siftDownComparable(int k, T x, Object[] array,int n) {// 健壮性校验取完第一个数据已经没数据了那就不需要做平衡操作if (n 0) {// 拿到最后一个数据的比较器Comparable? super T key (Comparable? super T)x;// 因为二叉堆是一个二叉满树所以在保证二叉堆结构时只需要做一半就可以int half n 1; // loop while a non-leaf// 做了超过一半就不需要再往下找了。while (k half) {// 找左子节点索引一个公式可以找到当前节点的左子节点int child (k 1) 1; // assume left child is least// 拿到左子节点的数据Object c array[child];// 拿到右子节点索引int right child 1;// 确认有右子节点// 判断左节点是否大于右节点if (right n ((Comparable? super T) c).compareTo((T) array[right]) 0)// 如果左大于右那么c就执行右c array[child right];// 比较最后一个节点是否小于当前的较小的子节点if (key.compareTo((T) c) 0)break;// 将左右子节点较小的放到之前的父节点位置array[k] c;// k重置到之前的子节点位置k child;}// 上面while循环搞定后可以确认整个二叉堆中数据已经移动ok了只差当前k的位置数据是null// 将最后一个索引的数据放到k的位置array[k] key;} } DelayQueue DelayQueue介绍应用 DelayQueue就是一个延迟队列生产者写入一个消息这个消息还有直接被消费的延迟时间。需要让消息具有延迟的特性。 DelayQueue也是基于二叉堆结构实现的甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据需要让DelayQueue中的数据在比较时跟根据延迟时间做比较剩余时间最短的要放在栈顶。 查看DelayQueue类信息 public class DelayQueueE extends Delayed extends AbstractQueueEimplements BlockingQueueE { } // 接口继承了Comparable这样就具备了比较的能力。 public interface Delayed extends ComparableDelayed {// 抽象方法就是咱们需要设置的延迟时间long getDelay(TimeUnit unit);// Comparable接口提供的public int compareTo(T o); }基于上述特点声明一个可以写入DelayQueue的元素类 public class Task implements Delayed {/*** 任务的名称*/private String name;/*** 什么时间点执行*/private Long time;public Long getTime() {return time;}/*** 单位毫秒。*/public Task(String name, Long delay) {// 任务名称this.name name;this.time System.currentTimeMillis() delay;}/*** 设置任务什么时候可以出延迟队列* param unit 单位* return*/Overridepublic long getDelay(TimeUnit unit) {// 单位是毫秒return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}/*** 两个任务在插入到延迟队列时的比较方式*/Overridepublic int compareTo(Delayed o) {return (int) (this.time - ((Task) o).getTime());} }在使用时查看到DelayQueue底层用了PriorityQueue在一定程度上DelayQueue也是无界队列。 测试效果 public static void main(String[] args) throws InterruptedException {// 声明元素Task task1 new Task(A, 1000L);Task task2 new Task(B, 5000L);Task task3 new Task(C, 3000L);Task task4 new Task(D, 2000L);// 声明阻塞队列DelayQueueTask queue new DelayQueue();// 将元素添加到延迟队列中queue.put(task1);queue.put(task2);queue.put(task3);queue.put(task4);// 获取元素System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());// A,D,C,B }在点外卖时15分钟商家需要接单如果不接单这个订单自动取消。 可以每下一个订单就放到延迟队列中如果规定时间内商家没有接单直接通过消费者获取元素然后取消订单。 只要是有需要延迟一定时间后再执行的任务就可以通过延迟队列去实现。 DelayQueue核心属性 可以查看到DelayQueue就四个核心属性 // 因为DelayQueue依然属于阻塞队列需要保证线程安全。看到只有一把锁生产者和消费者使用的是一个lock private final transient ReentrantLock lock new ReentrantLock(); // 因为DelayQueue还是基于二叉堆结构实现的没有必要重新搞一个二叉堆直接使用的PriorityQueue private final PriorityQueueE q new PriorityQueueE(); // leader一般会存储等待栈顶数据的消费者在整体写入和消费的过程中会设置的leader的一些判断。 private Thread leader null; // 生产者在插入数据时不会阻塞的。当前的Condition就是给消费者用的 // 比如消费者在获取数据时发现栈顶的数据还又没到延迟时间。 // 这个时候咱们就需要将消费者线程挂起阻塞一会阻塞到元素到了延迟时间或者是生产者插入的元素到了栈顶此时 private final Condition available lock.newCondition();DelayQueue写入流程分析 Delay是无界的数组可以动态的扩容不需要关注生产者的阻塞问题他就没有阻塞问题。 这里只需要查看offer方法即可。 public boolean offer(E e) {// 直接获取lock加锁。final ReentrantLock lock this.lock;lock.lock();try {// 直接调用PriorityQueue的插入方法这里会根据之前重写Delayed接口中的compareTo方法做排序然后调整上移和下移q.offer(e);// 调用优先级队列的peek方法拿到堆顶的数据// 拿到堆顶数据后判断是否是刚刚插入的元素if (q.peek() e) {// leader赋值为null。在消费者的位置再提一嘴leader null;// 唤醒消费者避免刚刚插入的数据的延迟时间出现问题。available.signal();}// 插入成功return true;} finally {// 释放锁lock.unlock();} }DelayQueue读取流程分析 remove方法 // 依然是AbstractQueue提供的方法有结果就返回没结果扔异常 public E remove() {E x poll();if (x ! null)return x;elsethrow new NoSuchElementException(); }poll方法 // poll是浅尝一下不会阻塞消费者能拿就拿拿不到就拉倒 public E poll() {// 消费者和生产者是一把锁先拿锁加锁。final ReentrantLock lock this.lock;lock.lock();try {// 拿到栈顶数据。E first q.peek();// 如果元素为null直接返回null// 如果getDelay方法返回的结果是大于0的那说明当前元素还每到延迟时间元素无法返回返回nullif (first null || first.getDelay(NANOSECONDS) 0)return null;else// 到这说明元素不为null并且已经达到了延迟时间直接调用优先级队列的poll方法return q.poll();} finally {lock.unlock();} }poll(time,unit)方法 这个是允许阻塞的并且指定一定的时间。 public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 先将时间转为纳秒long nanos unit.toNanos(timeout);// 拿锁加锁。final ReentrantLock lock this.lock;lock.lockInterruptibly();try {// 死循环。for (;;) {// 拿到堆顶数据E first q.peek();// 如果元素为nullif (first null) {// 并且等待的时间小于等于0。不能等了直接返回nullif (nanos 0)return null;// 说明当前线程还有可以阻塞的时间阻塞指定时间即可。else// 这里挂起线程后说明队列没有元素在生产者添加数据之后会唤醒nanos available.awaitNanos(nanos);// 到这说明有数据} else {// 有数据的话先获取数据现在是否可以执行延迟时间是否已经到了指定时间long delay first.getDelay(NANOSECONDS);// 延迟时间是否已经到了if (delay 0)// 时间到了直接执行优先级队列的poll方法返回元素return q.poll();// 延迟时间没到消费者需要等一会// 这个是查看消费者可以等待的时间if (nanos 0)// 直接返回nulllreturn null;// 延迟时间没到消费者可以等一会// 把first赋值为nullfirst null; // dont retain ref while waiting// 如果等待的时间小于元素剩余的延迟时间消费者直接挂起。反正暂时拿不到但是不能保证后续是否有生产者 // 如果已经有一个消费者在等待堆顶数据了我这边不做额外操作直接挂起即可。if (nanos delay || leader ! null)nanos available.awaitNanos(nanos);// 当前消费者的阻塞时间可以拿到数据并且没有其他消费者在等待堆顶数据else {// 拿到当前消费者的线程对象Thread thisThread Thread.currentThread();// 将leader设置为当前线程leader thisThread;try {// 会让当前消费者阻塞这个元素的延迟时间long timeLeft available.awaitNanos(delay);// 重新计算当前消费者剩余的可阻塞时间。nanos - delay - timeLeft;} finally {// 到了时间将leader设置为nullif (leader thisThread)leader null;}}}}} finally {// 没有消费者在等待元素队列中的元素不为nullif (leader null q.peek() ! null)// 只要当前没有leader在等并且队列有元素就需要再次唤醒消费者。// 避免队列有元素但是没有消费者处理的问题available.signal();// 释放锁lock.unlock();} }take方法 这个是允许阻塞的但是可以一直等要么等到元素要么等到被中断。 public E take() throws InterruptedException {// 正常加锁并且允许中断final ReentrantLock lock this.lock;lock.lockInterruptibly();try {for (;;) {// 拿到元素E first q.peek();if (first null)// 没有元素挂起。available.await();else {// 有元素获取延迟时间。long delay first.getDelay(NANOSECONDS);// 判断延迟时间是不是已经到了if (delay 0)// 基于优先级队列的poll方法返回return q.poll();first null; // dont retain ref while waiting// 如果有消费者在等就正常await挂起if (leader ! null)available.await();// 如果没有消费者在等的堆顶数据我来等else {// 获取当前线程Thread thisThread Thread.currentThread();// 设置为leader代表等待堆顶的数据leader thisThread;try {// 等待指定(堆顶元素的延迟时间)时长available.awaitNanos(delay);} finally {if (leader thisThread)// leader赋值nullleader null;}}}}} finally {// 避免消费者无线等来一个唤醒消费者的方法一般是其他消费者拿到元素走了之后并且延迟队列还有元素就执行if (leader null q.peek() ! null)available.signal();// 释放锁lock.unlock();} }SynchronousQueue SynchronousQueue介绍 SynchronousQueue这个阻塞队列和其他的阻塞队列有很大的区别。 在咱们的概念中队列肯定是要存储数据的但是SynchronousQueue不会存储数据的。 SynchronousQueue队列中他不存储数据存储生产者或者是消费者。 当存储一个生产者到SynchronousQueue队列中之后生产者会阻塞(看你调用的方法) 。 生产者最终会有几种结果 如果在阻塞期间有消费者来匹配生产者就会将绑定的消息交给消费者。生产者得等阻塞结果或者不允许阻塞那么就直接失败。生产者在阻塞期间如果线程中断直接告辞。 同理消费者和生产者的效果是一样。 生产者和消费者的数据是直接传递的不会经过SynchronousQueue。 SynchronousQueue是不会存储数据的。 经过阻塞队列的学习 生产者 offer()生产者在放到SynchronousQueue的同时如果有消费者在等待消息直接配对。如果没有消费者在等待消息这里直接返回告辞。offer(time,unit)生产者在放到SynchronousQueue的同时如果有消费者在等待消息直接配对。如果没有消费者在等待消息阻塞time时间如果还没有告辞。put()生产者在放到SynchronousQueue的同时如果有消费者在等待消息直接配对。如果没有死等。 测试效果 public static void main(String[] args) throws InterruptedException {// 因为当前队列不存在数据没有长度的概念。SynchronousQueue queue new SynchronousQueue();String msg 消息!;/*new Thread(() - {// b false:代表没有消费者来拿boolean b false;try {b queue.offer(msg, 1, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(b);}).start();Thread.sleep(100);new Thread(() - {System.out.println(queue.poll());}).start();*/new Thread(() - {try {System.out.println(queue.poll(1, TimeUnit.SECONDS));} catch (InterruptedException e) {e.printStackTrace();}}).start();Thread.sleep(100);new Thread(() - {queue.offer(msg);}).start(); }SynchronousQueue核心属性 进到SynchronousQueue类的内部后发现了一个内部类Transferer内部提供了一个transfer的方法。 abstract static class TransfererE {abstract E transfer(E e, boolean timed, long nanos); }当前这个类中提供的transfer方法就是生产者和消费者在调用读写数据时要用到的核心方法。 生产者在调用上述的transfer方法时第一个参数e会正常传递数据。 消费者在调用上述的transfer方法时第一个参数e会传递null。 SynchronousQueue针对抽象类Transferer做了几种实现。 一共看到了两种实现方式 TransferStackTransferQueue 这两种类继承了Transferer抽象类在构建SynchronousQueue时会指定使用哪种子类。 // 到底采用哪种实现需要把对应的对象存放到这个属性中 private transient volatile TransfererE transferer; // 采用无参时会调用下述方法再次调用有参构造传入false public SynchronousQueue() {this(false); } // 调用的是当前的有参构造fair代表公平还是不公平 public SynchronousQueue(boolean fair) {// 如果是公平采用Queue如果是不公平采用Stacktransferer fair ? new TransferQueueE() : new TransferStackE(); }TransferQueue的特点 代码查看效果 public static void main(String[] args) throws InterruptedException {// 因为当前队列不存在数据没有长度的概念。//SynchronousQueue queue new SynchronousQueue(true);SynchronousQueue queue new SynchronousQueue(false);new Thread(() - {try {queue.put(生1);} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(() - {try {queue.put(生2);} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(() - {try {queue.put(生3);} catch (InterruptedException e) {e.printStackTrace();}}).start();Thread.sleep(100);new Thread(() - {System.out.println(消1: queue.poll());}).start();Thread.sleep(100);new Thread(() - {System.out.println(消2: queue.poll());}).start();Thread.sleep(100);new Thread(() - {System.out.println(消3: queue.poll());}).start(); }SynchronousQueue的TransferQueue源码 为了查看清除SynchronousQueue的TransferQueue源码需要从两点开始查看源码信息 QNode源码信息 static final class QNode {// 当前节点可以获取到next节点volatile QNode next; // next node in queue// item在不同情况下效果不同// 生产者:有数据// 消费者:为nullvolatile Object item; // CASed to or from null// 当前线程volatile Thread waiter; // to control park/unparkfinal boolean isData; // true 是 putfalse 是 take// 最终生产者需要将item交给消费者// 最终消费者需要获取生产者的item// 省略了大量提供的CAS操作//... ... }transfer方法实现 // 当前方法是TransferQueue的核心内容 // e:传递的数据 // timed:false代表无限阻塞true代表阻塞nacos时间 E transfer(E e, boolean timed, long nanos) {// 当前QNode是要封装当前生产者或者消费者的信息QNode s null; // constructed/reused as needed// isData true:代表是生产者// isData false:代表是消费者boolean isData (e ! null);// 死循环for (;;) {// 获取尾节点和头结点QNode t tail;QNode h head;// 为了避免TransferQueue还没有初始化这边做一个健壮性判断if (t null || h null) // saw uninitialized valuecontinue; // spin// 如果满足h t 条件说明当前队列没有生产者或者消费者为空// 如果有节点同时当前节点和队列节点属于同一种角色。// if中的逻辑是进到队列if (h t || t.isData isData) { // empty or same-mode// 在判断并发问题// 拿到尾节点的nextQNode tn t.next;// 如果t不为尾节点进来说明有其他线程并发修改了tailif (t ! tail) // inconsistent read// 重新走for循环continue;// tn如果为不null说明前面有线程并发添加了一个节点if (tn ! null) { // lagging tail// 直接帮助那个并发线程修改tail的指向advanceTail(t, tn);// 重新走for循环continue;}// 获取当前线程是否可以阻塞// 如果timed为true并且阻塞的时间小于等于0// 不需要匹配直接告辞!!!if (timed nanos 0) // cant waitreturn null;// 如果可以阻塞将当前需要插入到队列的QNode构建出来if (s null)s new QNode(e, isData);// 基于CAS操作将tail节点的next设置为当前线程if (!t.casNext(null, s)) // failed to link in// 如果进到if说明修改失败重新执行for循环修改continue;// CAS操作成功直接替换tail的指向advanceTail(t, s); // swing tail and wait// 如果进到队列中了挂起线程要么等生产者要么等消费者。// x是返回替换后的数据Object x awaitFulfill(s, e, timed, nanos);// 如果元素和节点相等说明节点取消了if (x s) { // wait was cancelled// 清空当前节点将上一个节点的next指向当前节点的next直接告辞clean(t, s);return null;}// 判断当前节点是否还在队列中if (!s.isOffList()) { // not already unlinked// 将当前节点设置为headadvanceHead(t, s); // unlink if head// 如果 x ! null 如果拿到了数据说明我是消费者if (x ! null) // and forget fields// 将当前节点的item设置为自己s.item s;// 线程置位nulls.waiter null;}// 返回数据return (x ! null) ? (E)x : e;// 匹配队列中的橘色} else { // complementary-mode// 拿到head的next作为要匹配的节点QNode m h.next; // node to fulfill// 做并发判断如果头节点尾节点或者head.next发生了变化这边要重新走for循环if (t ! tail || m null || h ! head)continue; // inconsistent read// 没并发问题可以拿数据// 拿到m节点的item作为x。Object x m.item;// 如果isData (x ! null)满足说明当前出现了并发问题消费者去匹配队列的消费者不合理if (isData (x ! null) || // m already fulfilled// 如果排队的节点取消就会讲当前QNode中的item指向QNodex m || // m cancelled// 如果前面两个都没满足可以交换数据了。// 如果交换失败说明有并发问题!m.casItem(x, e)) { // lost CAS// 重新设置head节点并且再走一次循环advanceHead(h, m); // dequeue and retrycontinue;}// 替换headadvanceHead(h, m); // successfully fulfilled// 唤醒head.next中的线程LockSupport.unpark(m.waiter);// 这边匹配好了数据也交换了直接返回// 如果 x ! null说明队列中是生产者当前是消费者这边直接返回x具体数据 // 反之队列中是消费者当前是生产者直接返回自己的数据return (x ! null) ? (E)x : e;}} }
http://www.tj-hxxt.cn/news/140628.html

相关文章:

  • 网站404怎么做如何弄微信小程序店铺
  • 网站的回到顶部怎么做个人网页源码
  • 衡水专业网站设计旅游网站建设要求
  • 南京机关建设网站html遇到的问题及解决方法
  • 广州建设银行官方网站广东华迪工程建设监理公司网站
  • 网站开发属于技术合同公众号开发简单吗
  • 温州网站建设制作公司网页二级网站怎么做
  • wordpress网站名品传集团网站建设
  • 网站建设提供资料网站系统商城
  • 门头沟网站开发实验中心网站建设
  • 漯河市城市建设投资公司网站焦作网站建设公司排名
  • 酒店宾馆客栈旅馆古典网站源码 asp源码带后台北京旅游网站排名
  • 做特产的网站的分析广告引流推广平台
  • 长沙免费模板建站室内设计专业招聘信息
  • 做网站 客户大概会有那些问题湖南百度推广代理商
  • 动易网站频道栏目字体大小修改登录官方网站
  • 公司不需要做网站了烟台商城网站建设
  • 珠海找工作哪个网站好手机app编程教程
  • 企业网站seo诊断怎样登录微信开发者平台
  • 网站生成小程序北京信息港
  • 专业做家居的网站中国建筑工程个人信息网
  • 东莞型网站建设怎么做网站推销自己的产品
  • 建设一个网站首先需要什么问题新闻列表做的最好的网站
  • 家用电器销售的网站开发软件工程的就业前景和就业方向
  • 网站建设公司名称网站建设方案服务公司
  • 大型新型网站饥饿营销案例
  • 做网站重庆站长工具站长之家
  • 商务通代码是不是只要放在网站根目录下就可以了开发小程序用什么软件写代码
  • 网站前期准备工作淘宝做网站被骗
  • dedecms网站tag标签静态化网页设计作品作业成品免费下载