当前位置: 首页 > news >正文

方寸网站建设网站移动排名

方寸网站建设,网站移动排名,免费网页模板网站,放网站的图片做多大分辨率1 Redisson概述1.1 什么是Redisson#xff1f;Redisson是一个在Redis的基础上实现的Java驻内存数据网格#xff08;In-Memory Data Grid#xff09;。它不仅提供了一系列的分布式的Java常用对象#xff0c;还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, Sorted…1 Redisson概述1.1 什么是RedissonRedisson是一个在Redis的基础上实现的Java驻内存数据网格In-Memory Data Grid。它不仅提供了一系列的分布式的Java常用对象还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离Separation of Concern从而让使用者能够将精力更集中地放在处理业务逻辑上。一个基于Redis实现的分布式工具有基本分布式对象和高级又抽象的分布式服务为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。Redisson和Jedis、Lettuce有什么区别倒也不是雷锋和雷锋塔Redisson和它俩的区别就像一个用鼠标操作图形化界面一个用命令行操作文件。Redisson是更高层的抽象Jedis和Lettuce是Redis命令的封装。Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包提供了Redis的各种命令支持Lettuce是一种可扩展的线程安全的 Redis 客户端通讯框架基于Netty支持高级的 Redis 特性比如哨兵集群管道自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。Redisson是架设在Redis基础上通讯基于Netty的综合的、新型的中间件企业级开发中使用Redis的最佳范本Jedis把Redis命令封装好Lettuce则进一步有了更丰富的Api也支持集群等模式。但是两者也都点到为止只给了你操作Redis数据库的脚手架而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案甚至redis官方都推荐的一种工具集。2 分布式锁2.1 分布式锁怎么实现分布式锁是并发业务下的刚需虽然实现五花八门ZooKeeper有Znode顺序节点数据库有表级锁和乐/悲观锁Redis有setNx但是殊途同归最终还是要回到互斥上来本篇介绍Redisson那就以redis为例。2.2 怎么写一个简单的Redis分布式锁以Spring Data Redis为例用RedisTemplate来操作RedissetIfAbsent已经是setNx expire的合并命令如下:// 加锁 public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); } // 解锁防止删错别人的锁以uuid为value校验是否自己的锁 public void unlock(String lockName, String uuid) {if(uuid.equals(redisTemplate.opsForValue().get(lockName)){ redisTemplate.opsForValue().del(lockName); } }// 结构 if(tryLock){// todo }finally{unlock; }简单1.0版本完成聪明的小张一眼看出这是锁没错但get和del操作非原子性并发一旦大了无法保证进程安全。于是小张提议用Lua脚本2.3 Lua脚本是什么Lua脚本是redis已经内置的一种轻量小巧语言其执行是通过redis的eval/evalsha命令来运行把操作封装成一个Lua脚本如论如何都是一次执行的原子操作。于是2.0版本通过Lua脚本删除lockDel.lua如下:if redis.call(get, KEYS[1]) ARGV[1] then -- 执行删除操作return redis.call(del, KEYS[1]) else -- 不成功返回0return 0 enddelete操作时执行Lua命令:// 解锁脚本 DefaultRedisScriptObject unlockScript new DefaultRedisScript(); unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(lockDel.lua)));// 执行lua脚本解锁 redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);2.0似乎更像一把锁但好像又缺少了什么小张一拍脑袋synchronized和ReentrantLock都很丝滑因为他们都是可重入锁一个线程多次拿锁也不会死锁我们需要可重入。2.4 怎么保证可重入重入就是同一个线程多次获取同一把锁是允许的不会造成死锁这一点synchronized偏向锁提供了很好的思路synchronized的实现重入是在JVM层面JAVA对象头MARK WORD中便藏有线程ID和计数器来对当前线程做重入判断避免每次CAS。当一个线程访问同步块并获取锁时会在对象头和栈帧中的锁记录里存储偏向的线程ID以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁只需简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功表示线程已经获得了锁。如果测试失败则需要再测试一下Mark Word中偏向锁标志是否设置成1没有则CAS竞争设置了则CAS将对象头偏向锁指向当前线程。再维护一个计数器同个线程进入则自增1离开再减1直到为0才能释放2.5 可重入锁仿造该方案我们需改造Lua脚本1.需要存储 锁名称lockName、获得该锁的线程id和对应线程的进入次数count2.加锁每次线程获取锁时判断是否已存在该锁不存在设置hash的key为线程idvalue初始化为1设置过期时间返回获取锁成功true存在继续判断是否存在当前线程id的hash key存在线程key的value 1重入次数增加1设置过期时间不存在返回加锁失败3.解锁每次线程来解锁时判断是否已存在该锁存在是否有该线程的id的hash key有则减1无则返回解锁失败减1后判断剩余count是否为0为0则说明不再需要这把锁执行del命令删除2.5.1 存储结构为了方便维护这个对象我们用Hash结构来存储这些字段。Redis的Hash类似Java的HashMap适合存储对象。hset lockname1 threadId 1设置一个名字为lockname1的hash结构该hash结构key为threadId值value为1hget lockname1 threadId获取lockname1的threadId的值存储结构为:lockname 锁名称key1 threadId 唯一键线程idvalue1 count 计数器记录该线程获取锁的次数redis中的结构2.5.2 计数器的加减当同一个线程获取同一把锁时我们需要对对应线程的计数器count做加减判断一个redis key是否存在可以用exists而判断一个hash的key是否存在可以用hexists而redis也有hash自增的命令hincrby每次自增1时 hincrby lockname1 threadId 1自减1时 hincrby lockname1 threadId -12.5.3 解锁的判断当一把锁不再被需要了每次解锁一次count减1直到为0时执行删除综合上述的存储结构和判断流程加锁和解锁Lua如下加锁 lock.lua:local key KEYS[1]; local threadId ARGV[1]; local releaseTime ARGV[2];-- lockname不存在 if(redis.call(exists, key) 0) thenredis.call(hset, key, threadId, 1);redis.call(expire, key, releaseTime);return 1; end;-- 当前线程已id存在 if(redis.call(hexists, key, threadId) 1) thenredis.call(hincrby, key, threadId, 1);redis.call(expire, key, releaseTime);return 1; end; return 0;解锁 unlock.lua:local key KEYS[1]; local threadId ARGV[1];-- lockname、threadId不存在 if (redis.call(hexists, key, threadId) 0) thenreturn nil; end;-- 计数器-1 local count redis.call(hincrby, key, threadId, -1);-- 删除lock if (count 0) thenredis.call(del, key);return nil; end;代码:/*** description 原生redis实现分布式锁**/ Getter Setter public class RedisLock {private RedisTemplate redisTemplate;private DefaultRedisScriptLong lockScript;private DefaultRedisScriptObject unlockScript;public RedisLock(RedisTemplate redisTemplate) {this.redisTemplate redisTemplate;// 加载加锁的脚本lockScript new DefaultRedisScript();this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(lock.lua)));this.lockScript.setResultType(Long.class);// 加载释放锁的脚本unlockScript new DefaultRedisScript();this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(unlock.lua)));}/*** 获取锁*/public String tryLock(String lockName, long releaseTime) {// 存入的线程信息的前缀String key UUID.randomUUID().toString();// 执行脚本Long result (Long) redisTemplate.execute(lockScript,Collections.singletonList(lockName),key Thread.currentThread().getId(),releaseTime);if (result ! null result.intValue() 1) {return key;} else {return null;}}/*** 解锁* param lockName* param key*/public void unlock(String lockName, String key) {redisTemplate.execute(unlockScript,Collections.singletonList(lockName),key Thread.currentThread().getId());} }至此已经完成了一把分布式锁符合互斥、可重入、防死锁的基本特点。严谨的小张觉得虽然当个普通互斥锁已经稳稳够用可是业务里总是又很多特殊情况的比如A进程在获取到锁的时候因业务操作时间太长锁释放了但是业务还在执行而此刻B进程又可以正常拿到锁做业务操作两个进程操作就会存在依旧有共享资源的问题。而且如果负责储存这个分布式锁的Redis节点宕机以后而且这个锁正好处于锁住的状态时这个锁会出现锁死的状态。小张不是杠精因为库存操作总有这样那样的特殊。所以我们希望在这种情况时可以延长锁的releaseTime延迟释放锁来直到完成业务期望结果这种不断延长锁过期时间来保证业务执行完成的操作就是锁续约。读写分离也是常见一个读多写少的业务为了性能常常是有读锁和写锁的。而此刻的扩展已经超出了一把简单轮子的复杂程度光是处理续约就够小张喝一壶何况在性能锁的最大等待时间、优雅无效锁申请、重试失败重试机制等方面还要下功夫研究。在小张苦思冥想时旁边的小白凑过来看了看小张很好奇都2021年了为什么不直接用redisson呢Redisson就有这把你要的锁。3 Redisson分布式锁号称简单的Redisson分布式锁的使用姿势是什么3.1 依赖!-- 原生本章使用-- dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.13.6/version /dependency!-- 另一种Spring集成starter本章未使用 -- dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion3.13.6/version /dependency3.2 配置Configuration public class RedissionConfig {Value(${spring.redis.host})private String redisHost;Value(${spring.redis.password})private String password;private int port 6379;Beanpublic RedissonClient getRedisson() {Config config new Config();config.useSingleServer().setAddress(redis:// redisHost : port).setPassword(password);config.setCodec(new JsonJacksonCodec());return Redisson.create(config);} }3.3 启用分布式锁Resource private RedissonClient redissonClient;RLock rLock redissonClient.getLock(lockName); try {boolean isLocked rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);if (isLocked) {// TODO}} catch (Exception e) {rLock.unlock();}简洁明了只需要一个RLock既然推荐Redisson就往里面看看他是怎么实现的。4 RLockRLock是Redisson分布式锁的最核心接口继承了concurrent包的Lock接口和自己的RLockAsync接口RLockAsync的返回值都是RFuture是Redisson执行异步实现的核心逻辑也是Netty发挥的主要阵地。RLock如何加锁从RLock进入找到RedissonLock类找到tryLock方法再递进到干事的tryAcquireOnceAsync方法这是加锁的主要代码版本不一此处实现有差别和最新3.15.x有一定出入但是核心逻辑依然未变。此处以3.13.6为例private RFutureBoolean tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime ! -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFutureBoolean ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}此处出现leaseTime时间判断的2个分支实际上就是加锁时是否设置过期时间未设置过期时间-1时则会有watchDog的锁续约下文一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync部分evalWriteAsync是eval命令执行lua的入口T RFutureT tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {this.internalLockLeaseTime unit.toMillis(leaseTime);return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, if (redis.call(exists, KEYS[1]) 0) then redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);, Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}这里揭开真面目eval命令执行Lua脚本的地方此处的Lua脚本展开-- 不存在该key时 if (redis.call(exists, KEYS[1]) 0) then -- 新增该锁并且hash中该线程id对应的count置1redis.call(hincrby, KEYS[1], ARGV[2], 1); -- 设置过期时间redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; -- 存在该key 并且 hash中线程id的key也存在 if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then -- 线程重入次数redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);和前面我们写自定义的分布式锁的脚本几乎一致看来redisson也是一样的实现具体参数分析// keyName KEYS[1] Collections.singletonList(this.getName()) // leaseTime ARGV[1] this.internalLockLeaseTime // uuidthreadId组合的唯一值 ARGV[2] this.getLockName(threadId)总共3个参数完成了一段逻辑判断该锁是否已经有对应hash表存在• 没有对应的hash表则set该hash表中一个entry的key为锁名称value为1之后设置该hash表失效时间为leaseTime• 存在对应的hash表则将该lockName的value执行1操作也就是计算进入次数再设置失效时间leaseTime• 最后返回这把锁的ttl剩余时间也和上述自定义锁没有区别既然如此那解锁的步骤也肯定有对应的-1操作再看unlock方法同样查找方法名一路到protected RFutureBoolean unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(exists, KEYS[1]) 0) then redis.call(publish, KEYS[2], ARGV[1]); return 1; end;if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;end; local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;, Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});}掏出Lua部分-- 不存在key if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil; end; -- 计数器 -1 local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then -- 过期时间重设redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else-- 删除并发布解锁消息redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;该Lua KEYS有2个Arrays.asList(getName(), getChannelName())name 锁名称 channelName用于pubSub发布消息的channel名称ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)LockPubSub.UNLOCK_MESSAGEchannel发送消息的类别此处解锁为0internalLockLeaseTimewatchDog配置的超时时间默认为30slockName 这里的lockName指的是uuid和threadId组合的唯一值步骤如下1.如果该锁不存在则返回nil2.如果该锁存在则将其线程的hash key计数器-13.计数器counter0重置下失效时间返回0否则删除该锁发布解锁消息unlockMessage返回1其中unLock的时候使用到了Redis发布订阅PubSub完成消息通知。而订阅的步骤就在RedissonLock的加锁入口的lock方法里long threadId Thread.currentThread().getId();Long ttl this.tryAcquire(-1L, leaseTime, unit, threadId);if (ttl ! null) {// 订阅RFutureRedissonLockEntry future this.subscribe(threadId);if (interruptibly) {this.commandExecutor.syncSubscriptionInterrupted(future);} else {this.commandExecutor.syncSubscription(future);}// 省略当锁被其他线程占用时通过监听锁的释放通知在其他线程通过RedissonLock释放锁时会通过发布订阅pub/sub功能发起通知等待锁被其他线程释放也是为了避免自旋的一种常用效率手段。4.1 解锁消息为了一探究竟通知了什么通知后又做了什么进入LockPubSub。这里只有一个明显的监听方法onMessage其订阅和信号量的释放都在父类PublishSubscribe我们只关注监听事件的实际操作protected void onMessage(RedissonLockEntry value, Long message) {Runnable runnableToExecute;if (message.equals(unlockMessage)) {// 从监听器队列取监听线程执行监听回调runnableToExecute (Runnable)value.getListeners().poll();if (runnableToExecute ! null) {runnableToExecute.run();}// getLatch()返回的是Semaphore信号量此处是释放信号量// 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁value.getLatch().release();} else if (message.equals(readUnlockMessage)) {while(true) {runnableToExecute (Runnable)value.getListeners().poll();if (runnableToExecute null) {value.getLatch().release(value.getLatch().getQueueLength());break;}runnableToExecute.run();}}}发现一个是默认解锁消息一个是读锁解锁消息因为redisson是有提供读写锁的而读写锁读读情况和读写、写写情况互斥情况不同我们只看上面的默认解锁消息unlockMessage分支LockPubSub监听最终执行了2件事runnableToExecute.run() 执行监听回调value.getLatch().release(); 释放信号量Redisson通过LockPubSub监听解锁消息执行监听回调和释放信号量通知等待线程可以重新抢锁。这时再回来看tryAcquireOnceAsync另一分支private RFutureBoolean tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime ! -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFutureBoolean ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}可以看到无超时时间时在执行加锁操作后还执行了一段费解的逻辑ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}}) } } }) 此处涉及到Netty的Future/Promise-Listener模型Redisson中几乎全部以这种方式通信所以说Redisson是基于Netty通信机制实现的理解这段逻辑可以试着先理解在 Java 的 Future 中业务逻辑为一个 Callable 或 Runnable 实现类该类的 call()或 run()执行完毕意味着业务逻辑的完结在 Promise 机制中可以在业务逻辑中人工设置业务逻辑的成功与失败这样更加方便的监控自己的业务逻辑。这块代码的表面意义就是在执行异步加锁的操作后加锁成功则根据加锁完成返回的ttl是否过期来确认是否执行一段定时任务。这段定时任务的就是watchDog的核心。4.2 锁续约查看RedissonLock.this.scheduleExpirationRenewal(threadId)private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry new RedissonLock.ExpirationEntry();RedissonLock.ExpirationEntry oldEntry (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry ! null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);this.renewExpiration();}}private void renewExpiration() {RedissonLock.ExpirationEntry ee (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee ! null) {Timeout task this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent ! null) {Long threadId ent.getFirstThreadId();if (threadId ! null) {RFutureBoolean future RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) - {if (e ! null) {RedissonLock.log.error(Cant update lock RedissonLock.this.getName() expiration, e);} else {if (res) {RedissonLock.this.renewExpiration();}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}}拆分来看这段连续嵌套且冗长的代码实际上做了几步• 添加一个netty的Timeout回调任务每internalLockLeaseTime / 3毫秒执行一次执行的方法是renewExpirationAsync• renewExpirationAsync重置了锁超时时间又注册一个监听器监听回调又执行了renewExpirationrenewExpirationAsync 的Lua如下protected RFutureBoolean renewExpirationAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;, Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;重新设置了超时时间。Redisson加这段逻辑的目的是什么目的是为了某种场景下保证业务不影响如任务执行超时但未结束锁已经释放的问题。当一个线程持有了一把锁由于并未设置超时时间leaseTimeRedisson默认配置了30S开启watchDog每10S对该锁进行一次续约维持30S的超时时间直到任务完成再删除锁。这就是Redisson的锁续约也就是WatchDog实现的基本思路。4.3 流程概括通过整体的介绍流程简单概括A、B线程争抢一把锁A获取到后B阻塞B线程阻塞时并非主动CAS而是PubSub方式订阅该锁的广播消息A操作完成释放了锁B线程收到订阅消息通知B被唤醒开始继续抢锁拿到锁详细加锁解锁流程总结如下图5 公平锁以上介绍的可重入锁是非公平锁Redisson还基于Redis的队列List和ZSet实现了公平5.1 公平的定义是什么公平就是按照客户端的请求先来后到排队来获取锁先到先得也就是FIFO所以队列和容器顺序编排必不可少5.2 FairSync回顾JUC的ReentrantLock公平锁的实现/*** Sync object for fair locks*/ static final class FairSync extends Sync {private static final long serialVersionUID -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Dont grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();if (c 0) {if (!hasQueuedPredecessors() compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0)throw new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;} }AQS已经提供了整个实现是否公平取决于实现类取出节点逻辑是否顺序取AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架通过内置FIFO队列来完成资源获取线程的排队工作他自身没有实现同步接口仅仅定义了若干同步状态获取和释放的方法来供自定义同步组件使用上图支持独占和共享获取这是基于模版方法模式的一种设计给公平/非公平提供了土壤。我们用2张图来简单解释AQS的等待流程出自《JAVA并发编程的艺术》一张是同步队列FIFO双向队列管理 获取同步状态失败抢锁失败的线程引用、等待状态和前驱后继节点的流程图一张是独占式获取同步状态的总流程核心acquire(int arg)方法调用流程可以看出锁的获取流程AQS维护一个同步队列获取状态失败的线程都会加入到队列中进行自旋移出队列或停止自旋的条件是前驱节点为头节点切成功获取了同步状态。而比较另一段非公平锁类NonfairSync可以发现控制公平和非公平的关键代码在于hasQueuedPredecessors方法。static final class NonfairSync extends Sync {private static final long serialVersionUID 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);} }NonfairSync减少了了hasQueuedPredecessors判断条件该方法的作用就是查看同步队列中当前节点是否有前驱节点如果有比当前线程更早请求获取锁则返回true。保证每次都取队列的第一个节点线程来获取锁这就是公平规则5.2.1 为什么JUC以默认非公平锁呢因为当一个线程请求锁时只要获取来同步状态即成功获取。在此前提下刚释放的线程再次获取同步状态的几率会非常大使得其他线程只能在同步队列中等待。但这样带来的好处是非公平锁大大减少了系统线程上下文的切换开销。可见公平的代价是性能与吞吐量。Redis里没有AQS但是有List和zSet看看Redisson是怎么实现公平的。5.3 RedissonFairLockRedissonFairLock 用法依然很简单RLock fairLock redissonClient.getFairLock(lockName); fairLock.lock();RedissonFairLock继承自RedissonLock同样一路向下找到加锁实现方法tryLockInnerAsync。这里有2段冗长的Lua但是Debug发现公平锁的入口在 command RedisCommands.EVAL_LONG 之后此段Lua较长参数也多我们着重分析Lua的实现规则参数:-- lua中的几个参数 KEYS Arrays.ObjectasList(getName(), threadsQueueName, timeoutSetName) KEYS[1]: lock_name, 锁名称 KEYS[2]: redisson_lock_queue:{xxx} 线程队列 KEYS[3]: redisson_lock_timeout:{xxx} 线程id对应的超时集合ARGV internalLockLeaseTime, getLockName(threadId), currentTime threadWaitTime, currentTime ARGV[1]: {leaseTime} 过期时间 ARGV[2]: {Redisson.UUID}:{threadId} ARGV[3] 当前时间 线程等待时间:10:00:00 5000毫秒 10:00:05 ARGV[4] 当前时间10:00:00 部署服务器时间非redis-server服务器时间公平锁实现的Lua脚本-- 1.死循环清除过期key while true do -- 获取头节点local firstThreadId2 redis.call(lindex, KEYS[2], 0);-- 首次获取必空跳出循环if firstThreadId2 false then break;end;-- 清除过期keylocal timeout tonumber(redis.call(zscore, KEYS[3], firstThreadId2));if timeout tonumber(ARGV[4]) thenredis.call(zrem, KEYS[3], firstThreadId2);redis.call(lpop, KEYS[2]);elsebreak;end; end;-- 2.不存在该锁 不存在线程等待队列 || 存在线程等待队列而且第一个节点就是此线程ID)加锁部分主要逻辑 if (redis.call(exists, KEYS[1]) 0) and ((redis.call(exists, KEYS[2]) 0) or (redis.call(lindex, KEYS[2], 0) ARGV[2])) then-- 弹出队列中线程id元素删除Zset中该线程id对应的元素redis.call(lpop, KEYS[2]);redis.call(zrem, KEYS[3], ARGV[2]);local keys redis.call(zrange, KEYS[3], 0, -1);-- 遍历zSet所有key将key的超时时间(score) - 当前时间msfor i 1, #keys, 1 do redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]);end;-- 加锁设置锁过期时间redis.call(hset, KEYS[1], ARGV[2], 1);redis.call(pexpire, KEYS[1], ARGV[1]);return nil; end;-- 3.线程存在重入判断 if redis.call(hexists, KEYS[1], ARGV[2]) 1 thenredis.call(hincrby, KEYS[1], ARGV[2],1);redis.call(pexpire, KEYS[1], ARGV[1]);return nil; end;-- 4.返回当前线程剩余存活时间 local timeout redis.call(zscore, KEYS[3], ARGV[2]);if timeout ~ false then-- 过期时间timeout的值在下方设置此处的减法算出的依旧是当前线程的ttlreturn timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]); end;-- 5.尾节点剩余存活时间 local lastThreadId redis.call(lindex, KEYS[2], -1); local ttl; -- 尾节点不空 尾节点非当前线程 if lastThreadId ~ false and lastThreadId ~ ARGV[2] then-- 计算队尾节点剩余存活时间ttl tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]); else-- 获取lock_name剩余存活时间ttl redis.call(pttl, KEYS[1]); end;-- 6.末尾排队 -- zSet 超时时间score尾节点ttl 当前时间 5000ms 当前时间无则新增有则更新 -- 线程id放入队列尾部排队无则插入有则不再插入 local timeout ttl tonumber(ARGV[3]) tonumber(ARGV[4]); if redis.call(zadd, KEYS[3], timeout, ARGV[2]) 1 thenredis.call(rpush, KEYS[2], ARGV[2]); end; return ttl;5.4 公平锁加锁步骤通过以上Lua可以发现lua操作的关键结构是列表list和有序集合zSet。其中list维护了一个等待的线程队列redisson_lock_queue:{xxx}zSet维护了一个线程超时情况的有序集合redisson_lock_timeout:{xxx}尽管lua较长但是可以拆分为6个步骤1.队列清理保证队列中只有未过期的等待线程2.首次加锁hset加锁pexpire过期时间3.重入判断此处同可重入锁lua4.返回ttl5.计算尾节点ttl初始值为锁的剩余过期时间6.末尾排队ttl 2 * currentTime waitTime是score的默认值计算公式2.模拟如果模拟以下顺序就会明了redisson公平锁整个加锁流程假设 t1 10:00:00 t2 10:00:10 t3 10:00:20t1当线程1初次获取锁1.等待队列无头节点跳出死循环-22.不存在该锁 不存在线程等待队列 成立2.1 lpop和zerm、zincrby都是无效操作只有加锁生效说明是首次加锁加锁后返回nil加锁成功线程1获取到锁结束t2线程2尝试获取锁线程1未释放锁1.等待队列无头节点跳出死循环-22.不存在该锁 不成立-33.非重入线程 -44.score无值 -55.尾节点为空设置ttl初始值为lock_name的ttl - 66.按照ttl waitTime currentTime currentTime 来设置zSet超时时间score并且加入等待队列线程2为头节点score 20S 5000ms 10:00:10 10:00:10 10:00:35 10:00:10t3线程3尝试获取锁线程1未释放锁1.等待队列有头节点1.1未过期-22.不存在该锁不成立-33.非重入线程-44.score无值 -55.尾节点不为空 尾节点线程为2非当前线程5.1取出之前设置的score减去当前时间ttl score - currentTime -66.按照ttl waitTime currentTime currentTime 来设置zSet超时时间score并且加入等待队列score 10S 5000ms 10:00:20 10:00:20 10:00:35 10:00:20如此一来三个需要抢夺一把锁的线程完成了一次排队在list中排列他们等待线程id在zSet中存放过期时间便于排列优先级。其中返回ttl的线程2客户端、线程3客户端将会一直按一定间隔自旋重复执行该段Lua尝试加锁如此一来便和AQS有了异曲同工之处。而当线程1释放锁之后这里依旧有通过Pub/Sub发布解锁消息通知其他线程获取10:00:30 线程2尝试获取锁线程1已释放锁1.等待队列有头节点未过期-22.不存在该锁 等待队列头节点是当前线程 成立2.1删除当前线程的队列信息和zSet信息超时时间为线程2 10:00:35 10:00:10 - 10:00:30 10:00:15线程3 10:00:35 10:00:20 - 10:00:30 10:00:252.2线程2获取到锁重新设置过期时间加锁成功线程2获取到锁结束排队结构如图:公平锁的释放脚本和重入锁类似多了一步加锁开头的清理过期key的while true逻辑在此不再展开篇幅描述。由上可以看出Redisson公平锁的玩法类似于延迟队列的玩法核心都在Redis的List和zSet结构的搭配但又借鉴了AQS实现在定时判断头节点上如出一辙watchDog保证了锁的竞争公平和互斥。并发场景下lua脚本里zSet的score很好地解决了顺序插入的问题排列好优先级。并且为了防止因异常而退出的线程无法清理每次请求都会判断头节点的过期情况给予清理最后释放时通过CHANNEL通知订阅线程可以来获取锁重复一开始的步骤顺利交接到下一个顺序线程。6 总结Redisson整体实现分布式加解锁流程的实现稍显复杂作者Rui Gu对Netty和JUC、Redis研究深入利用了很多高级特性和语义值得深入学习本次介绍也只是单机Redis下锁实现。Redisson也提供了多机情况下的联锁MultiLockhttps://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#81-可重入锁reentrant-lock和官方推荐的红锁RedLockhttps://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#84-红锁redlock所以当你真的需要分布式锁时不妨先来Redisson里找找。
文章转载自:
http://www.morning.pbdnj.cn.gov.cn.pbdnj.cn
http://www.morning.wckrl.cn.gov.cn.wckrl.cn
http://www.morning.qgjgsds.com.cn.gov.cn.qgjgsds.com.cn
http://www.morning.jfzbk.cn.gov.cn.jfzbk.cn
http://www.morning.bttph.cn.gov.cn.bttph.cn
http://www.morning.xmrmk.cn.gov.cn.xmrmk.cn
http://www.morning.tgqzp.cn.gov.cn.tgqzp.cn
http://www.morning.kzhxy.cn.gov.cn.kzhxy.cn
http://www.morning.rykn.cn.gov.cn.rykn.cn
http://www.morning.hmqmm.cn.gov.cn.hmqmm.cn
http://www.morning.qgghj.cn.gov.cn.qgghj.cn
http://www.morning.jzyfy.cn.gov.cn.jzyfy.cn
http://www.morning.hwcgg.cn.gov.cn.hwcgg.cn
http://www.morning.slfmp.cn.gov.cn.slfmp.cn
http://www.morning.mpszk.cn.gov.cn.mpszk.cn
http://www.morning.bxqry.cn.gov.cn.bxqry.cn
http://www.morning.mhnd.cn.gov.cn.mhnd.cn
http://www.morning.rgqnt.cn.gov.cn.rgqnt.cn
http://www.morning.jltmb.cn.gov.cn.jltmb.cn
http://www.morning.jrhcp.cn.gov.cn.jrhcp.cn
http://www.morning.bbyqz.cn.gov.cn.bbyqz.cn
http://www.morning.yktr.cn.gov.cn.yktr.cn
http://www.morning.dnydy.cn.gov.cn.dnydy.cn
http://www.morning.fpqq.cn.gov.cn.fpqq.cn
http://www.morning.jqpq.cn.gov.cn.jqpq.cn
http://www.morning.wrkhf.cn.gov.cn.wrkhf.cn
http://www.morning.rszyf.cn.gov.cn.rszyf.cn
http://www.morning.kyjpg.cn.gov.cn.kyjpg.cn
http://www.morning.wdply.cn.gov.cn.wdply.cn
http://www.morning.bqdgr.cn.gov.cn.bqdgr.cn
http://www.morning.plxnn.cn.gov.cn.plxnn.cn
http://www.morning.tdwjj.cn.gov.cn.tdwjj.cn
http://www.morning.lkhgq.cn.gov.cn.lkhgq.cn
http://www.morning.cttgj.cn.gov.cn.cttgj.cn
http://www.morning.hpnhl.cn.gov.cn.hpnhl.cn
http://www.morning.qfths.cn.gov.cn.qfths.cn
http://www.morning.btmwd.cn.gov.cn.btmwd.cn
http://www.morning.kfjnx.cn.gov.cn.kfjnx.cn
http://www.morning.bytgy.com.gov.cn.bytgy.com
http://www.morning.lkfsk.cn.gov.cn.lkfsk.cn
http://www.morning.ngjpt.cn.gov.cn.ngjpt.cn
http://www.morning.yqyhr.cn.gov.cn.yqyhr.cn
http://www.morning.yprnp.cn.gov.cn.yprnp.cn
http://www.morning.lhgkr.cn.gov.cn.lhgkr.cn
http://www.morning.wxrbl.cn.gov.cn.wxrbl.cn
http://www.morning.qzqjz.cn.gov.cn.qzqjz.cn
http://www.morning.bxqry.cn.gov.cn.bxqry.cn
http://www.morning.sbkb.cn.gov.cn.sbkb.cn
http://www.morning.txjrc.cn.gov.cn.txjrc.cn
http://www.morning.htpjl.cn.gov.cn.htpjl.cn
http://www.morning.fkwgk.cn.gov.cn.fkwgk.cn
http://www.morning.htqrh.cn.gov.cn.htqrh.cn
http://www.morning.wqrdx.cn.gov.cn.wqrdx.cn
http://www.morning.hjwzpt.com.gov.cn.hjwzpt.com
http://www.morning.csznh.cn.gov.cn.csznh.cn
http://www.morning.xsszn.cn.gov.cn.xsszn.cn
http://www.morning.mcbqq.cn.gov.cn.mcbqq.cn
http://www.morning.mnsmb.cn.gov.cn.mnsmb.cn
http://www.morning.cplym.cn.gov.cn.cplym.cn
http://www.morning.dmnqh.cn.gov.cn.dmnqh.cn
http://www.morning.rtbx.cn.gov.cn.rtbx.cn
http://www.morning.nfzw.cn.gov.cn.nfzw.cn
http://www.morning.yrxcn.cn.gov.cn.yrxcn.cn
http://www.morning.pqchr.cn.gov.cn.pqchr.cn
http://www.morning.dmcxh.cn.gov.cn.dmcxh.cn
http://www.morning.ykrkq.cn.gov.cn.ykrkq.cn
http://www.morning.yrjkp.cn.gov.cn.yrjkp.cn
http://www.morning.btlsb.cn.gov.cn.btlsb.cn
http://www.morning.fjscr.cn.gov.cn.fjscr.cn
http://www.morning.rrgqq.cn.gov.cn.rrgqq.cn
http://www.morning.cwjsz.cn.gov.cn.cwjsz.cn
http://www.morning.mcpby.cn.gov.cn.mcpby.cn
http://www.morning.lwwnq.cn.gov.cn.lwwnq.cn
http://www.morning.fwgnq.cn.gov.cn.fwgnq.cn
http://www.morning.qfrmy.cn.gov.cn.qfrmy.cn
http://www.morning.slwqt.cn.gov.cn.slwqt.cn
http://www.morning.ywqsk.cn.gov.cn.ywqsk.cn
http://www.morning.bnlkc.cn.gov.cn.bnlkc.cn
http://www.morning.yrjym.cn.gov.cn.yrjym.cn
http://www.morning.elmtw.cn.gov.cn.elmtw.cn
http://www.tj-hxxt.cn/news/243375.html

