深圳南山企业网站建设搜收录网
同步工具类:Phaser
- 介绍
- 特性
- 动态调整线程个数
- 层次Phaser
- 源码分析
- state 变量解析
- 构造函数对state变量赋值
- 阻塞方法
- arrive()
- awaitAdvance()
- 业务场景
- 实现CountDownLatch功能
- 代码
- 测试结果
- 实现 CyclicBarrier功能
- 代码展示
- 测试结果
- 总结
介绍
一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。该工具类是 JDK 1.7才引入的。功能比 CyclicBarrier 和 CountDownLatch 都要强大。
CyclicBarrier 和 CountDownLatch 的学习地址
特性
动态调整线程个数
CyclicBarrier 所要同步的线程个数是在构造函数中指定的,之后不能更改,而Phaser可以在运行期间动态地调整要同步的线程个数。用来修改同步线程个数的函数有:
public int register() {//注册一个线程return doRegister(1);}
public int bulkRegister(int parties) {if (parties < 0)throw new IllegalArgumentException();if (parties == 0)return getPhase();//注册多个线程return doRegister(parties);}
public int arriveAndDeregister() {//解注册,减少线程个数return doArrive(ONE_DEREGISTER);}
层次Phaser
即 多个Phaser 可以组成 如下的树状结构,可以通过在构造函数中传入父Phaser 来实现
public Phaser(Phaser parent) {//传入Phaser 作为 该对象的父 节点this(parent, 0);}
如上图,如果传Phaser 参数过来,对应的线程数量会相加。对于树状Phaser 上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser 是一样的,具体来讲,Phaser 并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册,当子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册。父Phaser把子Phaser当作一个正常参与的线程即可。
源码分析
state 变量解析
Phaser 没有基于AQS来实现,但具备AQS的核心特性:state 变量,CAS 操作,阻塞队列等。
首先看下state 变量。
/*** Primary state representation, holding four bit-fields:* state 分为了四个部分。* unarrived -- the number of parties yet to hit barrier (bits 0-15)* parties -- the number of parties to wait (bits 16-31)* phase -- the generation of the barrier (bits 32-62)* terminated -- set if barrier is terminated (bit 63 / sign)** Except that a phaser with no registered parties is* distinguished by the otherwise illegal state of having zero* parties and one unarrived parties (encoded as EMPTY below).** To efficiently maintain atomicity, these values are packed into* a single (atomic) long. Good performance relies on keeping* state decoding and encoding simple, and keeping race windows* short.** All state updates are performed via CAS except initial* registration of a sub-phaser (i.e., one with a non-null* parent). In this (relatively rare) case, we use built-in* synchronization to lock while first registering with its* parent.** The phase of a subphaser is allowed to lag that of its* ancestors until it is actually accessed -- see method* reconcileState.*/private volatile long state;
从英文注释中我们可以得知:这个64位的state变量被拆分成4部分。如图所示
最高位0表示未同步完成,1表示同步完成。初始最高位为0.可以看下如下几个函数对state 的各个部分组成的获取。获取当前轮数(32-62位)
/*** Returns the current phase number. The maximum phase number is* {@code Integer.MAX_VALUE}, after which it restarts at* zero. Upon termination, the phase number is negative,* in which case the prevailing phase prior to termination* may be obtained via {@code getPhase() + Integer.MIN_VALUE}.** @return the phase number, or a negative value if terminated*/public final int getPhase() {//无符号右移(高位补0) 32位。//然后强制int .则最高位还是 是否为同步信息。//如果是负数,说明最高位是1,已经完成了同步return (int)(root.state >>> PHASE_SHIFT);}private static final int PHASE_SHIFT = 32;
判断当前轮数同步是否完成。
/*** Returns {@code true} if this phaser has been terminated.** @return {@code true} if this phaser has been terminated*/public boolean isTerminated() {//判断整个值是否是负数即可。最高位也是符号位。return root.state < 0L;}
获取总注册线程数
/*** Returns the number of parties registered at this phaser.** @return the number of parties*/public int getRegisteredParties() {return partiesOf(state);}private static int partiesOf(long s) {//先将s 转成int.相当于把高位 是否完成同步和轮数舍弃了。//只剩下低位的总线程数和未达到线程数。然后向右无符号右移16位(高位补0)。//就只剩下了总线程数return (int)s >>> PARTIES_SHIFT;}private static final int PARTIES_SHIFT = 16;
获取未达到的线程数
/*** Returns the number of registered parties that have not yet* arrived at the current phase of this phaser. If this phaser has* terminated, the returned value is meaningless and arbitrary.** @return the number of unarrived parties*/public int getUnarrivedParties() {return unarrivedOf(reconcileState());}
private static int unarrivedOf(long s) {// 首先将state强转为int,舍弃高位的 是否完成同步和轮数信息int counts = (int)s;//不为空的话,则和 0xffff 进行与操作,截取 低16位。即未到达的线程数return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);}private static final int EMPTY = 1;private static final int UNARRIVED_MASK = 0xffff; // to mask ints
构造函数对state变量赋值
如果能理解state 变量的分布图。64位的划分,划分为4个部分。 那么就很容易理解构造函数的初始了。
/*** Creates a new phaser with the given parent and number of* registered unarrived parties. When the given parent is non-null* and the given number of parties is greater than zero, this* child phaser is registered with its parent.** @param parent the parent phaser* @param parties the number of parties required to advance to the* next phase* @throws IllegalArgumentException if parties less than zero* or greater than the maximum number of parties supported*/public Phaser(Phaser parent, int parties) {//初始化未到达的线程数,不能大于2 的16次方。上面有个state 变量划分图if (parties >>> PARTIES_SHIFT != 0)throw new IllegalArgumentException("Illegal number of parties");//初始轮数为0int phase = 0;this.parent = parent;//判断parent 是否为null。不为null.则把自己注册到父对象中if (parent != null) {final Phaser root = parent.root;this.root = root;this.evenQ = root.evenQ;this.oddQ = root.oddQ;if (parties != 0)phase = parent.doRegister(1);}else {this.root = this;this.evenQ = new AtomicReference<QNode>();this.oddQ = new AtomicReference<QNode>();}//如果parties为0,则赋值EMPTY.//不为0,则将 phare 左移 32位 轮数赋值// 将parties 左移16位 总线程数数,初始值和未达到数一样// parties 未达到线程数// 最后将三个数 进行或运算 (只有有一个是1,则结果为1)this.state = (parties == 0) ? (long)EMPTY :((long)phase << PHASE_SHIFT) |((long)parties << PARTIES_SHIFT) |((long)parties);}private static final int PARTIES_SHIFT = 16;private static final int PHASE_SHIFT = 32;private static final int EMPTY = 1;
阻塞方法
如下图所示,右边的主线程会调用awaitAdvance()进行阻塞。左边的arrive()会对state进行cas的累减操作。当未到达的线程数减到0时,唤醒右边阻塞的主线程。
在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个单向链表,出栈和入栈都在链表头部,所以只需要一个head指针,而不需要tail指针。
Treiber Stack 代码
static final class QNode implements ForkJoinPool.ManagedBlocker {final Phaser phaser;final int phase;final boolean interruptible;final boolean timed;boolean wasInterrupted;long nanos;final long deadline;volatile Thread thread; // nulled to cancel wait 每个Node 对应一个Thread 对象QNode next; //单项链表,QNode(Phaser phaser, int phase, boolean interruptible,boolean timed, long nanos) {this.phaser = phaser;this.phase = phase;this.interruptible = interruptible;this.nanos = nanos;this.timed = timed;this.deadline = timed ? System.nanoTime() + nanos : 0L;thread = Thread.currentThread();}
}
/*** Heads of Treiber stacks for waiting threads. To eliminate* contention when releasing some threads while adding others, we* use two of them, alternating across even and odd phases.* Subphasers share queues with root to speed up releases.*///为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。private final AtomicReference<QNode> evenQ;private final AtomicReference<QNode> oddQ;private AtomicReference<QNode> queueFor(int phase) {//当phase 为奇数的时候,阻塞线程放在addQ里面// 为偶数的时候,阻塞线程放在eventQ里面return ((phase & 1) == 0) ? evenQ : oddQ;}
arrive()
对state 变量进行操作,然后唤醒线程。
arrive()和arriveAndDeregister()内部调用的都是doArrive() 函数。区别在于前者只是把未达到线程数减一。后者则把未到达线程数和下一轮的总线程数都减一。
public int arrive() {return doArrive(ONE_ARRIVAL);}private static final int ONE_ARRIVAL = 1;
public int arriveAndDeregister() {return doArrive(ONE_DEREGISTER);}//65536private static final int ONE_PARTY = 1 << PARTIES_SHIFT;//65537private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
doArrive()
把未到达线程数减一,减一之和,还未到1(空置设置的是1,这里和jdk1.7有点区别)。什么都不做。直接返回。如果到1后。说明所有线程到达。 然后会处理两件事:
1)重置state,把state 的未到达线程个数重置到总的注册的线程数中。同时phaser加1
2).唤醒队列中的线程
private int doArrive(int adjust) {final Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);//小于0,直接返回if (phase < 0)return phase;int counts = (int)s;int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);if (unarrived <= 0)throw new IllegalStateException(badArrive(s));if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {//CAS 减一if (unarrived == 1) {//所有线程到达long n = s & PARTIES_MASK; // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (root == this) {//父 Phaser 为空,调用自己的 onAdvance()if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;//最高位置为1else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;//下一轮的未到达数等于总的线程数int nextPhase = (phase + 1) & MAX_PHASE;n |= (long)nextPhase << PHASE_SHIFT;//重置stateUNSAFE.compareAndSwapLong(this, stateOffset, s, n);//释放所有线程releaseWaiters(phase);}else if (nextUnarrived == 0) { // propagate deregistration//父 Phaser 不为空,调用父 的doArrive()并且下个未到达==0.//则把未到的线程和下一轮未到的线程都减一phase = parent.doArrive(ONE_DEREGISTER);UNSAFE.compareAndSwapLong(this, stateOffset,s, s | EMPTY);}else// 父 Phaser 不为空,调用父 的doArrive() 减一个phase = parent.doArrive(ONE_ARRIVAL);}//没有全部到达,直接返回return phase;}}}
releaseWaiters()
private void releaseWaiters(int phase) {QNode q; // first element of queueThread t; // its thread//根据phase是奇数还是偶数来找对应的栈AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;//遍历整个栈while ((q = head.get()) != null &&//如果当前节点的phase 不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的。//需要被释放并唤醒q.phase != (int)(root.state >>> PHASE_SHIFT)) {if (head.compareAndSet(q, q.next) &&(t = q.thread) != null) {q.thread = null;LockSupport.unpark(t);}}}
awaitAdvance()
internalAwaitAdvance 方法主要是调用了ForkJoinPool.manageBlock函数。目的是把Node对应的线程阻塞。
public int awaitAdvance(int phase) {final Phaser root = this.root;//root==this 表示没有树状结构。只有一个Phaserlong s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)//phase 已经结束,不用阻塞了,直接返回return phase;if (p == phase)//阻塞在这一层上面return root.internalAwaitAdvance(phase, null);return p;}
业务场景
实现CountDownLatch功能
前面我们用 CountDownLatch实现了 主线程等待10 个线程预加载数据完成后再执行 加载其他组件功能。
代码可以参考之前的CountDownLatch文章.
代码
public class DoThing extends Thread{private final Phaser startPhaser;private final Phaser donePhaser;public DoThing(Phaser startSignal, Phaser doneSignal) {this.startPhaser = startSignal;this.donePhaser = doneSignal;}@Overridepublic void run() {try {//开始阻塞了,等待主线程开启System.out.println(Thread.currentThread().getName()+" 开始阻塞等待了...");startPhaser.awaitAdvance(startPhaser.getPhase());doWork();donePhaser.arrive();} catch (InterruptedException ex) {}}public void doWork() throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 开始干活了,DB 中的数据加载到本地缓存中");Thread.sleep(1000);}
}
/*** @author :echo_黄诗* @description:利用Phaser 来实现 CountDownLatch 功能* @date :2023/3/3 18:50*/
public class Run {public static void main(String[] args) {Phaser startPhaser = new Phaser(1);Phaser donePhaser = new Phaser(10);for (int i = 0; i < 10; ++i) // create and start threadsnew Thread(new DoThing(startPhaser, donePhaser)).start();doSomethingElse();//开始加载数据,startPhaser.arrive();//主线程阻塞,等待数据加载完成donePhaser.awaitAdvance(donePhaser.getPhase());doSomethingElse();System.out.println("数据加载完成,开始启动其他组件,包括dubbo组件");}public static void doSomethingElse(){try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}
}
测试结果
实现 CyclicBarrier功能
前面CyclicBarrier章节 我们利用 CyclicBarrier 实现了10个求职者 一起来了后开始笔试,然后一起面试的功能。之前的CyclicBarrier的代码可以参考.
代码展示
工具类
public class Utils {/*** 模拟在路上方法*/public static void doOnTheWay() {doCostTime(2000);}/*** 模拟笔试过程*/public static void doWriteExam() {doCostTime(3000);}/*** 模拟面试过程*/public static void doInterview() {doCostTime(5000);}private static void doCostTime(int time) {Random random = new Random();try {//随机休眠时间int count = random.nextInt(time);// System.out.println(count);Thread.sleep(count);} catch (InterruptedException e) {e.printStackTrace();}}
}
面试类
public class Interview extends Thread{private Phaser phaser;public Interview(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {phaser.arrive();Utils.doInterview();System.out.println(Thread.currentThread().getName() + " 面试完啦.....");}
}
笔试类
public class WriteExam extends Thread{private Phaser phaser;public WriteExam(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {Utils.doWriteExam();phaser.arrive();System.out.println(Thread.currentThread().getName() + " 笔试做完了....");}
}
来公司路上
public class OnTheWay extends Thread{private Phaser phaser;public OnTheWay(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {Utils.doOnTheWay();System.out.println(Thread.currentThread().getName() + " 已经来公司了...");phaser.arrive();}
}
运行类
此处需要注意,因为是主线程执行 awaitAdvance。所以需要先执行子线程。不然主线程执行就直接阻塞了。
子线程都没有机会执行。因为子线程是靠主线程启动的。
/*** @author :echo_黄诗* @description:用Phaser 实现CyclicBarrier功能* @date :2023/3/3 19:08*/
public class Run {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(10,20,100, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100));Phaser phaser=new Phaser(10);for (int i=0;i<10;i++){threadPoolExecutor.execute(new OnTheWay(phaser));}phaser.awaitAdvance(phaser.getPhase());for (int i=0;i<10;i++){threadPoolExecutor.execute(new WriteExam(phaser));}phaser.awaitAdvance(phaser.getPhase());for (int i=0;i<10;i++){threadPoolExecutor.execute(new Interview(phaser));}phaser.awaitAdvance(phaser.getPhase());}
}
测试结果
从打印截图可以看出,使用Phaser 能很好的实现了CyclicBarrier的同步阻塞功能。
但是目前还不能实现其回调函数的功能。
总结
Phaser 相比CountDownLatch ,Semaphore,CyclicBarrier 的源码,实现上还是复杂的多。但使用上面比较简单。这里只是给大家一个例子。知道该如何用。目前分析源码也不是很透彻。如果要彻底弄清楚,可以参考 Java并发实现原理这本教程。