Redisson 源码分析 —— 可靠分布式锁 RedLock

2020-12-05 · 芋道源码 · 转载 · · 本文共 3,079个字,预计阅读需要 11分钟。

转载于【芋道源码

1. 概述

《精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock》 中,艿艿臭长臭长的分享了 Redisson 是如何实现可重入的 ReentrantLock 锁,一切看起来很完美,我们能够正确的加锁,也能正确的释放锁。但是,我们来看一个 Redis 主从结构下的示例,Redis 分布式锁是如何失效的:

  • 1、客户端 A 从 Redis Master 获得到锁 anylock
  • 2、在 Redis Master 同步 anylock 到 Redis Slave 之前,Master 挂了。
  • 3、Redis Slave 晋升为新的 Redis Master 。
  • 4、客户端 B 从新的 Redis Master 获得到锁 anylock

此时,客户端 A 和 B 同时持有 anylock 锁,已经失效。当然,这个情况是极小概率事件。主要看胖友业务对分布式锁可靠性的诉求。

在 Redis 分布式锁存在失效的问题,Redis 的作者 Antirez 大神提出了红锁 RedLock 的想法。我们来看看它的描述。

FROM 《Redis 文档 —— Redis 分布式锁》

在 Redis 的分布式环境中,我们假设有 N 个 Redis master 。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在 Redis 单实例下怎么安全地获取和释放锁。我们确保将在每 N 个实例上使用此方法获取和释放锁。在这个样例中,我们假设有 5 个Redis master 节点,这是一个比较合理的设置,所以我们需要在 5 台机器上面或者 5 台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  • 1、获取当前 Unix 时间,以毫秒为单位。
  • 2、依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2 ,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
  • 3、客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  • 4、如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
  • 5、如果因为某些原因,获取锁失败(没有在至少 N/2 + 1 个 Redis 实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

释放锁:

  • 1、释放锁比较简单,向所有的 Redis 实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁.

可能一看内容这么长,略微有点懵逼。重点理解,需要至少在 N/2 + 1 Redis 节点获得锁成功。这样,即使出现某个 Redis Master 未同步锁信息到 Redis Slave 节点之前,突然挂了,也不容易出现多个客户端获得相同锁,因为需要至少在 N/2 + 1 Redis 节点获得锁成功。

当然,极端情况下也有。我们以 3 个 Redis Master 节点举例子:

  • 1、客户端 A 从 3 个 Redis Master 获得到锁 anylock
  • 2、2 个 Redis Master 同步 anylock 到 Redis Slave 之前,Master 都挂了。
  • 3、2 个 Redis Slave 晋升为新的 Redis Master 。
  • 4、客户端 B 从新的 Redis Master 获得到锁 anylock

理论来说,出现 2 个 Redis Master 都挂了,并且数据都未同步到 Redis Slave 的情况,已经是小概率的事件。当然,哈哈哈哈,我们就是可爱的“杠精”,就是要扣一扣这个边界情况。

同时,我们也可以发现,在时候用 RedLock 的时候,Redis Master 越多,集群的可靠性就越高,性能也会越低。😈 架构设计中,从来没有银弹。我们想要得到更高的可靠性,往往需要失去一定的性能。

对了,有一点要注意,N 个 Redis Master 要毫无关联的。例如说,任一一个 Redis Master 都不能在同一个 Redis Cluster 中。再如下图,就是一个符合条件的:

FROM 《慢谈 Redis 实现分布式锁 以及 Redisson 源码解析》

Redis Master 示例

当然,推荐 N 是奇数个,因为 N / 2 + 1 嘛,哈哈。

推荐胖友阅读如下三篇文章,更进一步了解 RedLock :

2. RedissonRedLock

org.redisson.RedissonRedLock ,继承自联锁 RedissonMultiLock ,Redisson 对 RedLock 的实现类。代码如下:

  1. // RedissonRedLock.java
  2. public class RedissonRedLock extends RedissonMultiLock {
  3. /**
  4. * Creates instance with multiple {@link RLock} objects.
  5. * Each RLock object could be created by own Redisson instance.
  6. *
  7. * @param locks - array of locks
  8. */
  9. public RedissonRedLock(RLock... locks) {
  10. super(locks);
  11. }
  12. @Override
  13. protected int failedLocksLimit() {
  14. return locks.size() - minLocksAmount(locks);
  15. }
  16. protected int minLocksAmount(final List<RLock> locks) {
  17. return locks.size()/2 + 1;
  18. }
  19. @Override
  20. protected long calcLockWaitTime(long remainTime) {
  21. return Math.max(remainTime / locks.size(), 1);
  22. }
  23. @Override
  24. public void unlock() {
  25. unlockInner(locks);
  26. }
  27. }
  • RedissonMultiLock ,联锁,正如其名字“”,可以将多个 RLock 锁关联成一个联锁。使用示例如下:

    1. RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
    2. // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
    3. lock.lock(10, TimeUnit.SECONDS);
    4. // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
    5. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    6. ...
    7. lock.unlock();
  • RedissonRedLock ,是一个特殊的联锁,加锁时无需所有的 RLock 都成功,只需要满足 N / 2 + 1 个 RLock 即可。使用示例如下:

    1. RLock lock1 = redissonInstance1.getLock("lock1");
    2. RLock lock2 = redissonInstance2.getLock("lock2");
    3. RLock lock3 = redissonInstance3.getLock("lock3");
    4. RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
    5. // 同时加锁:lock1 lock2 lock3
    6. // 红锁在大部分节点上加锁成功就算成功。
    7. lock.lock();
    8. ...
    9. lock.unlock();
  • RedissonRedLock 构造方法,创建时,传入多个 RLock 对象。一般来说,每个 RLock 对应到一个 Redis Master 节点上。

  • #failedLocksLimit() 方法,允许失败加锁的数量。从 #minLocksAmount(final List<RLock> locks) 方法上,我们已经看到 N / 2 + 1 的要求。

  • #calcLockWaitTime(long remainTime) 方法,计算每次获得 RLock 的锁的等待时长。目前的计算规则是,总的等待时间 remainTime 平均分配到每个 RLock 上。

  • #unlock() 方法,解锁时,需要所有 RLock 都进行解锁。

3. RedissonMultiLock

org.redisson.RedissonMultiLock ,实现 RLock 接口,Redisson 对联锁 MultiLock 的实现类。

3.1 构造方法

  1. // RedissonMultiLock.java
  2. /**
  3. * RLock 数组
  4. */
  5. final List<RLock> locks = new ArrayList<>();
  6. /**
  7. * Creates instance with multiple {@link RLock} objects.
  8. * Each RLock object could be created by own Redisson instance.
  9. *
  10. * @param locks - array of locks
  11. */
  12. public RedissonMultiLock(RLock... locks) {
  13. if (locks.length == 0) {
  14. throw new IllegalArgumentException("Lock objects are not defined");
  15. }
  16. this.locks.addAll(Arrays.asList(locks));
  17. }

3.2 failedLocksLimit

#failedLocksLimit() 方法,允许失败加锁的数量。代码如下:

  1. // RedissonMultiLock.java
  2. protected int failedLocksLimit() {
  3. return 0;
  4. }

默认返回值是 0 ,也就是必须所有 RLock 都加锁成功。

在 RedissonRedLock 中,会重写该方法。

  1. // RedissonRedLock.java
  2. @Override
  3. protected int failedLocksLimit() {
  4. return locks.size() - minLocksAmount(locks);
  5. }
  6. protected int minLocksAmount(final List<RLock> locks) {
  7. return locks.size() / 2 + 1;
  8. }

3.3 calcLockWaitTime

#failedLocksLimit(long remainTime) 方法,计算每次获得 RLock 的锁的等待时长。代码如下:

  1. // RedissonMultiLock.java
  2. protected long calcLockWaitTime(long remainTime) {
  3. return remainTime;
  4. }

默认直接返回 remainTime ,也就是说,每次获得 RLock 的锁的等待时长都是 remainTime

在 RedissonRedLock 中,会重写该方法。

3.4 tryLock

#tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法,同步加锁,并返回加锁是否成功。代码如下:

  1. // RedissonMultiLock.java
  2. @Override
  3. public boolean tryLock() {
  4. try {
  5. // 同步获得锁
  6. return tryLock(-1, -1, null);
  7. } catch (InterruptedException e) {
  8. Thread.currentThread().interrupt();
  9. return false;
  10. }
  11. }
  12. @Override
  13. public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
  14. return tryLock(waitTime, -1, unit);
  15. }
  16. @Override
  17. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  18. // 计算新的锁的时长 newLeaseTime
  19. long newLeaseTime = -1;
  20. if (leaseTime != -1) {
  21. if (waitTime == -1) { // 如果无限等待,则直接使用 leaseTime 即可。
  22. newLeaseTime = unit.toMillis(leaseTime);
  23. } else { // 如果设置了等待时长,则为等待时间 waitTime * 2 。不知道为什么要 * 2 ?例如说,先获得到了第一个锁,然后在获得第二个锁的时候,阻塞等待了 waitTime ,那么可能第一个锁就已经自动过期,所以 * 2 避免这个情况。
  24. newLeaseTime = unit.toMillis(waitTime) * 2;
  25. }
  26. }
  27. long time = System.currentTimeMillis();
  28. // 计算剩余等待锁的时间 remainTime
  29. long remainTime = -1;
  30. if (waitTime != -1) {
  31. remainTime = unit.toMillis(waitTime);
  32. }
  33. // 计算每个锁的等待时间
  34. long lockWaitTime = calcLockWaitTime(remainTime);
  35. // 允许获得锁失败的次数
  36. int failedLocksLimit = failedLocksLimit();
  37. // 已经获得到锁的数组
  38. List<RLock> acquiredLocks = new ArrayList<>(locks.size());
  39. // 遍历 RLock 数组,逐个获得锁
  40. for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
  41. // 当前 RLock
  42. RLock lock = iterator.next();
  43. boolean lockAcquired; // 标记是否获得到锁
  44. try {
  45. // 如果等待时间 waitTime 为 -1(不限制),并且锁时长为 -1(不限制),则使用 #tryLock() 方法。
  46. if (waitTime == -1 && leaseTime == -1) {
  47. lockAcquired = lock.tryLock();
  48. // 如果任一不为 -1 时,则计算新的等待时间 awaitTime ,然后调用 #tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法。
  49. } else {
  50. long awaitTime = Math.min(lockWaitTime, remainTime);
  51. lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
  52. }
  53. } catch (RedisResponseTimeoutException e) {
  54. // 发生响应超时。因为无法确定实际是否获得到锁,所以直接释放当前 RLock
  55. unlockInner(Collections.singletonList(lock));
  56. // 标记未获得锁
  57. lockAcquired = false;
  58. } catch (Exception e) {
  59. // 标记未获得锁
  60. lockAcquired = false;
  61. }
  62. // 如果获得成功,则添加到 acquiredLocks 数组中
  63. if (lockAcquired) {
  64. acquiredLocks.add(lock);
  65. } else {
  66. // 如果已经到达最少需要获得锁的数量,则直接 break 。例如说,RedLock 只需要获得 N / 2 + 1 把。
  67. if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
  68. break;
  69. }
  70. // 当已经没有允许失败的数量,则进行相应的处理
  71. if (failedLocksLimit == 0) {
  72. // 释放所有的锁
  73. unlockInner(acquiredLocks);
  74. // 如果未设置阻塞时间,直接返回 false ,表示失败。因为是 tryLock ,只是尝试加锁一次,不会无限重试。
  75. if (waitTime == -1) {
  76. return false;
  77. }
  78. // 重置整个获得锁的过程,在剩余的时间里,重新来一遍
  79. // 重置 failedLocksLimit 变量
  80. failedLocksLimit = failedLocksLimit();
  81. // 重置 acquiredLocks 为空
  82. acquiredLocks.clear();
  83. // reset iterator
  84. // 重置 iterator 设置回迭代器的头
  85. while (iterator.hasPrevious()) {
  86. iterator.previous();
  87. }
  88. // failedLocksLimit 减一
  89. } else {
  90. failedLocksLimit--;
  91. }
  92. }
  93. // 计算剩余时间 remainTime
  94. if (remainTime != -1) {
  95. remainTime -= System.currentTimeMillis() - time;
  96. // 记录新的当前时间
  97. time = System.currentTimeMillis();
  98. // 如果没有剩余时间,意味着已经超时,释放所有加载成功的锁,并返回 false
  99. if (remainTime <= 0) {
  100. unlockInner(acquiredLocks);
  101. return false;
  102. }
  103. }
  104. }
  105. // 如果设置了锁的过期时间 leaseTime ,则重新设置每个锁的过期时间
  106. if (leaseTime != -1) {
  107. // 遍历 acquiredLocks 数组,创建异步设置过期时间的 Future
  108. List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
  109. for (RLock rLoc acquiredLocks) {
  110. RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
  111. futures.add(future);
  112. }
  113. // 阻塞等待所有 futures 完成
  114. for (RFuture<Boolean> rFutur futures) {
  115. rFuture.syncUninterruptibly();
  116. }
  117. }
  118. // 返回 true ,表示加锁成功
  119. return true;
  120. }
  • 超 100 行代码,是不是有点慌?!核心逻辑是,首先从 locks 数组中逐个获得锁(第 27 至 94 行),然后统一设置每个锁的过期时间(第 96 至 10 9 行)。当然,还是有很多细节,我们一点点来看。
  • 第 3 至 11 行:计算新的锁的时长 newLeaseTime 。注意,newLeaseTime 是用于遍历 locks 数组来获得锁设置的锁时长,最终在第 96 至 109 行的代码中,会设置真正的 leaseTime 锁的时长。整个的计算规则,看下艿艿添加的注释。
  • 第 14 至 18 行:计算剩余等待锁的时间 remainTime
  • 第 20 行:调用 #calcLockWaitTime(long remainTime) 方法,计算获得每个锁的等待时间。在 「2. RedissonRedLock」 中,我们已经看到,是 remainTime 进行平均分配。
  • 第 25 行:已经获得到锁的数组 acquiredLocks
  • 下面,我们分成阶段一(加锁)和阶段二(设置锁过期时间)来抽丝剥茧。
  • 【阶段一】第 26 至 94 行:遍历 RLock 数组,逐个获得锁。
  • 第 32 至 39 行:根据条件,调用对应的 RLock#tryLock(...) 方法,获得锁。一般情况下,RedissonRedLock 搭配 RedissonLock 使用,这块我们已经在 《精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock》 有个详细的解析了。
  • 第 40 至 44 行:如果发生响应 RedisResponseTimeoutException 超时异常时,因为无法确定实际是否获得到锁,所以直接调用 #unlockInner(Collection<RLock> locks) 方法,释放当前 RLock 。
  • 第 50 至 52 行:如果获得成功,则添加到 acquiredLocks 数组中。
  • 【重要】第 54 至 57 行:如果已经到达最少需要获得锁的数量,则直接 break 。例如说,RedLock 只需要获得 N / 2 + 1 把。
  • 第 77 至 80 行:failedLocksLimit 减一。
  • 第 59 至 76 行:当已经没有允许失败的数量,则进行相应的处理。
    • 第 62 行:因为已经失败了,所以调用 #unlockInner(Collection<RLock> locks) 方法,释放所有已经获得到的锁们。
    • 第 63 至 66 行:如果未设置阻塞时间,直接返回 false 加锁失败。因为是 tryLock 方法,只是尝试加锁一次,不会无限重试。
    • 第 67 至 76 行:重置整个获得锁的过程,在剩余的时间里,重新来一遍。因为已设置了阻塞时间,必须得用完!
  • 第 84 至 93 行:计算剩余时间 remainTime 。如果没有剩余的时间,意味着已经超时,则 调用 #unlockInner(Collection<RLock> locks) 方法,释放所有加载成功的锁,并返回 false 加锁失败。
  • 【阶段二】第 96 至 109 行:如果设置了锁的过期时间 leaseTime ,则重新设置每个锁的过期时间。
  • 第 98 至 103 行:遍历 acquiredLocks 数组,创建异步设置过期时间的 Future 。
  • 第 105 至 108 行:阻塞等待所有 futures 完成。如果任一一个 Future 执行失败,则会抛出异常。