相关文章:

  • 微信网站是多少深圳宝安高端网站建设公司
  • 上饶做网站最好的公司网站优化可以自己做么
  • 申请免费网站建设泰安中推网络科技有限公司
  • 网站必须备案吗视觉中国网站建设公司
  • 沈阳思路网站制作最新章节 62.一起来做网站吧
  • flash网站项目背景明港网站建设
  • 网页设计网站长沙王也天与葛优
  • 成都龙泉建设有限公司网站汕头集团做网站方案
  • 百度站长平台网页版nginx wordpress
  • 网站建设方案设计室内装修效果图大全2023图片
  • 注册网站给谁交钱北京网页设计公司兴田德润挺好
  • 响应式网站开发框架遵义做网站的公司
  • 杭州网站建设培训学校wordpress分类目录单个调用
  • wap网站制作开发公司高端医疗网站建设
  • 许昌做网站公司专业做网站哪家好广州网络推广平台
  • 郑州网站建设的软件网站建设产品图
  • 杭州网站建设方案推广上海做衣服版的网站
  • 泉州网站建设泉州建站公司外包
  • 苏州网站开发建设广州天河酒店网站建设
  • 网站建设资源wordpress 多站点模式
  • 深圳宝安网站建设公司推荐营销软文范例大全100字
  • wordpress全站静态化下载一个网站的源码下载
  • 网站开发p6云南省建设厅网站人员查询
  • 百度seo关键词排名推荐韶关seo
  • 科技网站小编网站县区分站点建设
  • 尚品网站建设怎么修改收录网站的标题
  • 在浏览器播放视频成都seo整站
  • 政协 网站建设预约网站模板
  • 聊城做网站的公司新闻30个适合大学生创业的项目
  • 外贸网站 英文摄影作品网站源码