redis_分布式锁
2025-01-22 08:19:30    2.7k 字   
This post is also available in English and alternative languages.

在单服务器系统常用本地锁来避免并发带来的问题,然而当服务采用集群方式部署时,本地锁无法在多个服务器之间生效,这时候保证数据的一致性就需要分布式锁来实现。


1. 前言

Redis提供了lua脚本支持,用户可以向Redis发送lua脚本来执行自定义动作,并获取脚本的响应数据。Redis会单线程原子性执行lua脚本,并保证lua脚本在处理的过程中不会被任意其它请求打断。


2. 方式一:SETNX方式

相关命令:

命令含义备注
SETNX key value指定的key不存在时,为key设置指定的值.
设置成功,返回1;设置失败,返回0;
加锁命令
DEL key删除已存在的键。不存在的 key 会被忽略.
返回被删除key的数量
解锁命令
EXPIRE key timeout设置key的过期时间,key过期后将不再可用
设置成功返回1;当key不存在或者不能为key设置过期时间时返回0;
锁超时

加锁、解锁、锁超时的伪代码如下:

1
2
3
4
5
6
7
8
if(1 == setnx(key,value)){
expire(key,40);
try{
// 业务逻辑,do something
}finally{
del(key)
}
}

2.1. 存在的问题

2.1.1. SETNX 和 EXPIRE 非原子性

当加锁成功后(setnx方法返回1),正准备设置过期时间的代码前(执行expire方法前),由于服务器宕机、网络抖动等原因导致锁没有设置过期时间(expire方法没有执行),于是该锁就无法过期也就成了死锁;

归根结底是由于SETNXEXPIRE这两步是分开操作的单命令,不是原子性操作

解决方案:使用Lua脚本保证原子性。


2.1.2. 锁误解除

场景:

  1. 线程A成功获取到锁并且设置了30秒的过期时间,但线程A的业务逻辑执行时间超过了30秒,redis中的锁过期自动释放了(线程A并未释放锁);
  2. 线程B得到了锁;
  3. 线程A终于执行完成,于是线程A使用DEL命令释放锁,但此时锁的真正持有者是线程B,也就是说线程A释放了线程B的锁;

解决方案:在value中添加当前线程的标识,释放锁(DEL命令)时,检查key对应value是否是当前线程所持有的锁,如果不是当前线程持有的锁,就不释放。


2.1.3. 超时解锁导致并发

  1. 线程A成功获取锁并设置过期时间30秒,但线程A执行时间超过30秒,锁过期自动释放;
  2. 此时线程B获取到锁,线程A和线程B并发执行;

A、B两个线程并发执行是不被允许的,有两种解决方式:

  • 将锁的过期时间设置的足够长,确保代码逻辑在锁释放之前能够执行完成;
  • 为获取锁的线程增加守护线程,未将要过期但未释放的锁增加有效时间;

2.1.4. 集群主备切换

为了保证Redis的可用性,一般采用主从方式部署;主从数据同步有异步同步两种方式,主节点将指令记录在本地内存中,然后异步将 内存中的指令同步到从节点,从节点一边执行同步的指令流来达到和主节点一致的状态,一边向主节点反馈同步情况。

在这种集群部署方式中,当主节点挂掉时,从节点会取而代之,但客户端是没有感知的;

当客户端A(线程A)成功加锁,指令在主节点的内存中,还没来得及同步给从节点;此时主节点挂掉,从节点提升为新的主节点,这个新主节点中没有锁的数据,造成了锁数据丢失,其他客户端再去加锁就会成功。


使用SETNX组合其他命令的方式能够实现分布式锁,但或多或少存在一些问题,而且实现复杂度比较高。


3. 方式二:redisson

pom依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.5</version>
</dependency>

3.1. 伪代码