3.5 tryLockAsync

#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) 方法,异步加锁,并返回加锁是否成功。代码如下:

  1. // RedissonMultiLock.java
  2. @Override
  3. public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {
  4. return tryLockAsync(waitTime, leaseTime, unit, Thread.currentThread().getId());
  5. }
  6. @Override
  7. public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  8. // 创建 RPromise 对象,用于通知结果
  9. RPromise<Boolean> result = new RedissonPromise<Boolean>();
  10. // 创建 LockState 对象
  11. LockState state = new LockState(waitTime, leaseTime, unit, threadId);
  12. // <X> 发起异步加锁
  13. state.tryAcquireLockAsync(locks.listIterator(), result);
  14. // 返回结果
  15. return result;
  16. }
  • 这个的逻辑在 <X> 处,调用 LockState#tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) 方法,发起异步加锁。

🔥 LockState 是 RedissonMultiLock 的内部类,实现异步加锁的逻辑。构造方法如下:

  1. // RedissonMultiLock.LockState.java
  2. class LockState {
  3. private final long newLeaseTime;
  4. private final long lockWaitTime;
  5. private final List<RLock> acquiredLocks;
  6. private final long waitTime;
  7. private final long threadId;
  8. private final long leaseTime;
  9. private final TimeUnit unit;
  10. private long remainTime;
  11. private long time = System.currentTimeMillis();
  12. private int failedLocksLimit;
  13. LockState(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  14. this.waitTime = waitTime;
  15. this.leaseTime = leaseTime;
  16. this.unit = unit;
  17. this.threadId = threadId;
  18. // 计算新的锁的时长 newLeaseTime
  19. if (leaseTime != -1) {
  20. if (waitTime == -1) {
  21. newLeaseTime = unit.toMillis(leaseTime);
  22. } else {
  23. newLeaseTime = unit.toMillis(waitTime) * 2;
  24. }
  25. } else {
  26. newLeaseTime = -1;
  27. }
  28. // 计算剩余等待锁的时间 remainTime
  29. remainTime = -1;
  30. if (waitTime != -1) {
  31. remainTime = unit.toMillis(waitTime);
  32. }
  33. // 计算每个锁的等待时间
  34. lockWaitTime = calcLockWaitTime(remainTime);
  35. // 允许获得锁失败的次数
  36. failedLocksLimit = failedLocksLimit();
  37. // 已经获得到锁的数组
  38. acquiredLocks = new ArrayList<>(locks.size());
  39. }
  40. // ... 省略其它方法
  41. }

