Redisson分布式锁
# 1 Redisson使用示例
redission支持4种连接redis方式,分别为单机、主从、Sentinel、Cluster 集群。项目中使用的是集群模式。
@Configuration
public class RedissonConfig {
@Autowired
private RedisConfigBean redisConfigBean;
@Bean
public Redisson redisson() {
List<String> clusterNodes = new ArrayList<>();
for (Map<String, String> node : redisConfigBean.getNodesInfo()) {
//redisson版本是3.5,集群的ip前面要加上“redis://”,不然会报错,3.2版本可不加
clusterNodes.add("redis://" + node.get("ip") + ":" + node.get("port"));
}
Config config = new Config();
ClusterServersConfig clusterServersConfig = config.useClusterServers();
//添加集群节点
clusterServersConfig.addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
if (!StringUtils.isEmpty(redisConfigBean.getPassword())) {
//设置密码
clusterServersConfig.setPassword(redisConfigBean.getPassword());
}
return (Redisson) Redisson.create(config);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
@ConfigurationProperties(
prefix = "redis"
)
@RefreshScope
public class RedisConfigBean {
private Integer cacheExpireTime;
private Integer randomRange;
private String password;
private Integer timeoutInMillis;
private Integer readTimeoutInMillis;
private Integer dbIndex;
private Integer maxTotal;
private List<Map<String, String>> nodesInfo;
public RedisConfigBean() {
}
public List<RedisNode> getNodesInfoList() {
if (CollectionUtils.isEmpty(this.nodesInfo)) {
throw new RedisException("redis nodes is empty");
} else {
List<RedisNode> list = new ArrayList();
this.nodesInfo.forEach((map) -> {
list.add(new RedisNode((String) map.get("ip"), Integer.parseInt((String) map.get("port"))));
});
return list;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
redis:
maxTotal: 2048
####### 使用RedisUtil里的setEx、putListCacheWithExpireTime、expire方法时,随机增加最大值为cacheExpireTime过期时间(秒)
randomRange: 10
####### redis过期时间(秒)
cacheExpireTime: 7200
####### JedisClient连接超时时间(毫秒)
timeoutInMillis: 3000
####### JedisClient读取超时时间(毫秒)
readTimeoutInMillis: 2500
password: 89OBm#i9
nodesInfo:
- ip: 10.30.9.111
port: 6379
2
3
4
5
6
7
8
9
10
11
12
13
14
分布式锁业务应用
- 根据单号和费用类型加锁,处理预付款流水业务。
String key = "IBK_CUSTOMER:BUSINESS_NO_FEE_TYPE_CODE:" + billNO + feeTypeCode;
RLock lock = redisson.getLock(key)
。。。
2
3
- 根据业务员编码获取账号信息,锁账号信息。(防止并发更新金额,造成数据覆盖,金额错乱)
String lockKey = "IBK_CUSTOMER:FUNDS_CHANGE:" + dto.getAccountNo();
RLock accountLock = redisson.getLock(lockKey);
try {
accountLock.lock();
fundsChangeService.doAccountFundsChange(dto);
} catch (Exception e) {
log.error("金额变更失败,fundsChangeDTO:【{}】 errorMsg:【{}】", dto, e);
msgErrLogService.msgErrLogProcess(dto.getOrderNo(), JSON.toJSONString(dto), MsgTypeEnum.OUT_FUNDS_CHANGE.getCode(), e.toString());
throw new ServiceException(IbkCustomerAccountCodeEnum.IBK_CUSTOMER_ACCOUNT_OPERATE_ERROR);
} finally {
accountLock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
- 流水和账户余额在同一方法提交事务
@Transactional(rollbackFor = Exception.class)
public void doAccountFundsChange(FundsChangeDTO fundsChangeDTO) {
BigDecimal amount = accountMapper.queryConcurrentAmount(fundsChangeDTO.getAccountNo());
fundsChangeDTO.setLastAmount(amount);
fundsChangeDTO.setId(idGenerator.generate());
recordMapper.doFundsChangeRecord(fundsChangeDTO);
accountMapper.doFundsChange(fundsChangeDTO);
}
2
3
4
5
6
7
8
9
10
# 2 分布式锁演变过程
# 2.1 SETNX
存在问题:
1)客户端所在节点奔溃,无法正确释放锁。
2)业务逻辑异常,无法释放锁。
# 2.2 超时设置
设置超时时间,到点锁自动释放。
SETNX lock:168 1 // 获取锁(integer) 1>
EXPIRE lock:168 60 // 60s 自动删除(integer) 1
存在问题:
1)「加锁」、「设置超时」是两个命令,不是原子操作。可能出现执行了第一条命令,第二条执行失败的情况。
解决方案:
Redis 2.6.x之后,官方拓展了SET命令的参数,支持设置超时时间,并且满足原子性。
set key_name random_value nx px 30000
nx 表示只有key_name不存在才能设值成功。
px 30000 表示30秒后自动过期。
# 2.3 只能释放自己的锁
存在问题:
自己的锁可能被别人释放。
比如:
1.线程1获取锁成功并设置30秒后超时。
2.线程1由于某些原因执行很慢(网络问题、fullGC问题等...),超过30秒还没执行完,此时Redis因为锁过期自动释放了锁。
3.线程2获取锁执行自己业务。
4.线程1执行完自己业务释放锁,结果此时释放成线程2的锁。
解决方案:
加锁的时候设置一个「唯一标识」作为value,释放锁的时候用自己的唯一标识和value作比较,匹配上才能释放锁。
加锁:
set key_name random_value nx px 30000
释放锁:
if (redis.get("key_name").equals(random_value)) {
//比对成功则删除
redis.del("key_name");
}
问题: 释放锁时这种写法存在一个问题,get和del是两个操作,存在原子性问题。
可以通过Lua脚本实现原子性:
// 获取锁的 value 与 ARGV[1] 是否匹配,匹配则执行
delif redis.call("get",KEYS[1]) == ARGV[1]
then return redis.call("del",KEYS[1])
else return 0
end
# 2.4 正确设置锁超时
超时时间的设置一般为:通过多轮压测,取平均时间的3 ~ 5倍。
但即使这样仍然可能出现问题,可以通过以下方式完善超时时间设置:
给获取锁的线程添加一个守护线程,该守护线程定期检测锁的失效时间,如果锁快要失效,但是业务还没执行完,就对这个锁进行续期,重新设置超时时间。
# 2.5 实现可重入锁
通过redis hash结构实现可重入锁。
加锁:
1.加锁时先使用redis exists判断key_name这个锁是否存在。
2.如果锁不存在,使用hincrby创建一个key_name的hash表,random_value对应的value_count初始化为0再加1。
3.如果key_name存在,用hexists判断random_value这个键存不存在,如果random_value存在,value_count使用hincrby加1,否则加锁失败。
解锁:
1.不存在key_name或不存在random_value,解锁失败。
2.存在指定random_value,则使用hincrby减1,当value_count小于等于0,使用del删除这把锁。释放锁成功。
# 3 Redis分布式锁存在什么缺点?
由于redis集群同步数据的方式是异步,假设master节点获取到锁之后未完成数据同步就挂了,这个时候在新的master节点依然可以获取锁,所以多个客户端会同时获取到锁。