碍于排版,完整示例代码在下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RLock redissonClientLock = redissonClient.getLock(KEY);
boolean isLock = Boolean.FALSE;
try {
// 500ms拿不到锁, 就认为获取锁失败;5000ms即5s是锁失效时间。
isLock = redissonClientLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
if (isLock) {
// 模拟支付
log.info("方法一正在支付");
TimeUnit.SECONDS.sleep(4);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 没有获取到锁就不用释放
if (isLock) {
log.info("方法一解锁");
redissonClientLock.unlock();
}
}

运行期间,redis中保存的数据如下图:

image-20220824175524360

3.2. 唯一key

观察上图会发现,有两个key,一个是左侧的key,另一个是value对象中的key;

  • 左侧的key(9527)

    对于redis而言,它就是一个普通的redis#key,通过这个key取出value,然后根据value中的内容判断是否能获取锁;

  • value对象中的key(ef4e671f-30f2-44d0-9291-aecb9fc1390c:108)

    这个key是分布式锁中需要确保唯一性的;

    redssion通过uuid + threadId组合方式来保证唯一性,源码位于:org.redisson.RedissonLock#getLockName

上图中,value对象体中的value属性,是重入次数。


3.3. 获取锁

示例中通过redissonClientLock.tryLock(500, 18000, TimeUnit.MILLISECONDS)方法获取锁;

跟随源码,其底层通过lua脚本实现,源码位置:org.redisson.RedissonLock#tryLockInnerAsync

属性含义
KEYS[1]分布式锁的key
ARGV[1]锁的失效时间
ARGV[2]value中的唯一值,uuid + threadId
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 向所有redis实例发送如下指令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 分布式锁的KEY不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 执行hset命令(hset REDLOCK_KEY uuid+threadId 1)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 设置过期时间(毫秒单位)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重入次数加1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 再次设置过期时间(毫秒单位)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 分布式锁存在但value不匹配,说明锁被其他线程持有,当前线程无法获取锁,返回锁的过期时间(秒单位)
"return redis.call('pttl', KEYS[1]);",
/*
getName(): KEYS[1] 分布式锁的key
internalLockLeaseTime:ARGV[1] 锁的失效时间
getLockName(threadId):ARGV[2] value中的唯一值(uuid + threadId)
*/
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

大致流程:

方式一的加锁过程(图片来源:https://blog.csdn.net/w372426096/article/details/103761286)

3.4. 释放锁

示例中通过redissonClientLock.unlock()方法释放锁;

跟随源码,其底层通过lua脚本实现,源码位置:org.redisson.RedissonLock#unlockInnerAsync

属性含义
KEYS[1]分布式锁的key
KEYS[2]分布式锁解锁消息频道
ARGV[1]redisson定义0表示解锁消息
ARGV[2]分布式锁的过期时间
ARGV[3]value中的唯一值,uuid + threadId
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 向所有redis实例发送如下指令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 分布式锁KEY不存在直接结束
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 分布式锁存在,并且value匹配,重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果重入数量还是大于0,说明锁还被持有着
"if (counter > 0) then " +
// 设置分布式锁KEY的失效时间(毫秒单位)
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入数量小于等于0,说明锁被释放(超时)了。
// 删除分布式锁KEY
"redis.call('del', KEYS[1]); " +
// 广播解锁消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
/*
getName():KEYS[1]
getChannelName():KEYS[2]
LockPubSub.UNLOCK_MESSAGE:ARGV[1]
internalLockLeaseTime:ARGV[2]
getLockName(threadId):ARGV[3]
*/
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

3.5. 关于广播解锁消息

Redis中提供了"发布、订阅"模式的消息机制,其中有四种角色:发布者(Pub),订阅者(Sub),Channel,message。

和MQ类似,此处不展开;

加锁场景:如果线程没有获取到分布式锁,就会订阅指定的channel,当收到channel中的通知后,再去尝试竞争分布式锁。

解锁场景:当分布式锁被彻底释放后,会向指定channel发布消息,通知在等待的线程可以竞争分布式锁。


3.6. 完整示例代码:

创建两个controller接口,模拟两个用户操作账户,同时只允许一个用户进行操作,两个接口使用同一个业务key(用于分布式锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Slf4j
@RestController
@RequestMapping(value = "/redis/example/01")
public class OrdinaryDistributedLockController {

public static final String KEY = "9527";

@Resource
RedissonClient redissonClient;

@GetMapping(value = "/lock01")
public String lock01() {
log.info("===== 进入方法一,threadId:[{}] =====", Thread.currentThread().getId());
RLock redissonClientLock = redissonClient.getLock(KEY);
boolean isLock = Boolean.FALSE;
try {
// 500ms拿不到锁, 就认为获取锁失败;5000ms即5s是锁失效时间。
isLock = redissonClientLock.tryLock(500, 18000, TimeUnit.MILLISECONDS);
if (isLock) {
// 模拟支付
log.info("方法一正在支付,threadId:[{}]", Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(15);
} else {
log.info("方法一没有在指定时间获取到锁,threadId:[{}]", Thread.currentThread().getId());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 没有获取到锁就不用释放
if (isLock) {
log.info("方法一解锁,threadId:[{}]", Thread.currentThread().getId());
redissonClientLock.unlock();
}
}
return "方法一执行完了,threadId:[" + Thread.currentThread().getId() + "]";
}

@GetMapping(value = "/lock02")
public String lock02() {
log.info("===== 进入方法二,threadId:[{}] =====", Thread.currentThread().getId());
RLock redissonClientLock = redissonClient.getLock(KEY);
boolean isLock = Boolean.FALSE;
try {
// 500ms拿不到锁, 就认为获取锁失败;5000ms即5s是锁失效时间。
isLock = redissonClientLock.tryLock(500, 18000, TimeUnit.MILLISECONDS);
if (isLock) {
// 模拟支付
log.info("方法二正在支付,threadId:[{}]", Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(15);
} else {
log.info("方法二没有在指定时间获取到锁,threadId:[{}]", Thread.currentThread().getId());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 没有获取到锁就不用释放
if (isLock) {
log.info("方法二解锁,threadId:[{}]", Thread.currentThread().getId());
redissonClientLock.unlock();
}
}
return "方法二执行完了,threadId:[" + Thread.currentThread().getId() + "]";
}
}

4. Reference