🔥 LockState#tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) 方法,发起异步加锁。代码如下:

整体逻辑,和 「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」 方法的逻辑基本一致,所以艿艿就不啰嗦详细说,而是告诉它们的对等关系。

  1. // RedissonMultiLock.LockState.java
  2. void tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
  3. // 如果迭代 iterator 的尾部,则重新设置每个锁的过期时间
  4. if (!iterator.hasNext()) {
  5. checkLeaseTimeAsync(result);
  6. return;
  7. }
  8. // 获得下一个 RLock 对象
  9. RLock lock = iterator.next();
  10. // 创建 RPromise 对象
  11. RPromise<Boolean> lockAcquiredFuture = new RedissonPromise<Boolean>();
  12. // 如果等待时间 waitTime 为 -1(不限制),并且锁时长为 -1(不限制),则使用 #tryLock() 方法。
  13. if (waitTime == -1 && leaseTime == -1) {
  14. lock.tryLockAsync(threadId)
  15. .onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
  16. // 如果任一不为 -1 时,则计算新的等待时间 awaitTime ,然后调用 #tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法。
  17. } else {
  18. long awaitTime = Math.min(lockWaitTime, remainTime);
  19. lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId)
  20. .onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
  21. }
  22. lockAcquiredFuture.onComplete((res, e) -> {
  23. // 如果 res 非空,设置 lockAcquired 是否获得到锁
  24. boolean lockAcquired = false;
  25. if (res != null) {
  26. lockAcquired = res;
  27. }
  28. // 发生响应超时。因为无法确定实际是否获得到锁,所以直接释放当前 RLock
  29. if (e instanceof RedisResponseTimeoutException) {
  30. unlockInnerAsync(Collections.singletonList(lock), threadId);
  31. }
  32. // 如果获得成功,则添加到 acquiredLocks 数组中
  33. if (lockAcquired) {
  34. acquiredLocks.add(lock);
  35. } else {
  36. // 如果已经到达最少需要获得锁的数量,则直接 break 。例如说,RedLock 只需要获得 N / 2 + 1 把。
  37. if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
  38. checkLeaseTimeAsync(result);
  39. return;
  40. }
  41. // 当已经没有允许失败的数量,则进行相应的处理
  42. if (failedLocksLimit == 0) {
  43. // 创建释放所有的锁的 Future
  44. unlockInnerAsync(acquiredLocks, threadId).onComplete((r, ex) -> {
  45. // 如果发生异常,则通过 result 回调异常
  46. if (ex != null) {
  47. result.tryFailure(ex);
  48. return;
  49. }
  50. // 如果未设置阻塞时间,直接通知 result 失败。因为是 tryLock ,只是尝试加锁一次,不会无限重试。
  51. if (waitTime == -1) {
  52. result.trySuccess(false);
  53. return;
  54. }
  55. // 重置整个获得锁的过程,在剩余的时间里,重新来一遍
  56. // 重置 failedLocksLimit 变量
  57. failedLocksLimit = failedLocksLimit();
  58. // 重置 acquiredLocks 为空
  59. acquiredLocks.clear();
  60. // reset iterator
  61. // 重置 iterator 设置回迭代器的头
  62. while (iterator.hasPrevious()) {
  63. iterator.previous();
  64. }
  65. // 校验剩余时间是否足够
  66. checkRemainTimeAsync(iterator, result);
  67. });
  68. return;
  69. } else {
  70. failedLocksLimit--;
  71. }
  72. }
  73. // 校验剩余时间是否足够
  74. checkRemainTimeAsync(iterator, result);
  75. });
  76. }
  • 第 2 至 6 行:如果迭代 iterator 的尾部,则调用 #checkLeaseTimeAsync(RPromise<Boolean> result) 方法,重新设置每个锁的过期时间。代码如下:

    1. // RedissonMultiLock.LockState.java
    2. private void checkLeaseTimeAsync(RPromise<Boolean> result) {
    3. // 如果设置了锁的过期时间 leaseTime ,则重新设置每个锁的过期时间
    4. if (leaseTime != -1) {
    5. // 创建 AtomicInteger 计数器,用于回调逻辑的计数,从而判断是不是所有回调都执行完了
    6. AtomicInteger counter = new AtomicInteger(acquiredLocks.size());
    7. // 遍历 acquiredLocks 数组,逐个设置过期时间
    8. for (RLock rLock : acquiredLocks) {
    9. // 创建异步设置过期时间的 RFuture
    10. RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
    11. future.onComplete((res, e) -> {
    12. // 如果发生异常,则通过 result 回调异常
    13. if (e != null) {
    14. result.tryFailure(e);
    15. return;
    16. }
    17. // 如果全部成功,则通过 result 回调加锁成功
    18. if (counter.decrementAndGet() == 0) {
    19. result.trySuccess(true);
    20. }
    21. });
    22. }
    23. return;
    24. }
    25. // 如果未设置了锁的过期时间 leaseTime ,则通过 result 回调加锁成功
    26. result.trySuccess(true);
    27. }
  • 第 8 至 79 行:对标到 「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」 的第 28 至 81 行。

  • 第 73 和 82 行:调用 #checkRemainTimeAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) 方法,校验剩余时间是否足够。代码如下:

    1. // RedissonMultiLock.LockState.java
    2. private void checkRemainTimeAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
    3. // 如果设置了等待超时时间,计算剩余时间 remainTime
    4. if (remainTime != -1) {
    5. remainTime += -(System.currentTimeMillis() - time);
    6. // 记录新的当前时间
    7. time = System.currentTimeMillis();
    8. // 如果没有剩余时间,意味着已经超时,释放所有加载成功的锁
    9. if (remainTime <= 0) {
    10. // 创建释放所有已获得到锁们的 Future
    11. unlockInnerAsync(acquiredLocks, threadId).onComplete((res, e) -> {
    12. // 如果发生异常,则通过 result 回调异常
    13. if (e != null) {
    14. result.tryFailure(e);
    15. return;
    16. }
    17. // 如果全部成功,则通过 result 回调加锁成功
    18. result.trySuccess(false);
    19. });
    20. // return 返回结束
    21. return;
    22. }
    23. }
    24. // <Y> 如果未设置等待超时时间,则继续加锁下一个 RLock
    25. tryAcquireLockAsync(iterator, result);
    26. }

