郑州投资网站建设,合同网站开发 设计 后期维护,包装设计效果图生成器,福州网站seo推广优化作者简介#xff1a;大家好#xff0c;我是smart哥#xff0c;前中兴通讯、美团架构师#xff0c;现某互联网公司CTO 联系qq#xff1a;184480602#xff0c;加我进群#xff0c;大家一起学习#xff0c;一起进步#xff0c;一起对抗互联网寒冬 自定义Redis分布式锁的…作者简介大家好我是smart哥前中兴通讯、美团架构师现某互联网公司CTO 联系qq184480602加我进群大家一起学习一起进步一起对抗互联网寒冬 自定义Redis分布式锁的弊端
在上一篇我们自定义了一个Redis分布式锁用来解决多节点定时任务的拉取问题避免任务重复执行 但仍然存在很多问题
加锁操作不是原子性的setnx和expire两步操作不是原子性的中间宕机会导致死锁
public boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit) {// 1.先setnxBoolean lock redisTemplate.opsForValue().setIfAbsent(lockKey, value);if (lock ! null lock) {// 2.再expireredisTemplate.expire(lockKey, expireTime, timeUnit);return true;} else {return false;}
} 当然啦高版本的SpringBoot Redis依赖其实提供了加锁的原子性操作
/*** 尝试上锁setNX expire** param lockKey 锁* param value 对应的值* param expireTime 过期时间* param timeUnit 时间单位* return*/
Override
public boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit) {try {// 高版本SpringBoot的setIfAbsent可以设置4个参数一步到位redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, timeUnit);return true;} catch (Exception e) {e.printStackTrace();}return false;
}
从 Redis 2.6.12 版本开始现在6.x了... SET 命令的行为可以通过一系列参数来修改也因为 SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。 解锁操作不是原子性的可能造成不同节点之间互相删锁 虽然上一篇设计的unLock()不是原子操作但可以避免不同节点之间互相删锁
public boolean unLock(String lockKey, String value) {// 1.获取锁的value存的是MACHINE_IDString machineId (String) redisTemplate.opsForValue().get(lockKey);if (StringUtils.isNotEmpty(machineId) machineId.equals(value)) {// 2.只能删除当前节点设置的锁redisTemplate.delete(lockKey);return true;}return false;
}
畏难情绪作祟不想考虑锁续期的问题企图采用队列的方式缩减定时任务执行时间直接把任务丢到队列中。但实际上可能存在任务堆积个别情况下会出现上次已经拉取某个任务并丢到Redis队列中但由于队列比较繁忙该任务还未被执行数据库状态也尚未更改为status1已执行结果下次又拉取一遍重复执行简单的解决策略是虽然无法阻止入队但是出队消费时可以判断where status0后执行 引入Redis Message Queue会让系统变得更加复杂我之前就因为使用了上面的模型导致各种偶发性的BUG非常不好排查。一般来说定时任务应该设计得简单点 也就是说绕来绕去想要设计一个较完备的Redis分布式锁必须至少解决3个问题
加锁原子性setnx和expire要保证原子性否则会容易发生死锁解锁原子性不能误删别人的锁需要考虑业务/定时任务执行的时间并为锁续期 如果不考虑性能啥的加解锁原子性都可以通过lua脚本实现利用Redis单线程的特性 一次执行一个脚本要么成功要么失败不会和其他指令交错执行。 最难的是如何根据实际业务的执行时间给锁续期虽然我们已经通过判断MACHINE_ID避免了不同节点互相删除锁 但本质上我们需要的是 本文我们的主要目标就是实现锁续期 好在Redisson已经实现了所以目标又变成了解Redisson的锁续期机制。 Redisson案例
Redisson环境搭建
server:port: 8080spring:redis:host: password: database: 1# 调整控制台日志格式稍微精简一些非必要操作
logging:pattern:console: %d{yyyy-MM-dd HH:mm:ss} - %thread - %msg%n
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency!--大家也可以单独引入Redisson依赖然后通过Configuration自己配置RedissonClient--dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion3.13.6/version/dependency
/dependencies
然后就可以在test包下测试了~ lock()方法初探
Slf4j
RunWith(SpringRunner.class)
SpringBootTest
public class RLockTest {Autowiredprivate RedissonClient redissonClient;Testpublic void testRLock() throws InterruptedException {new Thread(this::testLockOne).start();new Thread(this::testLockTwo).start();TimeUnit.SECONDS.sleep(200);}public void testLockOne(){try {RLock lock redissonClient.getLock(bravo1988_distributed_lock);log.info(testLockOne尝试加锁...);lock.lock();log.info(testLockOne加锁成功...);log.info(testLockOne业务开始...);TimeUnit.SECONDS.sleep(50);log.info(testLockOne业务结束...);lock.unlock();log.info(testLockOne解锁成功...);} catch (InterruptedException e) {e.printStackTrace();}}public void testLockTwo() {try {RLock lock redissonClient.getLock(bravo1988_distributed_lock);log.info(testLockTwo尝试加锁...);lock.lock();log.info(testLockTwo加锁成功...);log.info(testLockTwo业务开始...);TimeUnit.SECONDS.sleep(50);log.info(testLockTwo业务结束...);lock.unlock();log.info(testLockTwo解锁成功...);} catch (InterruptedException e) {e.printStackTrace();}}}
结果
2023-12-21 14:24:33 - Thread-3 - testLockTwo尝试加锁...
2023-12-21 14:24:33 - Thread-2 - testLockOne尝试加锁... testLockOne()执行过程中testLockTwo()一直阻塞
2023-12-21 14:24:33 - Thread-2 - testLockOne加锁成功...
2023-12-21 14:24:33 - Thread-2 - testLockOne业务开始...
2023-12-21 14:25:23 - Thread-2 - testLockOne业务结束...
2023-12-21 14:25:23 - Thread-2 - testLockOne解锁成功... testLockOne()执行结束释放锁testLockTwo()抢到锁
2023-12-21 14:25:23 - Thread-3 - testLockTwo加锁成功...
2023-12-21 14:25:23 - Thread-3 - testLockTwo业务开始...
2023-12-21 14:26:13 - Thread-3 - testLockTwo业务结束...
2023-12-21 14:26:13 - Thread-3 - testLockTwo解锁成功... 通过上面的代码我们有以下疑问
lock()方法是原子性的吗lock()有设置过期时间吗是多少lock()实现锁续期了吗lock()方法怎么实现阻塞的又怎么被唤醒 先忘了这些跟着我们走一遍lock()源码就明白了。 lock()源码解析
lock()加锁去除异常的情况无非加锁成功、加锁失败两种情况我们先看加锁成功的情况。 流程概览
我们从这段最简单的代码入手
Slf4j
RunWith(SpringRunner.class)
SpringBootTest
public class RLockTest {Autowiredprivate RedissonClient redissonClient;Testpublic void testLockSuccess() throws InterruptedException {RLock lock redissonClient.getLock(bravo1988_distributed_lock);log.info(准备加锁...);lock.lock();log.info(加锁成功...);TimeUnit.SECONDS.sleep(300);}
}
大家跟着我们先打几个断点SpringBoot2.3.4 注意啊把截图中能看到的断点都打上。
OK接着大家自己启动DEBUG感受一下大致流程然后看下面的注释
// redisson.lock()
Override
public void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}
}// 为了方便辨认我直接把传进来的参数写在参数列表上
private void lock(long leaseTime-1, TimeUnit unitnull, boolean interruptiblyfalse) throws InterruptedException {// 获取当前线程idlong threadId Thread.currentThread().getId();// 尝试上锁。上锁成功返回null上锁失败返回ttlLong ttl tryAcquire(-1, leaseTime-1, unitnull, threadId666);// 上锁成功方法结束回到主线程执行业务啦后台有个定时任务在给当前锁续期if (ttl null) {return;}// 上锁成功就不走下面的流程了所以这里直接省略// 略加锁失败后续流程...
}// 尝试上锁。上锁成功返回null上锁失败返回【当前已经存在的锁】的ttl方便调用者判断多久之后能重新获取锁
private Long tryAcquire(long waitTime-1, long leaseTime-1, TimeUnit unitnull, long threadId666) {/*** 有两次调用1.tryAcquireAsync()返回Future 2.从Future获取异步结果异步结果就是ttl* 重点是tryAcquireAsync()*/return get(tryAcquireAsync(waitTime-1, leaseTime-1, unitnull, threadId666));
}// 获取过期时间非重点
protected final V V get(RFutureV future) {return commandExecutor.get(future);
}// 重点加锁后返回RFuture内部包含ttl。调用本方法可能加锁成功也可能加锁失败外界可以通过ttl判断
private T RFutureLong tryAcquireAsync(long waitTime-1, long leaseTime-1, TimeUnit unitnull, long threadId666) {// lock()默认leaseTime-1所以会跳过ifif (leaseTime ! -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 执行lua脚本尝试加锁并返回RFuture。这个方法是异步的其实是把任务提交给线程池RFutureLong ttlRemainingFuture tryLockInnerAsync(waitTime-1,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()30秒,TimeUnit.MILLISECONDS, threadId666, RedisCommands.EVAL_LONG);// 设置回调方法异步线程与Redis交互得到结果后会回调BiConsumer#accept()ttlRemainingFuture.onComplete((ttlRemaining, e) - {// 发生异常时直接returnif (e ! null) {return;}// 说明加锁成功if (ttlRemaining null) {// 启动额外的线程按照一定规则给当前锁续期scheduleExpirationRenewal(threadId);}});// 返回RFuture里面有ttlRemainingreturn ttlRemainingFuture;
}// 执行lua脚本尝试上锁
T RFutureT tryLockInnerAsync(long waitTime-1, long leaseTime30*1000, TimeUnit unit毫秒, long threadId666, RedisStrictCommandT command) {internalLockLeaseTime unit.toMillis(leaseTime);/*** 大家去看一下evalWriteAsync()的参数列表看看每个参数都代表什么就能理解KEYS[]和ARGV[]以及整个脚本什么意思了* 如果你仔细看lua脚本就会明白加锁成功时返回ttlRemainingnull加锁失败时返回ttlRemainingxxx上一个锁还剩多少时间** 另外我们自定义的Redis分布式锁采用了IdUtil生成节点id和getLockName(threadId)本质是一样的*/return evalWriteAsync(getName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}// 向Redis服务器发送脚本并返回RFuture大家可以近似看成往线程池提交一个任务然后将异步结果封装到CompletableFuture
protected T RFutureT evalWriteAsync(String key, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {CommandBatchService executorService createCommandBatchService();RFutureT result executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);if (!(commandExecutor instanceof CommandBatchService)) {executorService.executeAsync();}return result;
}
示意图 整个流程比较简单只有两个难点
lua脚本写了啥ttlRemainingFuture.onComplete()有什么作用 lua脚本解读
大家可以通过evalWriteAsync()的参数列表推导出KEYS、ARGV分别是什么
KEYS[] Collections.singletonList(getName())
ARGV[] internalLockLeaseTime, getLockName(threadId)
-- 如果不存在锁bravo1988_distributed_lock
if (redis.call(exists, KEYS[1]) 0) then-- 使用hincrby设置锁hincrby bravo1988_distributed_lock a1b2c3d4:666 1redis.call(hincrby, KEYS[1], ARGV[2], 1); -- 设置过期时间。ARGV[1]internalLockLeaseTimeredis.call(pexpire, KEYS[1], ARGV[1]); -- 返回nullreturn nil; end; -- 如果当前节点已经设置bravo1988_distributed_lock注意传了ARGV[2]节点id
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then -- 就COUNT可重入锁redis.call(hincrby, KEYS[1], ARGV[2], 1); -- 设置过期时间。ARGV[1]internalLockLeaseTimeredis.call(pexpire, KEYS[1], ARGV[1]); -- 返回nullreturn nil;end; -- 已经存在锁且不是当前节点设置的就返回锁的过期时间ttl
return redis.call(pttl, KEYS[1]);
总的来说Redisson设计的分布式锁是采用hash结构
LOCK_NAME锁的KEY CLIENT_ID节点ID COUNT重入次数 回调函数的作用
之前我们已经学过CompletableFuture的回调机制 RFuture#onComplete()和它很相似
ttlRemainingFuture.onComplete((ttlRemaining, e) - {// 发生异常时直接returnif (e ! null) {return;}// 说明加锁成功if (ttlRemaining null) {// 启动额外的线程按照一定规则给当前锁续期scheduleExpirationRenewal(threadId);}});
onComplete()应该也是把回调函数推到stack中方便后面异步线程弹栈执行。 至此我们已经解决了之前的两个问题
lua脚本是什么意思见注释ttlRemainingFuture.onComplete()有什么作用设置回调函数等会儿会有线程调用 虽然在CompletableFuture中已经强调过这里还是要提一下被回调的不是onComplete(BiConsumer)而是BiConsumer#accept()。主线程在调用onComplete(BiConsumer)时把它作为参数传入然后被推入栈中
BiConsumer consumer (ttlRemaining, e) - {// 发生异常时直接returnif (e ! null) {return;}// 说明加锁成功if (ttlRemaining null) {// 启动额外的线程按照一定规则给当前锁续期scheduleExpirationRenewal(threadId);}
} Redisson异步回调机制
现在已经确定了尝试加锁后会返回RFuture并且我们可以通过RFuture做两件事
通过RFuture获取ttlRemaining也就是上一个锁的过期时间如果为null则本次加锁成功否则加锁失败需要等待通过RFuture设置回调函数 现在疑问是
异步线程是谁哪来的onComplete()设置的回调函数是干嘛的回调时的参数(ttlRemaining, e)哪来的 1、3两个问题非常难源码比较绕这里就带大家感性地体验一下有兴趣可以自己跟源码了解。清除刚才的全部断点只留下 再次DEBUG线程会先到达return ttlRemainingFuture随后回调BiConsumer#accept() 回调时线程变了 大家有兴趣可以自己顺着调用栈逆推回去还是比较复杂的涉及到NIO、Promise等源头还是在线程池但其中又设计了Listeners的收集和循环唤醒
protected T RFutureT evalWriteAsync(String key, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {CommandBatchService executorService createCommandBatchService();RFutureT result executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);if (!(commandExecutor instanceof CommandBatchService)) {executorService.executeAsync();}return result;
}
总之目前为止我们只需要知道 我们虽然不知道onComplete()具体如何实现回调比CompletableFuture复杂得多但是我们知道锁续期和RFuture的回调机制相关 Redisson如何实现锁续期 最终会进入
private void renewExpiration() {ExpirationEntry ee EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee null) {return;}/*** 启动一个定时器Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);* 执行规则是延迟internalLockLeaseTime/3后执行* 注意啊每一个定时任务只执行一遍而且是延迟执行。* * 那么问题就来了* 1.internalLockLeaseTime/3是多久呢* 2.如果定时任务只执行一遍似乎解决不了问题啊本质上和我们手动设置过期时间一样多久合适呢*/ Timeout task commandExecutor.getConnectionManager().newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent null) {return;}Long threadId ent.getFirstThreadId();if (threadId null) {return;}// 定时任务的目的是重新执行一遍lua脚本完成锁续期把锁的ttl拨回到30sRFutureBoolean future renewExpirationAsync(threadId);// 设置了一个回调future.onComplete((res, e) - {if (e ! null) {log.error(Cant update lock getName() expiration, e);// 如果宕机了就不会续期了return;}// 如果锁还存在没有unLock说明业务还没结束递归调用当前方法不断续期if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}/**
* 重新执行evalWriteAsync()和加锁时的lua脚本比较类似但有点不同
* 这里设置expire的参数也是internalLockLeaseTime
*
* 看来我们不得不去调查一下internalLockLeaseTime了
*/
protected RFutureBoolean renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;,Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}
如果你给renewExpirationAsync()打上断点会发现每隔10秒定时任务就会执行一遍 联想到定时任务的delay是internalLockLeaseTime/3所以推测internalLockLeaseTime为30秒。
点击internalLockLeaseTime很容易跳转到对应的字段 再顺着getLockWatchdogTimeout()跳转很快就会发现 确实是30秒。 梳理一下所谓的Watchdog锁续期机制
lock()第一次成功加锁时设置的锁过期时间默认30秒这个值来自Watchdog变量
// 重点
private T RFutureLong tryAcquireAsync(long waitTime-1, long leaseTime-1, TimeUnit unitnull, long threadId666) {// lock()默认leaseTime-1所以会跳过ifif (leaseTime ! -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 执行lua脚本加锁返回RFuture。第二个参数就是leaseTime来自LockWatchdogTimeoutRFutureLong ttlRemainingFuture tryLockInnerAsync(waitTime-1,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()30秒,TimeUnit.MILLISECONDS, threadId666, RedisCommands.EVAL_LONG);// 设置回调方法ttlRemainingFuture.onComplete((ttlRemaining, e) - {// 发生异常时直接returnif (e ! null) {return;}// 说明加锁成功if (ttlRemaining null) {// 启动额外的线程按照一定规则给当前锁续期scheduleExpirationRenewal(threadId);}});// 返回RFuture里面有ttlRemainingreturn ttlRemainingFuture;
}// 执行lua脚本上锁
T RFutureT tryLockInnerAsync(long waitTime-1, long leaseTime30*1000, TimeUnit unit毫秒, long threadId666, RedisStrictCommandT command) {// 略...
}
onComplete()设置回调等Redis调用回来后异步线程回调BiConsumer#accept()进入scheduleExpirationRenewal(threadId)开始每隔internalLockLeaseTime/3时间就给锁续期 和加锁一样执行lua脚本其实很快所以这里的future.onComplete()虽说是异步但很快就会被调用然后就会递归调用renewExpiration()然后又是一个TimerTask()隔internalLockLeaseTime/3后又给锁续期。 也就是说Redisson的Watchdog定时任务虽然只延迟执行一次但每次调用都会递归所以相当于重复延迟执行。 还记得之前学习CompletableFuture时我写的一行注释吗 也就是说只要主线程的任务不结束就会一直给锁续期。 锁释放有两种情况
任务结束主动unLock()删除锁
redisson.lock();
task();
redisson.unLock();
任务结束不调用unLock()但由于守护线程已经结束不会有后台线程继续给锁续期过了30秒自动过期 上面我们探讨的都是加锁成功的流程直接ttlnull就返回了后面一大坨都是加锁失败时的判断逻辑其中涉及到
while(true)死循环阻塞等待释放锁时Redis的Publish通知在后面的unLock流程会看到其他节点收到锁释放的信号后重新争抢锁 整个过程还是非常复杂的大家有精力可以自行百度了解后面介绍unLock()时也会涉及一部分加锁失败相关内容。 unLock()源码解析
有了lock()的经验unLock()就简单多了 相信大家还是能推断出KEYS[]和ARGV[]这里就直接给出答案了
-- 参数解释
-- KEYS[1] bravo1988_distributed_lock
-- KEYS[2] getChannelName()
-- ARGV[1] LockPubSub.UNLOCK_MESSAGE
-- ARGV[2] internalLockLeaseTime
-- ARGV[3] getLockName(threadId)-- 锁已经不存在返回null
if (redis.call(hexists, KEYS[1], ARGV[3]) 0) thenreturn nil;end;-- 锁还存在执行COUNT--重入锁的反向操作
local counter redis.call(hincrby, KEYS[1], ARGV[3], -1);-- COUNT--后仍然大于0之前可能重入了多次
if (counter 0) then-- 设置过期时间redis.call(pexpire, KEYS[1], ARGV[2]);return 0;
-- COUNT--后小于等于0删除锁并向对应的Channel发送消息NIO消息类型是LockPubSub.UNLOCK_MESSAGE锁释放啦快来抢~
else redis.call(del, KEYS[1]);redis.call(publish, KEYS[2], ARGV[1]);return 1;end;return nil;
也就是说当一个锁被释放时原先持有锁的节点会通过NIO的Channel发送LockPubSub.UNLOCK_MESSAGE告诉其他订阅的Client我已经释放锁啦快来抢啊此时原本阻塞的其他节点就会重新竞争锁。 而所谓重入和反重入简单来说就是
// 加锁三次
redisson.lock();
redisson.lock();
redisson.lock();
// 执行业务
executeTask();
// 相应的就要解锁三次
redisson.unLock();
redisson.unLock();
redisson.unLock();
实际开发不会这样调用但有时会出现子父类方法调用或者同一个线程反复调用使用同一把锁的多个方法就会发生锁的重入COUNT而当这些方法执行完毕逐个弹栈的过程中就会逐个unLock()解锁COUNT--。 lock(leaseTime, unit)自定义过期时间、且不续期
lock()默认会开启定时任务对锁进行续期但Redisson还提供了另一个lock方法 两个lock()唯一的区别是内部调用lock()时一个传了leaseTime-1另一个传了我们自己的leaseTime。对于外部调用者来说
redisson.lock();
redisson.lock(-1, null);
这两种写法其实一样。 当然了通常会传入有意义的leaseTime 这种写法除了更改了锁的默认ttl时间外还阉割了锁续期功能。也就是说10秒后如果任务还没执行完就会和我们手写的Redis分布式锁一样自动释放锁。 为什么锁续期的功能失效了呢留给大家自己解答这里只给出参考答案
// 重点
private T RFutureLong tryAcquireAsync(long waitTime-1, long leaseTime-1, TimeUnit unitnull, long threadId666) {// lock()默认leaseTime-1会跳过这个if执行后面的代码。但如果是lock(10, TimeUnit.SECONDS)会执行if并跳过后面的代码。if (leaseTime ! -1) {// 其实和下面的tryLockInnerAsync()除了时间不一样外没什么差别return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 但由于上面直接return了所以下面的都不会执行/*RFutureLong ttlRemainingFuture tryLockInnerAsync(waitTime-1,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()30秒,TimeUnit.MILLISECONDS, threadId666, RedisCommands.EVAL_LONG);// 设置回调方法不会执行ttlRemainingFuture.onComplete((ttlRemaining, e) - {// 发生异常时直接returnif (e ! null) {return;}// 说明加锁成功if (ttlRemaining null) {// 启动额外的线程按照一定规则给当前锁续期scheduleExpirationRenewal(threadId);}});// 不会执行return ttlRemainingFuture;*/
}// 执行lua脚本加锁
T RFutureT tryLockInnerAsync(long waitTime-1, long leaseTime30*1000, TimeUnit unit毫秒, long threadId666, RedisStrictCommandT command) {// 略...
}
也就是说直接执行lua加锁就返回了没有机会启动定时任务和递归... tryLock()系列让调用者自行决定加锁失败后的操作
之前我们已经观察到如果多个节点都调用lock()那么没获取到锁的节点线程会阻塞直到原先持有锁的节点删除锁并publish LockPubSub.UNLOCK_MESSAGE 。 但如果调用者不希望阻塞呢他有可能想着如果加锁失败我就直接放弃。 是啊毕竟尝试加锁的目的可能完全相反
在保证线程安全的前提下尽量让所有线程都执行成功在保证线程安全的前提下只让一个线程执行成功 前者适用于秒杀、下单等操作希望尽最大努力达成后者适用于定时任务只要让一个节点去执行没有获取锁的节点应该fast-fail快速失败。 也就是说节点获锁失败后理论上可以有各种各样的处理方式
阻塞等待直接放弃试N次再放弃... 但lock、lock(leaseTime, timeUnit)替我们写死了阻塞等待。即使lock(leaseTime, unit)其实也是阻塞等待只不过不会像lock()一样不断续期。 究其原因主要是lock()这些方法对于加锁失败的判断是在内部写死的 而tryLock()方法则去掉了这层中间判断把结果直接呈递到调用者面前让调用者自己决定加锁失败后如何处理 tryLock()直接返回true加锁成功和false加锁失败后续如何处理全凭各个节点自己做出决定。
Test
public void testTryLock() {RLock lock redissonClient.getLock(bravo1988_distributed_lock);boolean b lock.tryLock();if (b) {// 业务操作...}// 调用立即结束不阻塞
}
这样讲可能有点抽象大家可以分别点进lock()和tryLock()自行体会。总之tryLock()中间少了一大块逻辑因为它不插手结果的判断。 另外tryLock()在加锁成功的情况下其实和lock()是一样的也会触发锁续期 如果你不希望触发锁续期可以像lock(leaseTime, unit)一样指定过期时间还可以指定加锁失败后等待多久
Test
public void testLockSuccess() throws InterruptedException {RLock lock redissonClient.getLock(bravo1988_distributed_lock);// 基本等同于lock()加锁成功也【会自动锁续期】但获锁失败【立即返回false】交给调用者判断是否阻塞或放弃lock.tryLock();// 加锁成功仍然【会自动锁续期】但获锁失败【会等待10秒】看看这10秒内当前锁是否释放如果是否则尝试加锁lock.tryLock(10, TimeUnit.SECONDS);// 加锁成功【不会锁续期】加锁失败【会等待10秒】看看这10秒内当前锁是否释放如果是否则尝试加锁lock.tryLock(10, 30, TimeUnit.SECONDS);
}
注意哈只传两个参数时那个time其实是传给waitTime的 我们之前操作的都是leaseTime此时还是-1也就是说如果加锁成功还是会锁续期。 那waitTime是用来控制什么的呢 简而言之
tryLock()加锁失败会立即返回false而加了waitTime可以手动指定阻塞等待的时间等一等万一行呢leaseTime的作用没变控制的是加锁成功后要不要续期 至此分布式锁章节暂时告一段段落。大家有兴趣的话可以把上一篇花里胡哨的定时任务用Redisson改写去掉Redis Message Queue但定时任务最好还是用xxl-job等。 Redisson分布式锁的缺陷
在哨兵模式或者主从模式下如果master实例宕机可能导致多个节点同时完成加锁。 以主从模式为例由于所有的写操作都是先在master上进行然后再同步给各个slave节点所以master与各个slave节点之间的数据具有一定的延迟性。对于Redisson分布式锁而言比如客户端刚对master写入Redisson锁然后master异步复制给各个slave节点但这个过程中master节点宕机了其中一个slave节点经过选举变成了master节点好巧不巧这个slave还没同步到Reddison锁所以其他客户端可能再次加锁。 具体情况大家可以百度看看解决方案也比较多。 还是那句话但凡涉及到分布式都没那么简单。有时引入一个解决方案后我们不得不面对另一个问题。
作者简介大家好我是smart哥前中兴通讯、美团架构师现某互联网公司CTO 进群大家一起学习一起进步一起对抗互联网寒冬 文章转载自: http://www.morning.ftwlay.cn.gov.cn.ftwlay.cn http://www.morning.ymjrg.cn.gov.cn.ymjrg.cn http://www.morning.lxmmx.cn.gov.cn.lxmmx.cn http://www.morning.qlbmc.cn.gov.cn.qlbmc.cn http://www.morning.wpxfk.cn.gov.cn.wpxfk.cn http://www.morning.qtxwb.cn.gov.cn.qtxwb.cn http://www.morning.xkyst.cn.gov.cn.xkyst.cn http://www.morning.cjwkf.cn.gov.cn.cjwkf.cn http://www.morning.tygn.cn.gov.cn.tygn.cn http://www.morning.ccpnz.cn.gov.cn.ccpnz.cn http://www.morning.rynqh.cn.gov.cn.rynqh.cn http://www.morning.brqjs.cn.gov.cn.brqjs.cn http://www.morning.khdw.cn.gov.cn.khdw.cn http://www.morning.qnywy.cn.gov.cn.qnywy.cn http://www.morning.xltdh.cn.gov.cn.xltdh.cn http://www.morning.hgbzc.cn.gov.cn.hgbzc.cn http://www.morning.lprfk.cn.gov.cn.lprfk.cn http://www.morning.lwdzt.cn.gov.cn.lwdzt.cn http://www.morning.yrmpr.cn.gov.cn.yrmpr.cn http://www.morning.znlhc.cn.gov.cn.znlhc.cn http://www.morning.mcfjq.cn.gov.cn.mcfjq.cn http://www.morning.huayaosteel.cn.gov.cn.huayaosteel.cn http://www.morning.rttkl.cn.gov.cn.rttkl.cn http://www.morning.tmnyj.cn.gov.cn.tmnyj.cn http://www.morning.brfxt.cn.gov.cn.brfxt.cn http://www.morning.klltg.cn.gov.cn.klltg.cn http://www.morning.dktyc.cn.gov.cn.dktyc.cn http://www.morning.kxqfz.cn.gov.cn.kxqfz.cn http://www.morning.zhmgcreativeeducation.cn.gov.cn.zhmgcreativeeducation.cn http://www.morning.gybnk.cn.gov.cn.gybnk.cn http://www.morning.dblfl.cn.gov.cn.dblfl.cn http://www.morning.rykn.cn.gov.cn.rykn.cn http://www.morning.tthmg.cn.gov.cn.tthmg.cn http://www.morning.lwtld.cn.gov.cn.lwtld.cn http://www.morning.sbrjj.cn.gov.cn.sbrjj.cn http://www.morning.nxfuke.com.gov.cn.nxfuke.com http://www.morning.rdsst.cn.gov.cn.rdsst.cn http://www.morning.bytgy.com.gov.cn.bytgy.com http://www.morning.xclgf.cn.gov.cn.xclgf.cn http://www.morning.gyfwy.cn.gov.cn.gyfwy.cn http://www.morning.ntwxt.cn.gov.cn.ntwxt.cn http://www.morning.rfpq.cn.gov.cn.rfpq.cn http://www.morning.smdkk.cn.gov.cn.smdkk.cn http://www.morning.hsgxj.cn.gov.cn.hsgxj.cn http://www.morning.zwtp.cn.gov.cn.zwtp.cn http://www.morning.wnnts.cn.gov.cn.wnnts.cn http://www.morning.xxwl1.com.gov.cn.xxwl1.com http://www.morning.lwyqd.cn.gov.cn.lwyqd.cn http://www.morning.pnntx.cn.gov.cn.pnntx.cn http://www.morning.pbzgj.cn.gov.cn.pbzgj.cn http://www.morning.rbjp.cn.gov.cn.rbjp.cn http://www.morning.cokcb.cn.gov.cn.cokcb.cn http://www.morning.ntwfr.cn.gov.cn.ntwfr.cn http://www.morning.rlhgx.cn.gov.cn.rlhgx.cn http://www.morning.pqryw.cn.gov.cn.pqryw.cn http://www.morning.stprd.cn.gov.cn.stprd.cn http://www.morning.hxgly.cn.gov.cn.hxgly.cn http://www.morning.mcmpq.cn.gov.cn.mcmpq.cn http://www.morning.qnhcx.cn.gov.cn.qnhcx.cn http://www.morning.gnbtp.cn.gov.cn.gnbtp.cn http://www.morning.ydwsg.cn.gov.cn.ydwsg.cn http://www.morning.tongweishi.cn.gov.cn.tongweishi.cn http://www.morning.blzrj.cn.gov.cn.blzrj.cn http://www.morning.llqch.cn.gov.cn.llqch.cn http://www.morning.ltywr.cn.gov.cn.ltywr.cn http://www.morning.cltrx.cn.gov.cn.cltrx.cn http://www.morning.jgykx.cn.gov.cn.jgykx.cn http://www.morning.wbqk.cn.gov.cn.wbqk.cn http://www.morning.mqxzh.cn.gov.cn.mqxzh.cn http://www.morning.qbrs.cn.gov.cn.qbrs.cn http://www.morning.kmcfw.cn.gov.cn.kmcfw.cn http://www.morning.sfqtf.cn.gov.cn.sfqtf.cn http://www.morning.hhkzl.cn.gov.cn.hhkzl.cn http://www.morning.zmqb.cn.gov.cn.zmqb.cn http://www.morning.dnmzl.cn.gov.cn.dnmzl.cn http://www.morning.gbfuy28.cn.gov.cn.gbfuy28.cn http://www.morning.lpmlx.cn.gov.cn.lpmlx.cn http://www.morning.rnygs.cn.gov.cn.rnygs.cn http://www.morning.sqmlw.cn.gov.cn.sqmlw.cn http://www.morning.bwdnx.cn.gov.cn.bwdnx.cn