This post is also available in English and alternative languages.在单服务器系统常用本地锁来避免并发带来的问题,然而当服务采用集群方式部署时,本地锁无法在多个服务器之间生效,这时候保证数据的一致性就需要分布式锁来实现。
1. 前言
Redis提供了lua脚本支持,用户可以向Redis发送lua脚本来执行自定义动作,并获取脚本的响应数据。Redis会单线程原子性执行lua脚本,并保证lua脚本在处理的过程中不会被任意其它请求打断。
2. 方式一:SETNX方式
相关命令:
命令 | 含义 | 备注 |
---|
SETNX key value | 指定的key不存在时,为key设置指定的值. 设置成功,返回1;设置失败,返回0; | 加锁命令 |
DEL key | 删除已存在的键。不存在的 key 会被忽略. 返回被删除key的数量 | 解锁命令 |
EXPIRE key timeout | 设置key的过期时间,key过期后将不再可用 设置成功返回1;当key不存在或者不能为key设置过期时间时返回0; | 锁超时 |
加锁、解锁、锁超时的伪代码如下:
1 2 3 4 5 6 7 8
| if(1 == setnx(key,value)){ expire(key,40); try{ }finally{ del(key) } }
|
2.1. 存在的问题
2.1.1. SETNX 和 EXPIRE 非原子性
当加锁成功后(setnx方法返回1),正准备设置过期时间的代码前(执行expire方法前),由于服务器宕机、网络抖动等原因导致锁没有设置过期时间(expire方法没有执行),于是该锁就无法过期也就成了死锁;
归根结底是由于SETNX
和EXPIRE
这两步是分开操作的单命令,不是原子性操作。
解决方案:使用Lua脚本保证原子性。
2.1.2. 锁误解除
场景:
- 线程A成功获取到锁并且设置了30秒的过期时间,但线程A的业务逻辑执行时间超过了30秒,redis中的锁过期自动释放了(线程A并未释放锁);
- 线程B得到了锁;
- 线程A终于执行完成,于是线程A使用DEL命令释放锁,但此时锁的真正持有者是线程B,也就是说线程A释放了线程B的锁;
解决方案:在value中添加当前线程的标识,释放锁(DEL命令)时,检查key对应value是否是当前线程所持有的锁,如果不是当前线程持有的锁,就不释放。
2.1.3. 超时解锁导致并发
- 线程A成功获取锁并设置过期时间30秒,但线程A执行时间超过30秒,锁过期自动释放;
- 此时线程B获取到锁,线程A和线程B并发执行;
A、B两个线程并发执行是不被允许的,有两种解决方式:
- 将锁的过期时间设置的足够长,确保代码逻辑在锁释放之前能够执行完成;
- 为获取锁的线程增加守护线程,未将要过期但未释放的锁增加有效时间;
2.1.4. 集群主备切换
为了保证Redis的可用性,一般采用主从方式部署;主从数据同步有异步和同步两种方式,主节点将指令记录在本地内存中,然后异步将 内存中的指令同步到从节点,从节点一边执行同步的指令流来达到和主节点一致的状态,一边向主节点反馈同步情况。
在这种集群部署方式中,当主节点挂掉时,从节点会取而代之,但客户端是没有感知的;
当客户端A(线程A)成功加锁,指令在主节点的内存中,还没来得及同步给从节点;此时主节点挂掉,从节点提升为新的主节点,这个新主节点中没有锁的数据,造成了锁数据丢失,其他客户端再去加锁就会成功。
使用SETNX组合其他命令的方式能够实现分布式锁,但或多或少存在一些问题,而且实现复杂度比较高。
3. 方式二:redisson
pom依赖:
1 2 3 4 5
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.5</version> </dependency>
|
3.1. 伪代码
碍于排版,完整示例代码在下面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| RLock redissonClientLock = redissonClient.getLock(KEY); boolean isLock = Boolean.FALSE; try { isLock = redissonClientLock.tryLock(500, 5000, TimeUnit.MILLISECONDS); if (isLock) { log.info("方法一正在支付"); TimeUnit.SECONDS.sleep(4); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { if (isLock) { log.info("方法一解锁"); redissonClientLock.unlock(); } }
|
运行期间,redis中保存的数据如下图:
3.2. 唯一key
观察上图会发现,有两个key,一个是左侧的key,另一个是value对象中的key;
左侧的key(9527)
对于redis而言,它就是一个普通的redis#key,通过这个key取出value,然后根据value中的内容判断是否能获取锁;
value对象中的key(ef4e671f-30f2-44d0-9291-aecb9fc1390c:108)
这个key是分布式锁中需要确保唯一性的;
redssion通过uuid + threadId
组合方式来保证唯一性,源码位于:org.redisson.RedissonLock#getLockName
。
上图中,value对象体中的value属性,是重入次数。
3.3. 获取锁
示例中通过redissonClientLock.tryLock(500, 18000, TimeUnit.MILLISECONDS)
方法获取锁;
跟随源码,其底层通过lua脚本实现,源码位置:org.redisson.RedissonLock#tryLockInnerAsync
属性 | 含义 |
---|
KEYS[1] | 分布式锁的key |
ARGV[1] | 锁的失效时间 |
ARGV[2] | value中的唯一值,uuid + threadId |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
|
大致流程:
3.4. 释放锁
示例中通过redissonClientLock.unlock()
方法释放锁;
跟随源码,其底层通过lua脚本实现,源码位置:org.redisson.RedissonLock#unlockInnerAsync
属性 | 含义 |
---|
KEYS[1] | 分布式锁的key |
KEYS[2] | 分布式锁解锁消息频道 |
ARGV[1] | redisson定义0表示解锁消息 |
ARGV[2] | 分布式锁的过期时间 |
ARGV[3] | value中的唯一值,uuid + threadId |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
|
3.5. 关于广播解锁消息
Redis中提供了"发布、订阅"模式的消息机制,其中有四种角色:发布者(Pub),订阅者(Sub),Channel,message。
和MQ类似,此处不展开;
加锁场景:如果线程没有获取到分布式锁,就会订阅指定的channel,当收到channel中的通知后,再去尝试竞争分布式锁。
解锁场景:当分布式锁被彻底释放后,会向指定channel发布消息,通知在等待的线程可以竞争分布式锁。
3.6. 完整示例代码:
创建两个controller接口,模拟两个用户操作账户,同时只允许一个用户进行操作,两个接口使用同一个业务key(用于分布式锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Slf4j @RestController @RequestMapping(value = "/redis/example/01") public class OrdinaryDistributedLockController {
public static final String KEY = "9527";
@Resource RedissonClient redissonClient;
@GetMapping(value = "/lock01") public String lock01() { log.info("===== 进入方法一,threadId:[{}] =====", Thread.currentThread().getId()); RLock redissonClientLock = redissonClient.getLock(KEY); boolean isLock = Boolean.FALSE; try { isLock = redissonClientLock.tryLock(500, 18000, TimeUnit.MILLISECONDS); if (isLock) { log.info("方法一正在支付,threadId:[{}]", Thread.currentThread().getId()); TimeUnit.SECONDS.sleep(15); } else { log.info("方法一没有在指定时间获取到锁,threadId:[{}]", Thread.currentThread().getId()); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { if (isLock) { log.info("方法一解锁,threadId:[{}]", Thread.currentThread().getId()); redissonClientLock.unlock(); } } return "方法一执行完了,threadId:[" + Thread.currentThread().getId() + "]"; }
@GetMapping(value = "/lock02") public String lock02() { log.info("===== 进入方法二,threadId:[{}] =====", Thread.currentThread().getId()); RLock redissonClientLock = redissonClient.getLock(KEY); boolean isLock = Boolean.FALSE; try { isLock = redissonClientLock.tryLock(500, 18000, TimeUnit.MILLISECONDS); if (isLock) { log.info("方法二正在支付,threadId:[{}]", Thread.currentThread().getId()); TimeUnit.SECONDS.sleep(15); } else { log.info("方法二没有在指定时间获取到锁,threadId:[{}]", Thread.currentThread().getId()); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { if (isLock) { log.info("方法二解锁,threadId:[{}]", Thread.currentThread().getId()); redissonClientLock.unlock(); } } return "方法二执行完了,threadId:[" + Thread.currentThread().getId() + "]"; } }
|
4. Reference