电脑做ppt模板下载网站,天津网站设计诺亚科技,温州中小企业网站制作,广州联雅做的网站怎么样文章目录 前言非公平锁公平锁 前言
上一篇说过#xff0c;zookeeper是一个类似文件系统的数据结构#xff0c;每个节点都可以看做是一个文件目录#xff0c;也就是说#xff0c;我们所创建的节点是唯一的#xff0c;那么分布式锁的原理就是基于这个来的。
代码仓库… 文章目录 前言非公平锁公平锁 前言
上一篇说过zookeeper是一个类似文件系统的数据结构每个节点都可以看做是一个文件目录也就是说我们所创建的节点是唯一的那么分布式锁的原理就是基于这个来的。
代码仓库https://gitee.com/LIRUIYI/test-zk.git
非公平锁
通过创建节点并判断节点是否存在实现分布式锁。
创建临时节点如/lock临时节点是当出现异常没有删除导致永久加锁的情况发生当节点不存在创建节点成功也就意味着枷锁成功如果创建失败那么就是加锁失败其他需要加锁的线程监听/lock节点get -w /lock当节点/lock删除后zookeeper会通知到监听的节点
这种方式的加锁不能保证需要加锁的线程能得到锁的概率一样他们是随机的有可能最先排队的加锁线程到最后都不能得到锁这就是非公平锁。
以原始客户端实现非公平锁
这里zookeeper地址和上面不一样因为之前是桥接模式自动获取的有时ip会变动所以改NAT模式设定了静态ip
package com.liry.zk;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;/*** 分布式锁 - 非公平锁实现** author ALI* since 2022/12/25*/
Slf4j
public class NonfairSyncLock {private static final String CONNECT_STR 192.168.17.128:2181;private static final int TIME_OUT 30000;private static final String LOCK_PATH /lock;private static ZooKeeper zookeeper null;/*** 获取客户端*/public static ZooKeeper getClient() throws IOException, InterruptedException {synchronized (LOCK_PATH) {final CountDownLatch latch new CountDownLatch(1);if (zookeeper null) {Watcher startWatcher watchedEvent - {if (watchedEvent.getState() Watcher.Event.KeeperState.SyncConnected) {log.info(与zookeeper建立连接);latch.countDown();}};zookeeper new ZooKeeper(CONNECT_STR, TIME_OUT, startWatcher);latch.await();} else if (zookeeper.getState() ! ZooKeeper.States.CONNECTED) {latch.await();}}return zookeeper;}/*** 加锁*/public void lock() {while (true) {if (tryLock()) {return;}// 加锁失败增加监听然后阻塞try {watch();} catch (Exception e) {throw new RuntimeException(e);}}}/*** 解锁*/public void unlock() {try {getClient().delete(LOCK_PATH, -1);} catch (Exception e) {throw new RuntimeException(e);}}/*** 尝试加锁*/private boolean tryLock() {try {getClient().create(LOCK_PATH, lock.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);} catch (Exception e) {log.error(加锁失败);return false;}return true;}/*** 监听锁节点* 当节点存在时线程阻塞当节点不存在直接退出监听*/private void watch() throws InterruptedException, KeeperException, IOException {CountDownLatch latch new CountDownLatch(1);Watcher dataWatch event - {if (event.getType() Watcher.Event.EventType.NodeDeleted) {latch.countDown();}};try {getClient().getData(LOCK_PATH, dataWatch, null);} catch (KeeperException.NoNodeException e) {// 当创建监听时节点不存在说明有线程解锁了那么直接退出监听步骤去争抢锁return;}latch.await();}} static int count 0;public static void main(String[] args) throws InterruptedException {nonFairLock();}private static void nonFairLock() throws InterruptedException {NonfairSyncLock lock new NonfairSyncLock();CountDownLatch latch new CountDownLatch(1000);ListThread threadList IntStream.range(0, 1000).mapToObj(d - new Thread(() - {lock.lock();count 1;latch.countDown();lock.unlock();}, 线程- d)).collect(Collectors.toList());threadList.forEach(Thread::start);latch.await();System.out.println(最终结果应是1000 count);}公平锁
相对于非公平锁的实现这个方式较为复杂一点。 先创建一个根节点/lock 再在/lock下创建临时有序节点有序节点是因为所有需要加锁的节点需要按先来后到的顺序才能公平 然后每个有序节点都监听它的前一个节点如图当前一个节点被删除表示解锁了那么zookeeper会通知到监听的节点也就是下一个需要加锁的线程
这个在curator中已经有实现了分布式锁不局限于zookeeper了解其原理就行
static int count 0;public static void main(String[] args) throws InterruptedException {
// nonFairLock();fairLock();}private static void fairLock() throws InterruptedException {// 使用curator客户端RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);CuratorFramework client CuratorFrameworkFactory.builder().connectString(92.168.17.128:2181).sessionTimeoutMs(3000).connectionTimeoutMs(3000).retryPolicy(retryPolicy).build();client.start();InterProcessMutex lock new InterProcessMutex(client, /lock);CountDownLatch latch new CountDownLatch(1000);ListThread threadList IntStream.range(0, 1000).mapToObj(d - new Thread(() - {try {lock.acquire();} catch (Exception e) {throw new RuntimeException(e);}count 1;latch.countDown();try {lock.release();} catch (Exception e) {throw new RuntimeException(e);}}, 线程- d)).collect(Collectors.toList());threadList.forEach(Thread::start);latch.await();System.out.println(最终结果应是1000 count);}