网站内容图片怎么做的,菏泽做网站建设的公司,社区服务流程的文献,法律网站建设价格CountDownLatch结构与核心实现分析
CountDownLatch是一个一次性同步工具#xff0c;基于AQS(AbstractQueuedSynchronizer)实现。它的核心思想是维护一个计数器#xff0c;当计数器归零时释放所有等待的线程。
主要组件结构
CountDownLatch
├── Sync (内部类#xff0c…CountDownLatch结构与核心实现分析
CountDownLatch是一个一次性同步工具基于AQS(AbstractQueuedSynchronizer)实现。它的核心思想是维护一个计数器当计数器归零时释放所有等待的线程。
主要组件结构
CountDownLatch
├── Sync (内部类继承AbstractQueuedSynchronizer)
│ ├── tryAcquireShared() - 尝试获取共享锁
│ └── tryReleaseShared() - 尝试释放共享锁
└── 核心方法├── await() - 等待计数归零├── countDown() - 计数减一└── getCount() - 获取当前计数内部同步器 (Sync类)
CountDownLatch通过内部类Sync继承AbstractQueuedSynchronizer来实现同步控制
private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count); // 将计数值设置为AQS的state}
}关键设计
使用AQS的state字段存储计数值利用AQS的共享模式实现多线程等待机制
等待机制 (await方法)
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}工作流程
调用AQS的acquireSharedInterruptibly()方法内部调用tryAcquireShared()检查状态如果计数不为0线程进入等待队列阻塞当计数归零时所有等待线程被唤醒
计数递减机制 (countDown方法)
public void countDown() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {signalNext(head);return true;}return false;} 核心实现在tryReleaseShared()
protected boolean tryReleaseShared(int releases) {for (;;) {int c getState();if (c 0) return false; // 已经为0无需操作int nextc c - 1;if (compareAndSetState(c, nextc)) // CAS原子更新return nextc 0; // 返回是否归零}
}关键特性
使用CAS循环确保原子性操作只有当计数归零时才返回true触发线程唤醒计数为0时的countDown()调用不会产生效果
AQS共享模式运用
CountDownLatch采用AQS的共享模式
tryAcquireShared()当state为0时返回1(成功)否则返回-1(失败)tryReleaseShared()递减state当变为0时返回true通知AQS唤醒等待线程
线程同步机制
等待线程管理
等待的线程被加入AQS的等待队列使用LockSupport.park()阻塞线程计数归零时AQS自动唤醒所有等待线程
内存可见性保证
通过AQS的volatile state字段确保内存可见性countDown()操作的happens-before关系确保线程安全
一次性特性
CountDownLatch的一次性设计体现在
计数器不能重置归零后永远保持为0后续的await()调用会立即返回这与CyclicBarrier的可重复使用形成对比
典型使用场景
启动信号一个线程等待多个线程准备完毕完成信号主线程等待多个工作线程完成任务分治任务将大任务拆分等待所有子任务完成 CyclicBarrier核心实现分析
CyclicBarrier采用ReentrantLock Condition的传统同步方式而非直接实现AQS框架
private final ReentrantLock lock new ReentrantLock();
private final Condition trip lock.newCondition();核心组件
Generation类代表栅栏的一个周期用于标识栅栏状态count字段当前等待的线程数从parties递减到0parties字段参与同步的线程总数固定不变
Generation周期管理
private static class Generation {boolean broken; // 栅栏是否被破坏
}每次栅栏被触发或重置时都会创建新的Generation实例实现周期性复用。
private void nextGeneration() {trip.signalAll(); // 唤醒所有等待线程count parties; // 重置计数器generation new Generation(); // 创建新周期
}核心等待逻辑 (dowait方法)
工作流程
获取锁使用ReentrantLock确保线程安全计数递减int index --count判断是否最后一个线程 如果index 0执行barrier action触发nextGeneration()否则进入trip.await()等待状态
int index --count;
if (index 0) { // 最后一个线程到达// 执行barrier actionnextGeneration(); // 开启新周期return 0;
}
// 其他线程等待
trip.await();CountDownLatch vs CyclicBarrier 对比分析
特性CountDownLatchCyclicBarrier底层实现AQS共享模式ReentrantLock Condition同步方式一次性计数器可重复使用的栅栏线程角色等待者vs触发者分离所有线程平等参与
使用模式对比
CountDownLatch主从模式
// 主线程等待工作线程触发
CountDownLatch latch new CountDownLatch(3);
// 工作线程: latch.countDown()
// 主线程: latch.await()CyclicBarrier协作模式
// 所有线程相互等待
CyclicBarrier barrier new CyclicBarrier(3);
// 每个线程: barrier.await()技术实现细节对比
计数逻辑差异
CountDownLatch计数从N递减到0不可重置CyclicBarrier计数从parties递减到0后自动重置
线程唤醒机制
CountDownLatch使用AQS的releaseShared()统一唤醒CyclicBarrier最后到达的线程执行signalAll()唤醒其他线程
异常处理策略
CountDownLatch单纯的计数递减异常不影响其他线程CyclicBarrier采用all-or-none模式一个线程异常会导致所有线程收到BrokenBarrierException Semaphore 结构与核心实现分析
总述
核心机制基于 AQS 的共享锁模式通过 state 管理许可证。策略差异公平与非公平模式影响线程获取许可证的顺序。灵活性支持批量操作、许可证调整及清空适用于多样化并发控制场景。优势轻量级资源控制无需显式锁且释放操作不限于持有线程。
整体结构
核心成员private final Sync sync基于 AQS 的同步器同步器层级 抽象类 Sync继承 AbstractQueuedSynchronizer实现基础的许可证操作获取/释放/调整。子类 NonfairSync非公平策略默认允许线程“插队”。子类 FairSync公平策略遵循 FIFO 顺序。
核心方法
方法作用acquire()阻塞获取 1 个许可响应中断acquireUninterruptibly()阻塞获取 1 个许可不响应中断tryAcquire()尝试立即获取 1 个许可非公平策略tryAcquire(timeout)限时获取 1 个许可支持中断release()释放 1 个许可availablePermits()返回当前可用许可数drainPermits()清空并返回所有可用许可reducePermits(reduction)减少许可证数量不可逆操作
底层实现机制
(1) 许可证存储
AQS 状态 state存储当前可用许可证数量。 Sync(int permits) {setState(permits); // 初始化许可证数量
}
(2) 许可证获取 非公平策略NonfairSync protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); // 直接尝试获取
}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available getState();int remaining available - acquires;if (remaining 0 || compareAndSetState(available, remaining))return remaining; // 负数表示失败}
} 特点允许新线程“插队”获取许可证。 公平策略FairSync protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors()) // 检查是否有等待线程return -1; // 有则拒绝获取int available getState();int remaining available - acquires;if (remaining 0 || compareAndSetState(available, remaining))return remaining;}
} 特点严格遵循 FIFO 顺序防止线程饥饿。
(3) 许可证释放
protected final boolean tryReleaseShared(int releases) {for (;;) {int current getState();int next current releases;if (next current) throw new Error(Overflow);if (compareAndSetState(current, next))return true; // 成功释放}
}
支持跨线程释放无“持有者”概念任何线程均可释放。
(4) 特殊操作
drainPermits() final int drainPermits() {for (;;) {int current getState();if (current 0 || compareAndSetState(current, 0))return current; // 返回被清空的许可数}
} reducePermits() final void reducePermits(int reductions) {for (;;) {int current getState();int next current - reductions;if (next current) throw new Error(Underflow);if (compareAndSetState(current, next))return;}
} 典型应用场景 // 连接池资源控制最大 100 个连接
class ConnectionPool {private static final int MAX_CONNECTIONS 100;private final Semaphore available new Semaphore(MAX_CONNECTIONS, true);public Connection getConnection() throws InterruptedException {available.acquire(); // 阻塞直到获取许可return createConnection();}public void releaseConnection(Connection conn) {closeConnection(conn);available.release(); // 释放许可}
}
作用通过许可证数量限制并发资源访问避免资源耗尽。