电子商务网站问题与解决方案,wordpress头条,网站的软文推广,wordpress supports5.1 DelayQueue介绍应用
DelayQueue就是一个延迟队列#xff0c;生产者写入一个消息#xff0c;这个消息还有直接被消费的延迟时间。 需要让消息具有延迟的特性。 DelayQueue也是基于二叉堆结构实现的#xff0c;甚至本事就是基于PriorityQueue实现的功能。二叉堆结构…5.1 DelayQueue介绍应用
DelayQueue就是一个延迟队列生产者写入一个消息这个消息还有直接被消费的延迟时间。 需要让消息具有延迟的特性。 DelayQueue也是基于二叉堆结构实现的甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据需要让DelayQueue中的数据在比较时跟根据延迟时间做比较剩余时间最短的要放在栈顶。查看DelayQueue类信息
public class DelayQueueE extends Delayed extends AbstractQueueE implements BlockingQueueE {
// 发现DelayQueue中的元素需要继承Delayed接口。
}
//
// 接口继承了Comparable这样就具备了比较的能力。
public interface Delayed extends ComparableDelayed {
// 抽象方法就是咱们需要设置的延迟时间
long getDelay(TimeUnit unit);
// Comparable接口提供的public int compareTo(T o);
}
基于上述特点声明一个可以写入DelayQueue的元素类
public class Task implements Delayed {
/** 任务的名称 */
private String name;
/** 什么时间点执行 */
private Long time;
/**
*
* param name
* param delay 单位毫秒。
*/
public Task(String name, Long delay) {
// 任务名称
this.name name;this.time System.currentTimeMillis() delay;
}
/**
* 设置任务什么时候可以出延迟队列
* param unit
* return
*/
Override
public long getDelay(TimeUnit unit) {
// 单位是毫秒视频里写错了写成了纳秒
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 两个任务在插入到延迟队列时的比较方式
* param o
* return
*/
Override
public int compareTo(Delayed o) {
return (int) (this.time - ((Task)o).getTime());
}
}
在使用时查看到DelayQueue底层用了PriorityQueue在一定程度上DelayQueue也是无界队列。 测试效果
public static void main(String[] args) throws InterruptedException {
// 声明元素
Task task1 new Task(A,1000L);
Task task2 new Task(B,5000L);
Task task3 new Task(C,3000L);
Task task4 new Task(D,2000L);
// 声明阻塞队列
DelayQueueTask queue new DelayQueue();
// 将元素添加到延迟队列中
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
// 获取元素
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// A,D,C,B
}
在应用时外卖15分钟商家需要节点如果不节点这个订单自动取消。 可以每下一个订单就放到延迟队列中如果规定时间内商家没有节点直接通过消费者获取元素然后取消订单。 只要是有需要延迟一定时间后再执行的任务就可以通过延迟队列去实现。
5.2、DelayQueue核心属性
可以查看到DelayQueue就四个核心属性
// 因为DelayQueue依然属于阻塞队列需要保证线程安全。看到只有一把锁生产者和消费者使用的是一个lock
private final transient ReentrantLock lock new ReentrantLock();
// 因为DelayQueue还是基于二叉堆结构实现的没有必要重新搞一个二叉堆直接使用的PriorityQueue
private final PriorityQueueE q new PriorityQueueE();
// leader一般会存储等待栈顶数据的消费者在整体写入和消费的过程中会设置的leader的一些判断。
private Thread leader null;
// 生产者在插入数据时不会阻塞的。当前的Condition就是给消费者用的
// 比如消费者在获取数据时发现栈顶的数据还又没到延迟时间。
// 这个时候咱们就需要将消费者线程挂起阻塞一会阻塞到元素到了延迟时间或者是生产者插入的元素到了栈顶此时生产者会唤醒消费者。
private final Condition available lock.newCondition();
5.3、DelayQueue写入流程分析
Delay是无界的数组可以动态的扩容不需要关注生产者的阻塞问题他就没有阻塞问题。 这里只需要查看offer方法即可。
public boolean offer(E e) {
// 直接获取lock加锁。
final ReentrantLock lock this.lock;lock.lock();
try {
// 直接调用PriorityQueue的插入方法这里会根据之前重写Delayed接口中的compareTo方法做排序然后调整上移和下移操作。
q.offer(e);
// 调用优先级队列的peek方法拿到堆顶的数据
// 拿到堆顶数据后判断是否是刚刚插入的元素
if (q.peek() e) {
// leader赋值为null。在消费者的位置再提一嘴
leader null;
// 唤醒消费者避免刚刚插入的数据的延迟时间出现问题。
available.signal();
}
// 插入成功
return true;
} finally {
// 释放锁
lock.unlock();
}
}
5.4、DelayQueue读取流程分析
消费者依然还是存在阻塞的情况因为有两个情况 ● 消费者要拿到栈顶数据但是延迟时间还没到此时消费者需要等待一会。 ● 消费者要来拿数据但是发现已经有消费者在等待栈顶数据了这个后来的消费者也需要等待一会。 依然需要查看四个方法的实现
5.4.1 remove方法
// 依然是AbstractQueue提供的方法有结果就返回没结果扔异常
public E remove() {
E x poll();
if (x ! null)
return x;
else
throw new NoSuchElementException();
}
5.4.2 poll方法
// poll是浅尝一下不会阻塞消费者能拿就拿拿不到就拉倒
public E poll() {
// 消费者和生产者是一把锁先拿锁加锁。
final ReentrantLock lock this.lock;
lock.lock();
try {
// 拿到栈顶数据。
E first q.peek();
// 如果元素为null直接返回null
// 如果getDelay方法返回的结果是大于0的那说明当前元素还每到延迟时间元素无法返回返回null
if (first null || first.getDelay(NANOSECONDS) 0)return null;
else
// 到这说明元素不为null并且已经达到了延迟时间直接调用优先级队列的poll方法
return q.poll();
} finally {
// 释放锁。
lock.unlock();
}
}
5.4.3 poll(time,unit)方法
这个是允许阻塞的并且指定一定的时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 先将时间转为纳秒
long nanos unit.toNanos(timeout);
// 拿锁加锁。
final ReentrantLock lock this.lock;
lock.lockInterruptibly();
try {
// 死循环。
for (;;) {
// 拿到堆顶数据
E first q.peek();
// 如果元素为nullif (first null) {
// 并且等待的时间小于等于0。不能等了直接返回null
if (nanos 0)
return null;
// 说明当前线程还有可以阻塞的时间阻塞指定时间即可。
else
// 这里挂起线程后说明队列没有元素在生产者添加数据之后会唤醒
nanos available.awaitNanos(nanos);
// 到这说明有数据
} else {
// 有数据的话先获取数据现在是否可以执行延迟时间是否已经到了指定时间
long delay first.getDelay(NANOSECONDS);
// 延迟时间是否已经到了
if (delay 0)
// 时间到了直接执行优先级队列的poll方法返回元素
return q.poll();
// 延迟时间没到消费者需要等一会
// 这个是查看消费者可以等待的时间
if (nanos 0)
// 直接返回nulll
return null;
// 延迟时间没到消费者可以等一会
// 把first赋值为null
first null;
// 如果等待的时间小于元素剩余的延迟时间消费者直接挂起。反正暂时拿不到但是不能保证后续是否有生产者添加一个新的数据我是可以拿到的。
// 如果已经有一个消费者在等待堆顶数据了我这边不做额外操作直接挂起即可。
if (nanos delay || leader ! null)nanos available.awaitNanos(nanos);
// 当前消费者的阻塞时间可以拿到数据并且没有其他消费者在等待堆顶数据
else {
// 拿到当前消费者的线程对象
Thread thisThread Thread.currentThread();
// 将leader设置为当前线程
leader thisThread;
try {
// 会让当前消费者阻塞这个元素的延迟时间
long timeLeft available.awaitNanos(delay);
// 重新计算当前消费者剩余的可阻塞时间。
nanos - delay - timeLeft;
} finally {
// 到了时间将leader设置为null
if (leader thisThread)
leader null;
}
}
}
}
} finally {
// 没有消费者在等待元素队列中的元素不为null
if (leader null q.peek() ! null)
// 只要当前没有leader在等并且队列有元素就需要再次唤醒消费者。、
// 避免队列有元素但是没有消费者处理的问题
available.signal();
// 释放锁lock.unlock();
}
}
5.4.4 take方法
这个是允许阻塞的但是可以一直等要么等到元素要么等到被中断。
public E take() throws InterruptedException {
// 正常加锁并且允许中断
final ReentrantLock lock this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 拿到元素
E first q.peek();
if (first null)
// 没有元素挂起。
available.await();
else {
// 有元素获取延迟时间。
long delay first.getDelay(NANOSECONDS);
// 判断延迟时间是不是已经到了
if (delay 0)
// 基于优先级队列的poll方法返回
return q.poll();first null;
// 如果有消费者在等就正常await挂起
if (leader ! null)
available.await();
// 如果没有消费者在等的堆顶数据我来等
else {
// 获取当前线程
Thread thisThread Thread.currentThread();
// 设置为leader代表等待堆顶的数据
leader thisThread;
try {
// 等待指定堆顶元素的延迟时间时长
available.awaitNanos(delay);
} finally {
if (leader thisThread)
// leader赋值null
leader null;
}
}
}
}
} finally {
// 避免消费者无线等来一个唤醒消费者的方法一般是其他消费者拿到元素走了之后并且延迟队列还有元素就执行if内部唤醒方法
if (leader null q.peek() ! null)
available.signal();
// 释放锁
lock.unlock();}
}