😈 总的来说,还是蛮简单的不是,哈哈哈哈。

3.6 lock

#lockInterruptibly(long leaseTime, TimeUnit unit) 方法,同步加锁,不返回加锁是否成功。代码如下:

  1. // RedissonMultiLock.java
  2. @Override
  3. public void lock() {
  4. try {
  5. lockInterruptibly();
  6. } catch (InterruptedException e) {
  7. Thread.currentThread().interrupt();
  8. }
  9. }
  10. @Override
  11. public void lock(long leaseTime, TimeUnit unit) {
  12. try {
  13. lockInterruptibly(leaseTime, unit);
  14. } catch (InterruptedException e) {
  15. Thread.currentThread().interrupt();
  16. }
  17. }
  18. @Override
  19. public void lockInterruptibly() throws InterruptedException {
  20. lockInterruptibly(-1, null);
  21. }
  22. 1: @Override
  23. 2: public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  24. 3: // 计算 waitTime 时间
  25. 4: long baseWaitTime = locks.size() * 1500;
  26. 5: long waitTime = -1;
  27. 6: if (leaseTime == -1) { // 如果未设置超时时间,则直接使用 baseWaitTime
  28. 7: waitTime = baseWaitTime;
  29. 8: } else {
  30. 9: leaseTime = unit.toMillis(leaseTime);
  31. 10: waitTime = leaseTime;
  32. 11: if (waitTime <= 2000) { // 保证最小 waitTime 时间是 2000
  33. 12: waitTime = 2000;
  34. 13: } else if (waitTime <= baseWaitTime) { // 在 [waitTime / 2, waitTime) 之间随机
  35. 14: waitTime = ThreadLocalRandom.current().nextLong(waitTime / 2, waitTime);
  36. 15: } else { // 在 [baseWaitTime, waitTime) 之间随机
  37. 16: waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
  38. 17: }
  39. 18: }
  40. 19:
  41. 20: // 死循环,直到加锁成功
  42. 21: while (true) {
  43. 22: if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
  44. 23: return;
  45. 24: }
  46. 25: }
  47. 26: }

