当前位置: 首页 > news >正文

深圳南山企业网站建设搜收录网

深圳南山企业网站建设,搜收录网,汽车网站页面设计,给女朋友做的网站内容同步工具类:Phaser介绍特性动态调整线程个数层次Phaser源码分析state 变量解析构造函数对state变量赋值阻塞方法arrive()awaitAdvance()业务场景实现CountDownLatch功能代码测试结果实现 CyclicBarrier功能代码展示测试结果总结介绍 一个可重复使用的同步屏障,功能…

同步工具类:Phaser

  • 介绍
    • 特性
      • 动态调整线程个数
      • 层次Phaser
  • 源码分析
    • state 变量解析
    • 构造函数对state变量赋值
    • 阻塞方法
    • arrive()
    • awaitAdvance()
  • 业务场景
    • 实现CountDownLatch功能
      • 代码
      • 测试结果
    • 实现 CyclicBarrier功能
      • 代码展示
      • 测试结果
  • 总结

介绍

一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。该工具类是 JDK 1.7才引入的。功能比 CyclicBarrier 和 CountDownLatch 都要强大。
CyclicBarrier 和 CountDownLatch 的学习地址

特性

动态调整线程个数

CyclicBarrier 所要同步的线程个数是在构造函数中指定的,之后不能更改,而Phaser可以在运行期间动态地调整要同步的线程个数。用来修改同步线程个数的函数有:

 public int register() {//注册一个线程return doRegister(1);}
 public int bulkRegister(int parties) {if (parties < 0)throw new IllegalArgumentException();if (parties == 0)return getPhase();//注册多个线程return doRegister(parties);}
 public int arriveAndDeregister() {//解注册,减少线程个数return doArrive(ONE_DEREGISTER);}

层次Phaser

即 多个Phaser 可以组成 如下的树状结构,可以通过在构造函数中传入父Phaser 来实现
在这里插入图片描述

 public Phaser(Phaser parent) {//传入Phaser 作为 该对象的父 节点this(parent, 0);}

如上图,如果传Phaser 参数过来,对应的线程数量会相加。对于树状Phaser 上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser 是一样的,具体来讲,Phaser 并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册,当子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册。父Phaser把子Phaser当作一个正常参与的线程即可。

源码分析

state 变量解析

Phaser 没有基于AQS来实现,但具备AQS的核心特性:state 变量CAS 操作阻塞队列等。
首先看下state 变量。

    /*** Primary state representation, holding four bit-fields:*  state 分为了四个部分。* unarrived  -- the number of parties yet to hit barrier (bits  0-15)* parties    -- the number of parties to wait            (bits 16-31)* phase      -- the generation of the barrier            (bits 32-62)* terminated -- set if barrier is terminated             (bit  63 / sign)** Except that a phaser with no registered parties is* distinguished by the otherwise illegal state of having zero* parties and one unarrived parties (encoded as EMPTY below).** To efficiently maintain atomicity, these values are packed into* a single (atomic) long. Good performance relies on keeping* state decoding and encoding simple, and keeping race windows* short.** All state updates are performed via CAS except initial* registration of a sub-phaser (i.e., one with a non-null* parent).  In this (relatively rare) case, we use built-in* synchronization to lock while first registering with its* parent.** The phase of a subphaser is allowed to lag that of its* ancestors until it is actually accessed -- see method* reconcileState.*/private volatile long state;

从英文注释中我们可以得知:这个64位的state变量被拆分成4部分。如图所示

 ffewaeseafwefafef最高位0表示未同步完成,1表示同步完成。初始最高位为0.可以看下如下几个函数对state 的各个部分组成的获取。获取当前轮数(32-62位)

    /*** Returns the current phase number. The maximum phase number is* {@code Integer.MAX_VALUE}, after which it restarts at* zero. Upon termination, the phase number is negative,* in which case the prevailing phase prior to termination* may be obtained via {@code getPhase() + Integer.MIN_VALUE}.** @return the phase number, or a negative value if terminated*/public final int getPhase() {//无符号右移(高位补0) 32位。//然后强制int .则最高位还是 是否为同步信息。//如果是负数,说明最高位是1,已经完成了同步return (int)(root.state >>> PHASE_SHIFT);}private static final int  PHASE_SHIFT     = 32;

判断当前轮数同步是否完成。

    /*** Returns {@code true} if this phaser has been terminated.** @return {@code true} if this phaser has been terminated*/public boolean isTerminated() {//判断整个值是否是负数即可。最高位也是符号位。return root.state < 0L;}

获取总注册线程数

    /*** Returns the number of parties registered at this phaser.** @return the number of parties*/public int getRegisteredParties() {return partiesOf(state);}private static int partiesOf(long s) {//先将s 转成int.相当于把高位 是否完成同步和轮数舍弃了。//只剩下低位的总线程数和未达到线程数。然后向右无符号右移16位(高位补0)。//就只剩下了总线程数return (int)s >>> PARTIES_SHIFT;}private static final int  PARTIES_SHIFT   = 16;

获取未达到的线程数

    /*** Returns the number of registered parties that have not yet* arrived at the current phase of this phaser. If this phaser has* terminated, the returned value is meaningless and arbitrary.** @return the number of unarrived parties*/public int getUnarrivedParties() {return unarrivedOf(reconcileState());}
    private static int unarrivedOf(long s) {// 首先将state强转为int,舍弃高位的 是否完成同步和轮数信息int counts = (int)s;//不为空的话,则和 0xffff 进行与操作,截取 低16位。即未到达的线程数return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);}private static final int  EMPTY           = 1;private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints

构造函数对state变量赋值

如果能理解state 变量的分布图。64位的划分,划分为4个部分。 那么就很容易理解构造函数的初始了。

    /*** Creates a new phaser with the given parent and number of* registered unarrived parties.  When the given parent is non-null* and the given number of parties is greater than zero, this* child phaser is registered with its parent.** @param parent the parent phaser* @param parties the number of parties required to advance to the* next phase* @throws IllegalArgumentException if parties less than zero* or greater than the maximum number of parties supported*/public Phaser(Phaser parent, int parties) {//初始化未到达的线程数,不能大于2 的16次方。上面有个state 变量划分图if (parties >>> PARTIES_SHIFT != 0)throw new IllegalArgumentException("Illegal number of parties");//初始轮数为0int phase = 0;this.parent = parent;//判断parent 是否为null。不为null.则把自己注册到父对象中if (parent != null) {final Phaser root = parent.root;this.root = root;this.evenQ = root.evenQ;this.oddQ = root.oddQ;if (parties != 0)phase = parent.doRegister(1);}else {this.root = this;this.evenQ = new AtomicReference<QNode>();this.oddQ = new AtomicReference<QNode>();}//如果parties为0,则赋值EMPTY.//不为0,则将 phare 左移 32位  轮数赋值//      将parties 左移16位   总线程数数,初始值和未达到数一样//       parties            未达到线程数// 最后将三个数 进行或运算 (只有有一个是1,则结果为1)this.state = (parties == 0) ? (long)EMPTY :((long)phase << PHASE_SHIFT) |((long)parties << PARTIES_SHIFT) |((long)parties);}private static final int  PARTIES_SHIFT   = 16;private static final int  PHASE_SHIFT     = 32;private static final int  EMPTY           = 1;

阻塞方法

如下图所示,右边的主线程会调用awaitAdvance()进行阻塞。左边的arrive()会对state进行cas的累减操作。当未到达的线程数减到0时,唤醒右边阻塞的主线程。
在这里插入图片描述
在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个单向链表,出栈和入栈都在链表头部,所以只需要一个head指针,而不需要tail指针。
Treiber Stack 代码

static final class QNode implements ForkJoinPool.ManagedBlocker {final Phaser phaser;final int phase;final boolean interruptible;final boolean timed;boolean wasInterrupted;long nanos;final long deadline;volatile Thread thread; // nulled to cancel wait 每个Node 对应一个Thread 对象QNode next; //单项链表,QNode(Phaser phaser, int phase, boolean interruptible,boolean timed, long nanos) {this.phaser = phaser;this.phase = phase;this.interruptible = interruptible;this.nanos = nanos;this.timed = timed;this.deadline = timed ? System.nanoTime() + nanos : 0L;thread = Thread.currentThread();}
}
    /*** Heads of Treiber stacks for waiting threads. To eliminate* contention when releasing some threads while adding others, we* use two of them, alternating across even and odd phases.* Subphasers share queues with root to speed up releases.*///为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。private final AtomicReference<QNode> evenQ;private final AtomicReference<QNode> oddQ;private AtomicReference<QNode> queueFor(int phase) {//当phase 为奇数的时候,阻塞线程放在addQ里面//        为偶数的时候,阻塞线程放在eventQ里面return ((phase & 1) == 0) ? evenQ : oddQ;}

arrive()

对state 变量进行操作,然后唤醒线程。
arrive()和arriveAndDeregister()内部调用的都是doArrive() 函数。区别在于前者只是把未达到线程数减一。后者则把未到达线程数和下一轮的总线程数都减一。

   public int arrive() {return doArrive(ONE_ARRIVAL);}private static final int  ONE_ARRIVAL     = 1;
   public int arriveAndDeregister() {return doArrive(ONE_DEREGISTER);}//65536private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;//65537private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;

doArrive()
把未到达线程数减一,减一之和,还未到1(空置设置的是1,这里和jdk1.7有点区别)。什么都不做。直接返回。如果到1后。说明所有线程到达。 然后会处理两件事:
1)重置state,把state 的未到达线程个数重置到总的注册的线程数中。同时phaser加1
2).唤醒队列中的线程

 private int doArrive(int adjust) {final Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);//小于0,直接返回if (phase < 0)return phase;int counts = (int)s;int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);if (unarrived <= 0)throw new IllegalStateException(badArrive(s));if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {//CAS 减一if (unarrived == 1) {//所有线程到达long n = s & PARTIES_MASK;  // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (root == this) {//父 Phaser 为空,调用自己的 onAdvance()if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;//最高位置为1else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;//下一轮的未到达数等于总的线程数int nextPhase = (phase + 1) & MAX_PHASE;n |= (long)nextPhase << PHASE_SHIFT;//重置stateUNSAFE.compareAndSwapLong(this, stateOffset, s, n);//释放所有线程releaseWaiters(phase);}else if (nextUnarrived == 0) { // propagate deregistration//父 Phaser 不为空,调用父 的doArrive()并且下个未到达==0.//则把未到的线程和下一轮未到的线程都减一phase = parent.doArrive(ONE_DEREGISTER);UNSAFE.compareAndSwapLong(this, stateOffset,s, s | EMPTY);}else// 父 Phaser 不为空,调用父 的doArrive() 减一个phase = parent.doArrive(ONE_ARRIVAL);}//没有全部到达,直接返回return phase;}}}

releaseWaiters()

 private void releaseWaiters(int phase) {QNode q;   // first element of queueThread t;  // its thread//根据phase是奇数还是偶数来找对应的栈AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;//遍历整个栈while ((q = head.get()) != null &&//如果当前节点的phase 不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的。//需要被释放并唤醒q.phase != (int)(root.state >>> PHASE_SHIFT)) {if (head.compareAndSet(q, q.next) &&(t = q.thread) != null) {q.thread = null;LockSupport.unpark(t);}}}

awaitAdvance()

internalAwaitAdvance 方法主要是调用了ForkJoinPool.manageBlock函数。目的是把Node对应的线程阻塞。

 public int awaitAdvance(int phase) {final Phaser root = this.root;//root==this 表示没有树状结构。只有一个Phaserlong s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)//phase 已经结束,不用阻塞了,直接返回return phase;if (p == phase)//阻塞在这一层上面return root.internalAwaitAdvance(phase, null);return p;}

业务场景

实现CountDownLatch功能

前面我们用 CountDownLatch实现了 主线程等待10 个线程预加载数据完成后再执行 加载其他组件功能。
代码可以参考之前的CountDownLatch文章.

代码

public class DoThing extends Thread{private final Phaser startPhaser;private final Phaser donePhaser;public DoThing(Phaser startSignal, Phaser doneSignal) {this.startPhaser = startSignal;this.donePhaser = doneSignal;}@Overridepublic void run() {try {//开始阻塞了,等待主线程开启System.out.println(Thread.currentThread().getName()+" 开始阻塞等待了...");startPhaser.awaitAdvance(startPhaser.getPhase());doWork();donePhaser.arrive();} catch (InterruptedException ex) {}}public void doWork() throws InterruptedException {System.out.println(Thread.currentThread().getName() + " 开始干活了,DB 中的数据加载到本地缓存中");Thread.sleep(1000);}
}
/*** @author :echo_黄诗* @description:利用Phaser 来实现 CountDownLatch 功能* @date :2023/3/3 18:50*/
public class Run {public static void main(String[] args) {Phaser startPhaser = new Phaser(1);Phaser donePhaser = new Phaser(10);for (int i = 0; i < 10; ++i) // create and start threadsnew Thread(new DoThing(startPhaser, donePhaser)).start();doSomethingElse();//开始加载数据,startPhaser.arrive();//主线程阻塞,等待数据加载完成donePhaser.awaitAdvance(donePhaser.getPhase());doSomethingElse();System.out.println("数据加载完成,开始启动其他组件,包括dubbo组件");}public static void doSomethingElse(){try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}
}

测试结果

在这里插入图片描述

实现 CyclicBarrier功能

前面CyclicBarrier章节 我们利用 CyclicBarrier 实现了10个求职者 一起来了后开始笔试,然后一起面试的功能。之前的CyclicBarrier的代码可以参考.

代码展示

工具类

public class Utils {/*** 模拟在路上方法*/public static void doOnTheWay() {doCostTime(2000);}/*** 模拟笔试过程*/public static void doWriteExam() {doCostTime(3000);}/*** 模拟面试过程*/public static void doInterview() {doCostTime(5000);}private static void doCostTime(int time) {Random random = new Random();try {//随机休眠时间int count = random.nextInt(time);// System.out.println(count);Thread.sleep(count);} catch (InterruptedException e) {e.printStackTrace();}}
}

面试类

public class Interview extends Thread{private Phaser phaser;public Interview(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {phaser.arrive();Utils.doInterview();System.out.println(Thread.currentThread().getName() + "  面试完啦.....");}
}

笔试类

public class WriteExam extends Thread{private Phaser phaser;public WriteExam(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {Utils.doWriteExam();phaser.arrive();System.out.println(Thread.currentThread().getName() + " 笔试做完了....");}
}

来公司路上

public class OnTheWay extends Thread{private Phaser phaser;public OnTheWay(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {Utils.doOnTheWay();System.out.println(Thread.currentThread().getName() + " 已经来公司了...");phaser.arrive();}
}

运行类
此处需要注意,因为是主线程执行 awaitAdvance。所以需要先执行子线程。不然主线程执行就直接阻塞了。
子线程都没有机会执行。因为子线程是靠主线程启动的。

/*** @author :echo_黄诗* @description:用Phaser 实现CyclicBarrier功能* @date :2023/3/3 19:08*/
public class Run {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(10,20,100, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100));Phaser phaser=new Phaser(10);for (int i=0;i<10;i++){threadPoolExecutor.execute(new OnTheWay(phaser));}phaser.awaitAdvance(phaser.getPhase());for (int i=0;i<10;i++){threadPoolExecutor.execute(new WriteExam(phaser));}phaser.awaitAdvance(phaser.getPhase());for (int i=0;i<10;i++){threadPoolExecutor.execute(new Interview(phaser));}phaser.awaitAdvance(phaser.getPhase());}
}

测试结果

从打印截图可以看出,使用Phaser 能很好的实现了CyclicBarrier的同步阻塞功能。
但是目前还不能实现其回调函数的功能。
在这里插入图片描述

总结

Phaser 相比CountDownLatch ,Semaphore,CyclicBarrier 的源码,实现上还是复杂的多。但使用上面比较简单。这里只是给大家一个例子。知道该如何用。目前分析源码也不是很透彻。如果要彻底弄清楚,可以参考 Java并发实现原理这本教程。

http://www.tj-hxxt.cn/news/38459.html

相关文章:

  • Wordpress转jekyll上海seo网站排名优化公司
  • 想让网站被谷歌收录怎么做百度推广官方投诉电话
  • 怎么做app和网站购物车网上销售平台
  • 门户网站建设目标软文媒体
  • 南昌手机网站建设网络精准营销推广
  • 查工程项目的网站互联网网络推广
  • 做类似淘宝网站怎么做友情链接交换要注意哪些问题
  • 网站正能量点进去就能看关键词搜索排名工具
  • 企业网站建设实验感想西安网站设计
  • 模板网站的好处搜索引擎优化的英文缩写是什么
  • 网站策划的最终体现是撰写网站策划书google关键词seo
  • 如何做电商网站设计长春seo结算
  • 信阳市两学一做网站免费找精准客户软件
  • 怎么做游戏自动充值的网站网络营销工具有哪些
  • 盐城哪家做网站的正规十大外贸电商平台
  • 哪个网站可以做破案h5打开百度网页
  • 企业网站图片渐变效果怎么做信阳网站seo
  • 接做网站需要问什么万网官网域名注册
  • 做网站 长百度竞价关键词优化
  • 南通网站设计网店营销策略有哪些
  • wordpress汉化制作优化网站怎么做
  • 360建网站好不好?360站长
  • 网站主持人外贸网站制作公司
  • 自己搭建app托管平台网站优化与seo
  • 宜昌做网站优化关键词优化推广排名
  • 淮安建设局网站下载优化大师app
  • 网站开发的计划书百度扫一扫网页版
  • 青岛网站制作关键词网站查询
  • 信贷员在哪个网站做推广列举五种网络营销模式
  • 浙江网站建设情况分析网站优化效果