公司网站建设支出计入,这几年做网站怎么样,网站做ppt模板下载,易记域名网站大全阅读本文之前可以看一看 Java 多线程基础#xff1a; Java#xff1a;多线程#xff08;进程线程#xff0c;线程状态#xff0c;创建线程#xff0c;线程操作#xff09; Java#xff1a;多线程#xff08;同步死锁#xff0c;锁原子变量#xff0c;线程通信 Java多线程进程线程线程状态创建线程线程操作 Java多线程同步死锁锁原子变量线程通信线程池 1互斥锁
1.1可重入锁
锁的可重入性Reentrant Locking是指在同一个线程中已经获取锁的线程可以再次获取该锁而不会导致死锁。这种特性允许线程在持有锁的情况下可以递归地调用自身的同步方法或代码块而不会因为再次尝试获取相同的锁而被阻塞。显然通常的锁都要设计成可重入的。否则就会发生死锁。
synchronized关键字就是可重入锁。在一个 synchronized 方法 method1() 里面调用另外一个 synchronized 方法 method2() 。如果 synchronized 关键字不可重入那么再 method2() 处就会发生阻塞这显然不可行。
public synchronized void method1() {method2(); // 同一个线程可以再次进入 method2()
}public synchronized void method2() {// 执行某些操作
}Concurrent包中的与互斥锁ReentrantLock相关类之间的继承层次 Lock是一个接口其定义如下
lock()获取锁。如果锁已经被其他线程占用则当前线程会被阻塞直到锁被释放。lockInterruptibly()它允许线程在等待锁的过程中响应中断。如果线程在等待锁时被中断抛出 InterruptedException 并退出等待。tryLock()尝试获取锁但不会阻塞线程。如果锁定成功返回 true如果锁已被其他线程占用立即返回 false。tryLock(long time, TimeUnit unit)尝试在指定的时间内获取锁。如果锁在指定时间内被释放则返回 true 并成功获取锁否则返回 false。期间如果线程被中断会抛出 InterruptedException。unlock()释放锁。通常在获取锁之后的 finally 块中调用确保锁在任务完成后被释放避免死锁。newCondition()返回一个绑定到该锁的新 Condition 实例。Condition 提供了类似于 Object 的 wait、notify、notifyAll 方法的功能但更加灵活可以实现多条件等待/通知机制。
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition();
}
ReentrantLock本身没有代码逻辑实现都在其内部类Sync中
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;public void lock() {sync.lock();}public void lock() {sync.lock();}1.2公平锁非公平锁 lock() tryAcquire()
Sync是一个抽象类它有两个子类FairSync和NonfairSync分别对应公平锁和非公平锁。从下面的ReentrantLock否早方法可以看出会传入一个布尔类型的变量fair指定锁是公平锁还是非公平锁。默认设置的是非共公平锁是为了提高效率减少线程切换。
public ReentrantLock() {sync new NonfairSync();
}
public ReentrantLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();
}公平锁 公平锁是一种严格遵循先来先服务原则的锁机制。当多个线程争用同一个锁时锁会按照线程请求锁的顺序来分配即先请求的线程优先获取锁后请求的线程则需要等待前面的线程释放锁。
Lock fairLock new ReentrantLock(true); // true 表示使用公平锁static final class FairSync extends Sync {
//没有一上来就抢锁在这个函数内部排队是公平的。final void lock() {acquire(1);}
}//AbstractQueuedSynchronizer
public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}addWaiter(Node.EXCLUSIVE)该方法将当前线程添加到等待队列中Node.EXCLUSIVE表示该线程请求的是独占锁。它返回一个Node对象表示这个线程在队列中的位置。 acquireQueued(node, arg)这个方法接收先前创建的Node对象尝试在队列中获取锁。它通常会阻塞当前线程直到锁变为可用或者发生中断。 selfInterrupt()方法用于中断当前线程。当一个线程在获取锁时未能成功且已经被加入等待队列后如果该线程发现自己被阻塞了调用selfInterrupt()可以通知系统该线程需要被中断通常是为了响应外部中断请求。
//FairSync
protected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();//只有当c0没有线程持有锁if (c 0) {//检查当前线程前面是否有其他线程排队等待获取锁if (!hasQueuedPredecessors() //更改锁的状态compareAndSetState(0, acquires)) {//将当前线程设置为锁的独占持有者setExclusiveOwnerThread(current);return true;}}else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0)throw new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;
}非公平锁 非公平锁是一种不保证锁分配顺序的锁机制。线程在尝试获取锁时可能会直接“插队”即使有其他线程已经在等待锁。如果锁是空闲的任何线程都可以获取它无论它们何时请求锁。
Lock nonFairLock new ReentrantLock(); // 默认构造方法即非公平锁static final class NonfairSync extends Sync {...final void lock() {//一上来就尝试修改state值也就是抢锁。//不考虑队列中有没有其他线程在排队。//如果成功表示当前线程获得了锁此时调用 setExclusiveOwnerThread(Thread.currentThread()) 设置当前线程为锁的拥有者。//如果 compareAndSetState 失败说明锁已经被其他线程占用此时调用 acquire(1) 方法进入获取锁的队列等待。if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
}//AbstractQueuedSynchronizer
public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
//NonfairSync
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();//如果state为0不考虑队列中没有等待的线程直接抢锁if (c 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果排他线程就是当前线程才直接设置state值。else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0) // overflowthrow new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;}代码对比
1.3AbstractQueuedSynchronizer(AQS)
Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器AQS这个类非常重要该类的父类是AbstractOwnableSynchronizer。此处的锁具备synchronized功能即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁需要几个核心要素
① 需要一个state变量标记该锁的状态。state变量至少有两个值0、1。对state变量的操作使用CAS保证线程安全。② 需要记录当前是哪个线程持有锁。③ 需要底层支持对一个线程进行阻塞或唤醒操作。④ 需要有一个队列维护所有阻塞的线程这个队列也必须是线程安全的无锁队列也需要使用CAS。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {// ...private transient Thread exclusiveOwnerThread; // 记录持有锁的线程
}public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {private volatile int state; // 记录锁的状态通过CAS修改state的值。 // ...
}针对要素 ① 和 ②在上面两个类中有对应的体现state取值可以是01还可以大于1就是为了支持锁的可重入性。例如同样一个线程调用5次lockstate会变成5然后调用5次unlockstate减为0。
当state0没有线程持有锁exclusiveOwnerThreadnull当state1有一个线程持有锁exclusiveOwnerThread该线程当state0说明该线程(exclusiveOwnerThread)重入了该锁。
对于要素 ③Unsafe类提供了阻塞或唤醒线程的一堆操作原语也就是park/unpark。
public final class Unsafe {public native void unpark(Object var1);public native void park(boolean var1, long var2);
}有一个LockSupport的工具类对这一原语做了简单封装
public class LockSupport { // ...private static final Unsafe U Unsafe.getUnsafe(); public static void park() {U.park(false, 0L); }public static void unpark(Thread thread) { if (thread ! null)U.unpark(thread); }
}在当前线程中调用park()该线程就会被阻塞在另外一个线程中调用unpark(Thread thread)传入一个被阻塞的线程就可以唤醒阻塞在park()地方的线程。 unpark(Thread thread)它实现了一个线程对另外一个线程的精准唤醒。notify也只是唤醒某一个线程但无法执行指定唤醒哪个线程。
针对要素 ④在AQS中利用双向链表和CAS实现了一个阻塞队列。
public abstract class AbstractQueuedSynchronizer { // ...static final class Node {volatile Thread thread; // 每个Node对应一个被阻塞的线程volatile Node prev;volatile Node next; // ...}private transient volatile Node head; private transient volatile Node tail; // ...
}阻塞队列是整个AQS核心中的核心。 head指向双向链表头部tail指向双向链表尾部。入队就是把新的Node加到tail后面然后对tail进行CAS操作出队就是对head进行CAS操作把head向后移一个位置。 初始时headtailNULL; 然后在往队列中加入阻塞的线程时会新建一个空的Node让head和tail都指向这个空的Node之后在后面加入被阻塞的线程对象。所以当headtail的时候说明队列为空。
1.4阻塞队列与唤醒机制⭐
addWaiter(…)方法将当前线程封装成一个 Node然后添加到等待队列的尾部。该方法的目的是让线程进入同步队列以便在适当的时机被唤醒或中断。
//AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {//创建节点尝试将节点追加到队列尾部。Node node new Node(Thread.currentThread(), mode);//获取tail节点将tail节点的next设置为当前节点。Node pred tail;//如果tail不存在就初始化队列。if (pred ! null) {node.prev pred;//先尝试加到队列尾部如果不成功则执行enq(node);if (compareAndSetTail(pred, node)) {pred.next node;return node;}}//enq内部会进行队列的初始化新建一个空的Node。然后不断尝试自旋直至成功把该Node加入队列尾部为止。enq(node);return node;
}在addWaiter(…)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(…)方法完成。 线程一旦进入acquireQueued(…)就会被无限期阻塞即使有其他线程调用interrupt()方法也不能将其唤醒除非有其他线程释放了锁并且该线程拿到了锁才会从acquireQueued(…)返回。
进入acquireQueued(…)该线程被阻塞。在该方法返回的一刻就是拿到锁的那一刻也就是被唤醒的那一刻此时会删除队列的第一个元素head指针前移一个节点。
//AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {boolean failed true;try {boolean interrupted false;for (;;) {final Node p node.predecessor();//被唤醒如果自己在队列头部则尝试拿锁。if (p head tryAcquire(arg)) {//拿锁成功出队列setHead(node);p.next null; // help GCfailed false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())interrupted true;}} finally {if (failed)cancelAcquire(node);}
}首先acquireQueued(…)方法有一个返回值表示什么意思虽然该方法不会中断响应但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有则该方法返回true否则返回false。
//AbstractQueuedSynchronizer
public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
static void selfInterrupt() {Thread.currentThread().interrupt();
}当acquireQueued(…)返回true时会调用selfInterrupt()自己给自己发送中断信号也就是自己把自己的中断标志位设为true。之所以要这么做是因为自己在阻塞期间收到其他线程中断信号没有及时响应现在要进行补偿。这样一来如果该线程在lock代码块内部有调用sleep()之类的阻塞方法就可以抛出异常响应该中断信号。
阻塞就发生在下面这个方法中
//AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}线程调用park()方法自己把自己阻塞起来直到被其他线程唤醒该方法返回。
park()返回有两种情况
其他线程调用了unpark(Thread t)其他线程调用了t.interrupt()。这里要注意的是lock()不能响应中断但LockSupport.park()会响应中断。
也正因为LockSupport.park()可能被中断唤醒acquireQueued(…)方法才写了一个for死循环。唤醒之后如果发现自己排在队列头部就去拿锁如果拿不到锁则再次阻塞自己。不断循环重复此过程直到拿到锁。
被唤醒之后通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1返回fasle如果是情况2返回true。
1.5unlock()
unlock不区分公平还是非公平当前线程要释放锁先调用tryRelease(arg)方法如果返回true则取出head让head获取锁。
//ReentrantLock
public void unlock() {sync.release(1);
}//AbstractQueuedSynchronizer
public final boolean release(int arg) {//tryRelease(...)方法释放锁if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)//unparkSuccessor(...)方法唤醒队列中的后继者unparkSuccessor(h);return true;}return false;
}tryRelease方法
//ReentrantLock
protected final boolean tryRelease(int releases) {int c getState() - releases;//只有锁的拥有者才有资格调用unlock()函数否则抛出异常if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;//每次调用tryReleasestate值减1直到0才代表锁可以被成功释放if (c 0) {free true;setExclusiveOwnerThread(null);}//没有使用CAS而直接用set。因为是排他锁只有一个线程能调减state值。setState(c);return free;
}unparkSuccessor方法
//ReentrantLock
private void unparkSuccessor(Node node) {int ws node.waitStatus;if (ws 0)compareAndSetWaitStatus(node, ws, 0);Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0)s t;}if (s ! null)LockSupport.unpark(s.thread);
}1.6trylock()
tryLock()实现基于调用非公平锁的tryAcquire(…)对state进行CAS操作如果操作成功就拿到锁如果操作不成功则直接返回false也不阻塞。
//ReentrantLock
public boolean tryLock() {return sync.nonfairTryAcquire(1);
}2读写锁
和互斥锁相比读写锁ReentrantReadWriteLock就是读线程和读线程之间不互斥。
2.1类继承层次
ReadWriteLock是一个接口内部由两个Lock接口组成。
public interface ReadWriteLock {Lock readLock();Lock writeLock();
}ReentrantReadWriteLock实现了该接口使用方式如下
ReadWriteLock readWriteLock new ReentrantReadWriteLock();
Lock readLock readWriteLock.readLock();
readLock.lock();
// 进行读取操作
readLock.unlock();Lock writeLock readWriteLock.writeLock();
writeLock.lock();
// 进行写操作
writeLock.unlock();也就是说当使用ReadWriteLock的时候并不是直接使用而是获得其内部的读锁和写锁然后分别调用lock/unlock。
2.2读写锁基本原理
从表面来看ReadLock和WriteLock是两把锁实际上它只是同一把锁的两个视图而已。什么叫两个视图呢可以理解为一把锁线程分为两类读线程和写线程。读线程和写线程之间不互斥可以同时拿到这把锁读线程之间不互斥、写线程之间互斥。
从下面的构造方法也可以看出readerLock和writerLock实际公用同一个sync对象。sync对象同互斥锁一样分为非公平锁和公平两种策略并继承自AQS。
public ReentrantReadWriteLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();readerLock new ReadLock(this);writerLock new WriteLock(this);
}同互斥锁一样读写锁也是用state变量表示锁状态的。只是state变量在这里的含义和互斥锁完全不同。在内部类Sync中对state变量进行重新定义如下所示
abstract static class Sync extends AbstractQueuedSynchronizer { // ...static final int SHARED_SHIFT 16;static final int SHARED_UNIT (1 SHARED_SHIFT);static final int MAX_COUNT (1 SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK (1 SHARED_SHIFT) - 1; // 持有读锁的线程的重入次数static int sharedCount(int c) { return c SHARED_SHIFT; } // 持有写锁的线程的重入次数static int exclusiveCount(int c) { return c EXCLUSIVE_MASK; } // ...private volatile int state;
}也就是把state变量拆成两半低16位用来记录写锁高16位用来记录读锁。但同一时间既然只能有一个线程写为什么还需要16位呢因为一个写线程可能多次重入。16位的数值范围是0到65535这意味着一个线程最多可以重入写锁65535次。这个范围通常已经足够大能够满足绝大多数场景中的需求高16位的值等于5既可以表示5个线程都拿到了该锁也可以表示一个读线程重入了5次。 为什么要把一个int类型变量拆成两半而不是用两个int型变量分别表示读锁和写锁的状态呢 CAS操作只能在一次操作中对一个内存地址的值进行比较和交换。无法用一次CAS同时操作两个int变量所以用来一个int型的高16位和低16位分别表示读锁和写锁的状态。 当state 0时说明既没有线程持有读锁也没有线程持有写锁当state !0时要么有线程持有读锁要么有线程持有写锁两者不能同时成立因为读和写互斥。这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底时读线程还是写线程持有了该锁。
2.3lock() unlock()
public static class ReadLock implements Lock, java.io.Serializable { // ...public void lock() {sync.acquireShared(1); }public void unlock() {sync.releaseShared(1); }// ...
}
public static class WriteLock implements Lock, java.io.Serializable { // ...public void lock() { sync.acquire(1); }public void unlock() { sync.release(1); }// ...
}acquire/release、acquireShared/releaseShared是AQS里面的两对模板方法。互斥锁和读写锁的写锁都是基于acquire/release模板方法来实现的。读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现的。这两对模板方法的代码如下
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ...public final void acquire(int arg) {if (!tryAcquire(arg) // tryAcquire方法由多个Sync子类实现acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}public final void acquireShared(int arg) {if (tryAcquireShared(arg) 0) // tryAcquireShared方法由多个Sync子类实现doAcquireShared(arg);}public final boolean release(int arg) {if (tryRelease(arg)) { // tryRelease方法由多个Sync子类实现 Node h head;if (h ! null h.waitStatus ! 0) unparkSuccessor(h);return true; }return false; }public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // tryReleaseShared方法由多个Sync子类实现doReleaseShared();return true; }return false; }// ...
}将读/写、公平/非公平进行排列组合就有4种组合。如下上面的两个方法都是在Sync种实现的。Sync种的两个方法又是模板方法在NonfairSync和FairSync种分别有实现。最终的对应关系如下
读锁的公平实现Sync.tryAcquireShared() FairSync 中的 lock() tryAcquire()。tryAcquire())。读锁的非公平实现Sync.tryAcquireShared() NonfairSync中的 lock() tryAcquire()。写锁的公平实现Sync.tryAcquire() FairSync中的 lock() tryAcquire()。写锁的非公平实现Sync.tryAcquire() NonfairSync中的 lock() tryAcquire()。
/*** Nonfair version of Sync*/
static final class NonfairSync extends Sync {private static final long serialVersionUID -8159625535654395037L;// 写线程枪锁的时候是否应该阻塞final boolean writerShouldBlock() {// 写线程在抢锁之前永远不被阻塞非公平锁return false; // writers can always barge}// 读线程抢锁的时候是否应该阻塞final boolean readerShouldBlock() {// 读线程抢锁的时候当队列中第一个元素是写线程的时候要阻塞return apparentlyFirstQueuedIsExclusive();}
}/*** Fair version of Sync*/
static final class FairSync extends Sync {private static final long serialVersionUID -2274990926593161451L;// 写线程抢锁的时候是否应该阻塞final boolean writerShouldBlock() {// 写线程在抢锁之前如果队列中有其他线程在排队则阻塞。公平锁return hasQueuedPredecessors();}// 读线程抢锁的时候是否应该阻塞final boolean readerShouldBlock() {// 读线程在抢锁之前如果队列中有其他线程在排队阻塞。公平锁return hasQueuedPredecessors();}
}对于非公平读锁和写锁的实现策略稍有差异
写锁能抢锁前提是state0只有在没有其他线程持有读锁或写锁的情况下它才有机会去抢锁。或者state !0但哪个持有写锁的线程是自己再次重入。写线程是非公平的即writerShouldBlock()方法一直返回false。对于读线程假设当前线程被读线程持有然后其他读线程还非公平地一直去抢可能导致写线程永远拿不到锁所以对于读线程的非公平要做一些约束。当发现队列的第一个元素是写线程的时候读线程也要阻塞不能直接去抢。即偏向写线程。
【问题】读锁是共享的为啥还要区分公平和非公平 【答案】假设场景是图书馆有足够多的Java从入门到精通满足全球的人都可以同时借阅读锁。尽管有足够的书共享资源但是读请求还是需要排队因为系统的性能是有限的比如CPU和内存的处理能力。如果所有请求同时占用读锁可能会导致资源竞争和性能下降。在公平的情况下排队可以确保每个人都有机会顺利借书而非公平可能导致一些请求长时间得不到响应。若每个线程都获得读锁可能会导致过多的竞争和性能下降尤其在高并发情况下。
2.4WriteLock公平锁非公平锁
写锁是排他锁实现策略类似于互斥锁重写了tryAcquire/tryRelease方法。
protected final boolean tryAcquire(int acquires) {Thread current Thread.currentThread();int c getState();//写线程只能有一个但写线程可以多次重入int w exclusiveCount(c);//当c!0 说明有读线程或者写线程持有锁if (c ! 0) {// (Note: if c ! 0 and w 0 then shared count ! 0)// w 0, 说明锁被读线程持有只能返回w!0持有写锁的线程不是自己也只能返回。if (w 0 || current ! getExclusiveOwnerThread())return false;//16位用满了超过了最大重入次数。if (w exclusiveCount(acquires) MAX_COUNT)throw new Error(Maximum lock count exceeded);// Reentrant acquiresetState(c acquires);return true;}//公平锁实现和非公平锁实现只是writerShouldBlock()分别被FairSync和NonfairSync实现。if (writerShouldBlock() ||!compareAndSetState(c, c acquires))return false;//抢锁成功后将ownerThread设成自己。setExclusiveOwnerThread(current);return true;
}c!0 and w0说明当前一定是读线程拿着锁写锁一定拿不到返回false。c!0 and w!0说明当前一定是写线程拿着锁执行current!getExclusive-OwnerThread()的判断发现ownerThread不是自己返回false。c!0 and w!0 and current getExclusiveOwnerThread()才会走到 if (w exclusiveCount(acquires) MAX_COUNT)。判断重入次数重入次数超过最大值抛出异常。if (c0)说明当前既没有读线程也没有写线程持有该锁。可以通过CAS操作开抢。
tryRelease()分析
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc getState() - releases;boolean free exclusiveCount(nextc) 0;if (free)setExclusiveOwnerThread(null);//因为写锁是排他的在当前线程持有写锁的时候其他线程不会持有写锁也不会持有读锁。所以这里对state值的调减不需要CAS操作直接减1即可。setState(nextc);return free;
}tryLock和lock方法不区分公平/非公平。
//ReentrantReadWriteLock
public boolean tryLock( ) {return sync.tryWriteLock();
}
final boolean tryWriteLock() {
Thread current Thread.currentThread();int c getState();//当state不是0的时候如果写线程获取锁的个数是0或者写线程不是当前线程则返回枪锁失败。if (c ! 0) {int w exclusiveCount(c);if (w 0 || current ! getExclusiveOwnerThread())return false;if (w MAX_COUNT)throw new Error(Maximum lock count exceeded);}//只要不是上面的情况则通过CAS设置state的值。//如果设置成功就将排他线程设置为当前线程并返回true。if (!compareAndSetState(c, c 1))return false;setExclusiveOwnerThread(current);return true;
}2.5ReadLock公平锁非公平锁
读锁是共享锁重写了tryAcquireShared/tryReleaseShared方法其实现策略和排他锁有很大差异。
//ReentrantReadWriteLock
protected final int tryAcquireShared(int unused) {Thread current Thread.currentThread();int c getState();//写锁被某线程持有并且这个线程还不是自己读锁肯定拿不到直接返回。if (exclusiveCount(c) ! 0 getExclusiveOwnerThread() ! current)return -1;int r sharedCount(c);//公平和非公平的差异就在于这个函数if (!readerShouldBlock() r MAX_COUNT //CAS拿读锁高16位加1compareAndSetState(c, c SHARED_UNIT)) {//r之前等于0说明这是第一个拿到读锁的线程if (r 0) {firstReader current;firstReaderHoldCount 1;//不是第一个} else if (firstReader current) {firstReaderHoldCount;} else {HoldCounter rh cachedHoldCounter;if (rh null || rh.tid ! getThreadId(current))cachedHoldCounter rh readHolds.get();else if (rh.count 0)readHolds.set(rh);rh.count;}return 1;}//上面拿读锁失败进入这个函数不断自旋拿读锁return fullTryAcquireShared(current);
}if (exclusiveCount(c) ! 0 getExclusiveOwnerThread() ! current)return -1;低16位不等于0说明有写线程持有锁并且只有当ownerThread !自己时才返回-1。如果currentownerThread则这段代码不会返回。这是因为一个写线程可以再次去拿读锁也就是说一个线程在持有WriteLock后再去调用ReadLock.lock也是可以的。
compareAndSetState(c, c SHARED_UNIT))把state的高16位加1读锁的状态但因为是在高16位必须把1左移16位再加1。
firstReadercacheHoldConunter之类的变量只是一些统计变量在ReentrantRead-WriteLock对外的一些查询函数中会用到例如查询持有读锁的线程列表但对整个读写互斥机制没有影响。
protected final boolean tryReleaseShared(int unused) { Thread current Thread.currentThread();// ...for (;;) {int c getState();int nextc c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free.return nextc 0; }
}因为读锁是共享锁多个线程会同时持有读锁所以对读锁的释放不能直接减1而是需要通过一个for循环 CAS操作不断重试。这是tryReleaseShared和tryReleased的根本差异所在。
3Condition
3.1Condition与Lock的关系
Condition与Lock配合使用提供了比传统的Object.wait()和Object.notify()更灵活的线程协调机制。Condition的主要作用是允许一个或多个线程在特定条件下等待直到被其他线程通知。与Object的等待/通知机制相比Condition提供了以下优势
多条件支持一个锁可以有多个Condition对象每个对象可以代表一个不同的等待条件。更细粒度的控制使用Condition线程可以在等待时释放锁并在被唤醒后重新获得锁避免了锁的持有时间过长的问题。更好的可读性和维护性代码逻辑更加清晰易于理解。
【案例】Condition实现生产者和消费者对共享缓冲区的访问
ReentrantLock lock new ReentrantLock();
Condition notFull lock.newCondition();
Condition notEmpty lock.newCondition();
ListObject buffer new ArrayList();
int capacity 10;public void produce() {lock.lock();try {while (buffer.size() capacity) {notFull.await(); // 等待缓冲区不满}buffer.add(new Object());notEmpty.signal(); // 通知消费者} finally {lock.unlock();}
}public void consume() {lock.lock();try {while (buffer.isEmpty()) {notEmpty.await(); // 等待缓冲区不空}buffer.remove(0);notFull.signal(); // 通知生产者} finally {lock.unlock();}
}public interface Condition {void await() throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException;void awaitUninterruptibly();boolean awaitUntil(Date deadline) throws InterruptedException; void signal();void signalAll();
}wait()/notify()必须和synchronized一起使用Condition也必须和Lock一起使用。因此在Lock的接口中有一个与Condition相关的接口
public interface Lock { void lock();void lockInterruptibly() throws InterruptedException; // 所有的Condition都是从Lock中构造出来的Condition newCondition();boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock();
}3.2Condition数组实现阻塞队列
以ArrayBlockingQueue为例。如下面所示为一个用数组实现的阻塞队列执行put(…)操作的时候队列满了生产者线程被阻塞执行take()的时候队列为空消费者线程被阻塞。
public class ArrayBlockingQueueE extends AbstractQueueE implements BlockingQueueE, java.io.Serializable { //...final Object[] items;int takeIndex;int putIndex;int count;// 一把锁两个条件final ReentrantLock lock;private final Condition notEmpty; private final Condition notFull;public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity 0)throw new IllegalArgumentException(); this.items new Object[capacity];// 构造器中创建一把锁加两个条件lock new ReentrantLock(fair); // 构造器中创建一把锁加两个条件notEmpty lock.newCondition(); // 构造器中创建一把锁加两个条件notFull lock.newCondition(); }public void put(E e) throws InterruptedException { Objects.requireNonNull(e);final ReentrantLock lock this.lock; lock.lockInterruptibly();try {while (count items.length) // 非满条件阻塞队列容量已满 notFull.await();enqueue(e); } finally {lock.unlock(); }}private void enqueue(E e) { final Object[] items this.items; items[putIndex] e;if (putIndex items.length) putIndex 0; count;// put数据结束通知消费者非空条件 notEmpty.signal();}public E take() throws InterruptedException { final ReentrantLock lock this.lock;lock.lockInterruptibly(); try {while (count 0)// 阻塞于非空条件队列元素个数为0无法消费 notEmpty.await();return dequeue(); } finally {lock.unlock(); }}private E dequeue() { final Object[] items this.items; SuppressWarnings(unchecked)E e (E) items[takeIndex]; items[takeIndex] null;if (takeIndex items.length) takeIndex 0; count--;if (itrs ! null)itrs.elementDequeued();// 消费成功通知非满条件队列中有空间可以生产元素了。 notFull.signal();return e; }// ...
}3.3Condition实现原理
可以发现Condition的使用很方便避免了wait/notify的生产者通知生产者消费者通知消费者的问题。具体实现如下
由于Condition必须和Lock一起使用所以Condition的实现也是Lock的一部分。首先查看互斥锁和读写锁中Condition的构造方法
public class ReentrantLock implements Lock, java.io.Serializable { // ...public Condition newCondition() {return sync.newCondition(); }
}public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { // ...private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; // ...public static class ReadLock implements Lock, java.io.Serializable { // 读锁不支持Conditionpublic Condition newCondition() {// 抛异常throw new UnsupportedOperationException(); }}public static class WriteLock implements Lock, java.io.Serializable { // ...public Condition newCondition() {return sync.newCondition(); }// ... }// ...
}首先读写锁中的ReadLock是不支持Condition的读写锁的写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync但内部类Sync都继承自AQS。因此上面的代码sync.newCondition最终都调用了AQS中的newCondition
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {public class ConditionObject implements Condition, java.io.Serializable {// Condition的所有实现都在ConditionObject类中 }
}public class ReentrantLock implements Lock, java.io.Serializable {abstract static class Sync extends AbstractQueuedSynchronizer { final ConditionObject newCondition() {return new ConditionObject(); }}
}每一个Condition对象上面都阻塞了多个线程。因此在ConditionObject内部也有一个双向链表组成的队列如下
public class ConditionObject implements Condition, java.io.Serializable { //transient 指示某个字段在序列化对象时不应被序列化。private transient Node firstWaiter;private transient Node lastWaiter;
}
static final class Node {volatile Node prev;volatile Node next;volatile Thread thread; Node nextWaiter;
}3.4await()实现分析
public final void await() throws InterruptedException {// 刚要执行await()操作收到中断信号抛异常if (Thread.interrupted())throw new InterruptedException();// 加入Condition的等待队列Node node addConditionWaiter();// 阻塞在Condition之前必须先释放锁否则会死锁int savedState fullyRelease(node);int interruptMode 0;while (!isOnSyncQueue(node)) {// 阻塞当前线程LockSupport.park(this);if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}// 重新获取锁if (acquiraeQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode ! 0)// 被中断唤醒抛中断异常reportInterruptAfterWait(interruptMode);
}线程调用await()的时候肯定已经先拿到了锁。所以在addConditionWaiter()内部对这个双向链表的操作不需要执行CAS操作线程是安全的代码如下
private Node addConditionWaiter() {// ...Node t lastWaiter;// ...Node node new Node(Node.CONDITION);if (t null)firstWaiter node;elset.nextWaiter node;lastWaiter node;return node;
}在线程执行wait操作之前必须先释放锁。也就是fullyRelease(node)否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。 线程从wait中被唤醒后必须用acquiraeQueued(node, savedState)方法重新拿锁。 checkInterruptWhileWaiting(node)代码在park(this)代码之后是为了检测在park期间是否收到过中断信号。当线程从park中醒来时有两种可能 一种是其他线程调用了unpark另一种是收到中断信号。 这里的await()方法是可以响应中断所以当发现自己被中断唤醒的而不是被unpark唤醒时会直接退出while循环await()方法也会返回。 isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候Node值在Condition的队列里而不在AQS的队列里。但执行notify操作的时候会放进AQS的同步队列。
3.5awaitUniterruptibly()实现分析
与await()不同awaitUninterruptibly()不会响应中断其方法的定义中不会有中断异常抛出下面分析其实现和await()的区别。
public final void awaitUninterruptibly() {Node node addConditionWaiter();int savedState fullyRelease(node);boolean interrupted false;while (!isOnSyncQueue(node)) {LockSupport.park(this);//当线程唤醒后如果被中断过仅记录不处理继续进行while循环if (Thread.interrupted())interrupted true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();
}可以看出整体代码和await()类似区别在于收到异常后不会抛出异常而是继续执行while循环。
3.6notify()实现分析
public final void signal() {// 只有持有锁的线程才有资格调用signal()方法if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first firstWaiter;if (first ! null)// 发起通知doSignal(first);
}// 唤醒队列中的第1个线程
private void doSignal(Node first) {do {if ( (firstWaiter first.nextWaiter) null)lastWaiter null;first.nextWaiter null;} while (!transferForSignal(first) (first firstWaiter) ! null);
}final boolean transferForSignal(Node node) {if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))return false;// 先把Node放入互斥锁的同步队列中再调用unpark方法Node p enq(node);int ws p.waitStatus;if (ws 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}同await()一样在调用notify()的时候必须先拿到锁否则就会抛出上面的异常是因为前面执行await()的时候把锁释放了。然后从队列中取出firstWaiter唤醒它。在通过调用unpark唤醒它之前先用enq(node)方法把这个Node放入AQS的锁对应的阻塞队列中。也正因为如此才有了await()方法里面的判断条件while (!isOnSyncQueue(node)) 这个判断条件满足说明await线程不是被中断的而是被unpark唤醒的。
4StampedLock
4.1StampedLock使用场景
锁并发度ReentrantLock读与读互斥读与写互斥写与写互斥ReentrantReadWriteLock读与读不互斥读与写互斥写与写互斥StampedLock读与读不互斥读与写不互斥写与写互斥
可以看到从ReentrantLock到StampedLock并发度依次提高。StampedLock是如何做到“读”与“写”也不互斥、并发地访问的呢MySQL 高并发的核心机制 MVCC也就是一份数据多个版本此处的StampedLock有异曲同工之妙。
另一方面因为ReentrantLock采用的是“悲观读”的策略当第一个读线程拿到锁之后第二个、第三个读线程还可以拿到锁使得写线程一直拿不到锁可能导致写线程“饿死”。 虽然在其公平或非公平的实现中都尽量避免这种情形但还有可能发生。StampedLock引入了“乐观读”策略读的时候不加读锁读出来发现数据被修改了再升级为“悲观读”相当于降低了“读”的地位把抢锁的天平往“写”的一方倾斜了一下避免写线程被饿死。
class Point {private double x,y;private final StampedLock s1 new StampedLock();//多个线程调用该函数修改x,y的值void move(double deltaX, double deltaY) {long stamp s1.writeLock();try {x deltaX;y deltaY;} finally {s1.unlockWrite(stamp);}}//多个线程调用该函数求距离。使用“乐观读”将共享变量拷贝到线程栈中。//读的期间其他线程修改了共享变量读到脏数据放弃。升级为悲观锁。double distanceFromOrigin() {long stamp s1.tryOptimisticRead();double currentX x, currentY y;if (!s1.validate(stamp)) {stamp s1.readLock();try {currentX x;currentY y;} finally {s1.unlockRead(stamp);}}return Math.sqrt(currentX*currentX currentY*currentY);}
}首先执行move操作的时候要加锁。这个用法和ReadWriteLock的用法没有区别写操作和写操作也是互斥的。关键在于读的时候用了一个“乐观读”sl.tryOptimisticRead()相当于在读之前给数据的状态做了一个“快照”。然后把数据拷贝到内存里面在用之前再比对一次版本号。如果版本号变了则说明在读的期间有其他线程修改了数据。读出来的数据废弃重新获取读锁。 关键代码就是下面这三行
//在读之前获取数据的版本号。
//读将一份数据拷贝到线程的栈内存中
//读之后将读之前的版本号和当前版本号比对判断读出来数据是否可以使用期间没有被其他线程修改
long stamp s1.tryOptimisticRead();
double currentX x, currentY y;
if (!s1.validate(stamp)) {要说明的是这三行关键代码对顺序非常敏感不能有重排序。因为 state 变量已经是volatile所以可以禁止重排序但stamp并不是volatile 的。为此在 validate(stamp) 函数里面插入内存屏障。
4.2“乐观读”的实现原理
首先StampedLock是一个读写锁因此也会像读写锁那样把一个state变量分成两半分别表示读锁和写锁的状态。同时它还需要一个数据的version。但正如前面所说一次CAS没有办法操作两个变量所以这个state变量本身同时也表示了数据的version。下面先分析state变量。
public class StampedLock implements java.io.Serializable {private static final long RUNIT 1L;//第8位表示写锁private static final long WBIT 1L LG_READERS;//最低的7位表示读锁private static final long RBITS WBIT - 1L;//读锁的数目private static final long RFULL RBITS - 1L;//读锁和写锁的状态整合到一起private static final long ABITS RBITS | WBIT;private static final long SBITS ~RBITS; // note overlap with ABITS// Initial value for lock state; avoid failure value zeroprivate static final long ORIGIN WBIT 1;//state的初始值private transient volatile long state;用最低的8位表示读和写的状态其中第8位表示写锁的状态最低的7位表示读锁的状态。因为写锁只有一个bit位所以写锁是不可重入的。 初始值不为0而是把WBIT 向左移动了一位也就是上面的ORIGIN 常量构造函数如下所示。
// Initial value for lock state; avoid failure value zero
private static final long ORIGIN WBIT 1;public StampedLock() {state ORIGIN;
}为什么state的初始值不设为0呢初始值不是0的原因主要是为了确保在未加锁时写锁的版本号和乐观读锁的戳能明确表示出状态。
//在读之前获取数据的版本号。
//读将一份数据拷贝到线程的栈内存中
//读之后将读之前的版本号和当前版本号比对判断读出来数据是否可以使用期间没有被其他线程修改
long stamp s1.tryOptimisticRead();
double currentX x, currentY y;
if (!s1.validate(stamp)) {public long tryOptimisticRead() {long s;return (((s state) WBIT) 0L) ? (s SBITS) : 0L;
}
public boolean validate(long stamp) {U.loadFence();return (stamp SBITS) (state SBITS);
}当stateWBIT!0的时候说明有线程持有写锁上面的tryOptimisticRead会永远返回0。这样再调用validate(stamp)也就是validate(0)也会永远返回false。当有线程持有写锁的时候validate永远返回false无论写线程是否释放了写锁。因为无论是否释放了state回到初始值写锁state值都不为0所以validate(0)永远为false。
【问题】为什么上面的validate()函数不直接比较stampstate而要比较stateSBITSstateSBITS 呢 【答案】因为读锁和读锁是不互斥的所以即使在“乐观读”的时候state 值被修改了但如果它改的是第7位validate()还是会返回true。
另外要说明的一点是上面使用了内存屏障 U.loadFence()是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile 的由此可以禁止其和前面的currentXXcurrentYY进行重排序。通过上面的分析可以发现state的设计非常巧妙。只通过一个变量既实现了读锁、写锁的状态记录还实现了数据的版本号的记录。
4.3悲观读/写“阻塞”和“自旋”策略实现差异
和ReadWriteLock一样StampedLock也要进行悲观的读锁和写锁操作。不过它不是基于AQS实现的而是内部重新实现了一个阻塞队列。
static final class WNode {volatile WNode prev;volatile WNode next;volatile WNode cowait; // list of linked readersvolatile Thread thread; // non-null while possibly parkedvolatile int status; // 0, WAITING, or CANCELLEDfinal int mode; // RMODE or WMODEWNode(int m, WNode p) { mode m; prev p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;这个阻塞队列和 AQS 里面的很像。刚开始的时候wheadwtailNULL然后初始化建一个空节点whead和wtail都指向这个空节点之后往里面加入一个个读线程或写线程节点。但基于这个阻塞队列实现的锁的调度策略和AQS很不一样也就是“自旋”。
在AQS里面当一个线程CAS state失败之后会立即加入阻塞队列并且进入阻塞状态。但在StampedLock中CAS state失败之后会不断自旋自旋足够多的次数之后如果还拿不到锁才进入阻塞状态。为此根据CPU的核数定义了自旋次数的常量值。如果是单核的CPU线程被调度的上下文切换可能会使自旋锁不如传统的阻塞锁有效。在多核情况下自旋策略可以更有效地利用 CPU 资源因为多个线程可以同时运行。
private static final int NCPU Runtime.getRuntime().availableProcessors();private static final int SPINS (NCPU 1) ? 1 6 : 0;下面以写锁的加锁也就是StampedLock的writeLock()函数为例来看一下自旋的实现。 public long writeLock() {long s, next; return ((((s state) ABITS) 0L U.compareAndSwapLong(this, STATE, s, next s WBIT)) ?next : acquireWrite(false, 0L));
}如上面代码所示当stateABITS0的时候说明既没有线程持有读锁也没有线程持有写锁此时当前线程才有资格通过CAS操作state。若操作不成功则调用acquireWrite()函数进入阻塞队列并进行自旋这个函数是整个加锁操作的核心代码如下。
private long acquireWrite(boolean interruptible, long deadline) {WNode node null, p;//入队列时自选for (int spins -1;;) { // spin while enqueuinglong m, s, ns;if ((m (s state) ABITS) 0L) {if (U.compareAndSwapLong(this, STATE, s, ns s WBIT))//自旋的时候拿到了锁函数返回return ns;}else if (spins 0)spins (m WBIT wtail whead) ? SPINS : 0;else if (spins 0) {if (LockSupport.nextSecondarySeed() 0)//不断自旋以一定的概率把spins值往下累减--spins;}else if ((p wtail) null) { //初始化队列WNode hd new WNode(WMODE, null);if (U.compareAndSwapObject(this, WHEAD, null, hd))wtail hd;}else if (node null)node new WNode(WMODE, p);else if (node.prev ! p)node.prev p;else if (U.compareAndSwapObject(this, WTAIL, p, node)) {p.next node;//for循环唯一的breakCAS tail成功成功加入队列尾部才会退出for循环break;}}for (int spins -1;;) {WNode h, np, pp; int ps;if ((h whead) p) {if (spins 0)spins HEAD_SPINS;else if (spins MAX_HEAD_SPINS)spins 1;for (int k spins;;) { // spin at headlong s, ns;//再次尝试拿锁if (((s state) ABITS) 0L) {if (U.compareAndSwapLong(this, STATE, s,ns s WBIT)) {whead node;node.prev null;return ns;}}else if (LockSupport.nextSecondarySeed() 0 --k 0) //不断自旋break;}}else if (h ! null) { // help release stale waitersWNode c; Thread w;//自己从阻塞中唤醒然后唤醒cowait中所有reader线程while ((c h.cowait) ! null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) (w c.thread) ! null)U.unpark(w);}}if (whead h) {if ((np node.prev) ! p) {if (np ! null)(p np).next node; // stale}else if ((ps p.status) 0)U.compareAndSwapInt(p, WSTATUS, 0, WAITING);else if (ps CANCELLED) {if ((pp p.prev) ! null) {node.prev pp;pp.next node;}}else {long time; // 0 argument to park means no timeoutif (deadline 0L)time 0L;else if ((time deadline - System.nanoTime()) 0L)return cancelWaiter(node, node, false);Thread wt Thread.currentThread();U.putObject(wt, PARKBLOCKER, this);node.thread wt;if (p.status 0 (p ! h || (state ABITS) ! 0L) whead h node.prev p)//进入阻塞状态之后被另外一个线程release唤醒接着往下执行这个for循环U.park(false, time);node.thread null;U.putObject(wt, PARKBLOCKER, null);if (interruptible Thread.interrupted())return cancelWaiter(node, node, true);}}}}整个acquireWrite()函数是两个大的for循环内部实现了非常复杂的自旋策略。在第一个大的for循环里面目的就是把该Node加入队列的尾部一边加入一边通过CAS操作尝试获得锁。如果获得了整个函数就会返回如果不能获得锁会一直自旋直到加入队列尾部。
在第二个大的for循环里也就是该Node已经在队列尾部了。这个时候如果发现自己刚好也在队列头部说明队列中除了空的Head节点就是当前线程了。此时再进行新一轮的自旋直到达到MAX_HEAD_SPINS次数然后进入阻塞。这里有一个关键点要说明当release()函数被调用之后会唤醒队列头部的第1个元素此时会执行第二个大的for循环里面的逻辑也就是接着for循环里面park()函数后面的代码往下执行。
另外一个不同于AQS的阻塞队列的地方是在每个WNode里面有一个cowait指针用于串联起所有的读线程。例如队列尾部阻塞的是一个读线程 1现在又来了读线程 2、3那么会通过cowait指针把1、2、3串联起来。1被唤醒之后2、3也随之一起被唤醒因为读和读之间不互斥。
释放锁 和读写锁的实现类似也是做了两件事情一是把state变量置回原位二是唤醒阻塞队列中的第一个节点。节点被唤醒之后会继续执行上面的第二个大的for循环自旋拿锁。如果成功拿到则出队列如果拿不到则再次进入阻塞等待下一次被唤醒。
// java.util.concurrent.locks.StampedLock#unlockWrite
public void unlockWrite(long stamp) {WNode h;if (state ! stamp || (stamp WBIT) 0L)throw new IllegalMonitorStateException();state (stamp WBIT) 0L ? ORIGIN : stamp;if ((h whead) ! null h.status ! 0)release(h);
}
// 唤醒队列的队首节点【头结点whead的后继节点】
private void release(WNode h) {if (h ! null) {WNode q; Thread w;U.compareAndSwapInt(h, WSTATUS, WAITING, 0); // 将头结点状态从-1变为0标识要唤醒其后继节点if ((q h.next) null || q.status CANCELLED) { // 判断头结点的后继节点是否为null或状态为取消for (WNode t wtail; t ! null t ! h; t t.prev) // 从队尾查找距头结点最近的状态为等待的节点if (t.status 0)q t; // 赋值}if (q ! null (w q.thread) ! null)U.unpark(w); // 唤醒队首节点}
}