3.7 lockAsync

#lockAsync(long leaseTime, TimeUnit unit, long threadId) 方法,异步加锁,不返回加锁是否成功。代码如下:

  1. // RedissonMultiLock.java
  2. @Override
  3. public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {
  4. return lockAsync(leaseTime, unit, Thread.currentThread().getId());
  5. }
  6. 1: @Override
  7. 2: public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId) {
  8. 3: // 计算 waitTime 时间
  9. 4: long baseWaitTime = locks.size() * 1500;
  10. 5: long waitTime;
  11. 6: if (leaseTime == -1) { // 如果未设置超时时间,则直接使用 baseWaitTime
  12. 7: waitTime = baseWaitTime;
  13. 8: } else {
  14. 9: leaseTime = unit.toMillis(leaseTime);
  15. 10: waitTime = leaseTime;
  16. 11: if (waitTime <= 2000) { // 保证最小 waitTime 时间是 2000
  17. 12: waitTime = 2000;
  18. 13: } else if (waitTime <= baseWaitTime) { // 在 [waitTime / 2, waitTime) 之间随机
  19. 14: waitTime = ThreadLocalRandom.current().nextLong(waitTime / 2, waitTime);
  20. 15: } else { // 在 [baseWaitTime, waitTime) 之间随机
  21. 16: waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
  22. 17: }
  23. 18: }
  24. 19:
  25. 20: // 创建 RPromise 对象
  26. 21: RPromise<Void> result = new RedissonPromise<Void>();
  27. 22: // 执行异步加锁
  28. 23: tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result);
  29. 24: return result;
  30. 25: }
  • 第 3 至 18 行:计算 waitTime 时间。和 #lockInterruptibly(long leaseTime, TimeUnit unit) 方法,看到的逻辑是一致的。

  • 第 23 行:调用 #tryLockAsync(long threadId, long leaseTime, TimeUnit unit, long waitTime, RPromise<Void> result) 方法,执行异步加锁。代码如下:

    1. // RedissonMultiLock.java
    2. protected void tryLockAsync(long threadId, long leaseTime, TimeUnit unit, long waitTime, RPromise<Void> result) {
    3. // <X1> 执行异步加锁锁
    4. tryLockAsync(waitTime, leaseTime, unit, threadId).onComplete((res, e) -> {
    5. // 如果发生异常,则通过 result 回调异常
    6. if (e != null) {
    7. result.tryFailure(e);
    8. return;
    9. }
    10. // <X2> 如果加锁成功,则通知 result 成功
    11. if (res) {
    12. result.trySuccess(null);
    13. // <X3> 如果加锁失败,则递归调用 tryLockAsync 方法
    14. } else {
    15. tryLockAsync(threadId, leaseTime, unit, waitTime, result);
    16. }
    17. });
    18. }

3.8 unlock

#unlock() 方法,同步解锁。代码如下:

  1. // RedissonMultiLock.java
  2. @Override
  3. public void unlock() {
  4. // 创建 RFuture 数组
  5. List<RFuture<Void>> futures = new ArrayList<>(locks.size());
  6. // 逐个创建异步解锁 Future,并添加到 futures 数组中
  7. for (RLock lock : locks) {
  8. futures.add(lock.unlockAsync());
  9. }
  10. // 同步阻塞 futures ,全部释放完成
  11. for (RFuture<Void> future : futures) {
  12. future.syncUninterruptibly();
  13. }
  14. }

在 RedissonMultiLock 类中,存在一个 #unlockInner(Collection<RLock> locks) 方法,同步解锁指定 RLock 数组。代码如下:

  1. // RedissonMultiLock.java
  2. protected void unlockInner(Collection<RLock> locks) {
  3. // 创建 RFuture 数组
  4. List<RFuture<Void>> futures = new ArrayList<>(locks.size());
  5. // 逐个创建异步解锁 Future,并添加到 futures 数组中
  6. for (RLock lock : locks) {
  7. futures.add(lock.unlockAsync());
  8. }
  9. // 同步阻塞 futures ,全部释放完成
  10. for (RFuture<Void> unlockFuture : futures) {
  11. unlockFuture.awaitUninterruptibly();
  12. }
  13. }

在 RedissonMultiLock 类中,存在一个 #unlockInner(Collection<RLock> locks) 方法,异步解锁指定 RLock 数组。代码如下:

  1. // RedissonMultiLock.java
  2. protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) {
  3. // 如果 locks 为空,直接返回成功的 RFuture
  4. if (locks.isEmpty()) {
  5. return RedissonPromise.newSucceededFuture(null);
  6. }
  7. // 创建 RPromise 对象
  8. RPromise<Void> result = new RedissonPromise<Void>();
  9. // 创建 AtomicInteger 计数器,用于回调逻辑的计数,从而判断是不是所有回调都执行完了
  10. AtomicInteger counter = new AtomicInteger(locks.size());
  11. // 遍历 acquiredLocks 数组,逐个解锁
  12. for (RLock lock : locks) {
  13. lock.unlockAsync(threadId).onComplete((res, e) -> {
  14. // 如果发生异常,则通过 result 回调异常
  15. if (e != null) {
  16. result.tryFailure(e);
  17. return;
  18. }
  19. // 如果全部成功,则通过 result 回调解锁成功
  20. if (counter.decrementAndGet() == 0) {
  21. result.trySuccess(null);
  22. }
  23. });
  24. }
  25. return result;
  26. }

3.9 未实现的方法

在 RedissonMultiLock 中,因为一些方法暂时没必要实现,所以就都未提供。如下:

  1. // RedissonMultiLock.java
  2. @Override
  3. public Condition newCondition() {
  4. throw new UnsupportedOperationException();
  5. }
  6. @Override
  7. public RFuture<Boolean> forceUnlockAsync() {
  8. throw new UnsupportedOperationException();
  9. }
  10. @Override
  11. public RFuture<Integer> getHoldCountAsync() {
  12. throw new UnsupportedOperationException();
  13. }
  14. @Override
  15. public String getName() {
  16. throw new UnsupportedOperationException();
  17. }
  18. @Override
  19. public boolean forceUnlock() {
  20. throw new UnsupportedOperationException();
  21. }
  22. @Override
  23. public boolean isLocked() {
  24. throw new UnsupportedOperationException();
  25. }
  26. @Override
  27. public RFuture<Boolean> isLockedAsync() {
  28. throw new UnsupportedOperationException();
  29. }
  30. @Override
  31. public boolean isHeldByThread(long threadId) {
  32. throw new UnsupportedOperationException();
  33. }
  34. @Override
  35. public boolean isHeldByCurrentThread() {
  36. throw new UnsupportedOperationException();
  37. }
  38. @Override
  39. public int getHoldCount() {
  40. throw new UnsupportedOperationException();
  41. }
  42. @Override
  43. public RFuture<Long> remainTimeToLiveAsync() {
  44. throw new UnsupportedOperationException();
  45. }
  46. @Override
  47. public long remainTimeToLive() {
  48. throw new UnsupportedOperationException();
  49. }

彩蛋

一开始,以为 RedLock 红锁的代码会比较复杂,所以在撸这块的源码时,有点懵逼。一度计划,准备花小 1 天的时间来研究和输出这篇博客。结果发现,竟然是个纸老虎,哈哈哈。