实际项目中用到的分布式锁
1.通过Jedis客户端实现的Redis分布式锁
-
定义一个注解,在需要加锁的方法上使用;
@Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RedisLock { String value(); //锁的资源,redis的key int expireSecs() default 3; //持锁时间,单位秒 int sleepMills() default 100; //重试的间隔时间,isRetry设置false忽略此项 boolean isRetry() default true; //是否重试 }
-
使用了注解后会进行的操作,也就是进行加锁与解锁;
@Aspect @Component public class DistributedLock { private static final String UNLOCKLUA = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end"; @Resource(name = "redisStringTemplate") private RedisTemplate<String, String> redisTemplate; @Around("@annotation(com.lcb.soa.manager.aspect.lock.RedisLock)") public Object doPoint(ProceedingJoinPoint joinPoint) throws Throwable { Method method = joinPoint.getTarget().getClass().getDeclaredMethod(joinPoint.getSignature().getName(), ((MethodSignature) joinPoint.getSignature()).getMethod().getParameterTypes()); RedisLock lock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); ValueOperations<String, String> ops = redisTemplate.opsForValue(); //异步执行redis的回调函数,进行加锁操作 boolean flag = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { JedisCommands commands = (JedisCommands) redisConnection.getNativeConnection(); String result = commands.set(lock.value(), value, "NX", "EX", lock.expireSecs()); //result为空表示加锁失败,循环内进行重试 while (lock.isRetry() && StringUtils.isBlank(result)) { try { Thread.sleep(lock.sleepMills()); } catch (InterruptedException e) { } result = commands.set(lock.value(), value, "NX", "EX", lock.expireSecs()); } return StringUtils.isNotBlank(result); }); if (flag) { try { //执行目标方法 return joinPoint.proceed(); } finally { //目标方法执行完,finally块中执行解锁操作 redisTemplate.execute((RedisCallback<Long>) redisConnection-> { //判断是否为redis集群 if (redisConnection.getNativeConnection() instanceof JedisCluster) { JedisCluster jedisCluster = (JedisCluster) redisConnection.getNativeConnection(); jedisCluster.eval(UNLOCKLUA, Arrays.asList(lock.value()), Arrays.asList(value)); } else { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); jedis.eval(UNLOCKLUA, Arrays.asList(lock.value()), Arrays.asList(value)); } if (StringUtils.isNotBlank(ops.get(lock.value()))){ log.warn("lock release failure, manual processing, please."); } return 0L; }); } } return null; } }
-
在需要加锁的方法上使用。
获取微信小程序二维码接口,加Redis分布式锁,保证共享资源在某一时刻的独占性。
@RedisLock("com.lcb.soa.manager.service.impl.base.BaseInfoService.getQrCode") public String getQrCode(MicroQrCodeRequest request) { ...... }
2.通过Redisson客户端实现Redis分布式锁
-
分布式锁回调接口
/** * 分布式锁回调接口 * * @param <T> */ public interface DistributedLockCallback<T> { /** * 需要加分布式锁的业务代码块 * @return */ T process(); /** * 获取分布式锁名称 * @return */ String getLockName(); }
-
定义分布式锁模板接口
/** * 分布式锁操作模板 * */ public interface DistributedLockTemplate { /** * 使用分布式锁,使用锁默认超时时间。 * @param callback * @return */ <T> T lock(DistributedLockCallback<T> callback); /** * 自定义锁的超时时间 * @param callback * @param leaseTime * @param timeUnit * @return */ <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit); }
-
单例的分布式锁模板
/** * Single Instance mode 分布式锁模板 * */ public class SingleDistributedLockTemplate implements DistributedLockTemplate { private static final long DEFAULT_TIMEOUT = 5; private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; private RedissonClient redisson; public SingleDistributedLockTemplate(String server, String port) { Config config = new Config(); config.useSingleServer().setAddress(server + ":" + port); redisson = Redisson.create(config); } @Override public <T> T lock(DistributedLockCallback<T> callback) { return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT); } @Override public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit) { RLock lock = null; try { lock = redisson.getLock(callback.getLockName()); lock.lock(leaseTime, timeUnit); return callback.process(); } finally { if (lock != null) { lock.unlock(); } } } public void destroy() { if(redisson != null && !redisson.isShutdown()){ redisson.shutdown(); } } }
-
在需要加锁的时候,调用接口中的加锁方法,并实现回调接口。
获取访问权限的token,需要保证共享资源token的独占性,加分布式锁。
@Autowired private IAccessTokenService accessTokenService; @Autowired private DistributedLockTemplate dLockTemplate; @Override public AccessToken getAccessToken() { AccessToken accessToken = accessTokenService.getAccessTokenFromRemote(); if (accessToken != null) { return accessToken; } return dLockTemplate.lock(new DistributedLockCallback<AccessToken>() { @Override public AccessToken process() { return accessTokenService.getAccessTokenFromRemote(); } @Override public String getLockName() { return "com.lcb.soa.wechat.service.IAccessTokenService.getAccessTokenFromRemote()"; } }); }
3.数据库乐观锁实现分布式锁
创建订单时,对商品库存的操作,采用数据库乐观锁实现了分布式锁。
shop_inventory表:
列名 | 描述 |
---|---|
quantity | 库存的原始数量 |
quantity_freezed | 被冻结的库存数量 |
quantity_used | 成功下单后被使用的库存数量 |
quantity_actual | 剩余的真实的库存数量 |
shop_inventory_operation表:
列名 | 描述 |
---|---|
inventory_id | 库存表主键id |
operate_type | 操作类型:0:冻结,1:解冻,2:减库存,3:还库存 |
quantity | 一个订单锁定的库存数量,目前都为1 |
流程描述:每种操作类型,operation表都增加一条记录。创建订单,冻结库存;支付完成,减库存;未支付取消订单,解冻库存;支付完成后取消订单,还库存。
创建订单时,对库存还没有进行过任何操作,所以只需要判断当前剩余的库存量大于0即可创建成功;
对乐观锁的实现,主要是对创建成功后这条记录的判断。支付完成需要减库存,首先判断有一条冻结的操作记录;未支付取消订单需要解冻库存,判断有一条冻结的操作记录;支付完成后取消订单需要还库存,需要判断有一条减库存的操作记录。
具体代码:
-
冻结库存:
public Boolean lockInventory(Long orderId,java.util.Date effectDate, Integer storeId, Integer quantity ) { Boolean retValue = true; Inventory inventory = getInventory(effectDate,dateyMd,storeId); List<InventoryOperation> operations = getInventoryOperation(orderId); //有效的冻结操作记录条数 Integer lockedCountActive = 0; //减库存的 操作有效记录 Integer reduceCount = 0; for(InventoryOperation operation :operations){ //0:冻结,1:解冻,2:减库存,3:还库存 if(operation.getOperateType()==0){ lockedCountActive =lockedCountActive +1; } if(operation.getOperateType() ==2){ reduceCount = reduceCount + 1; } } //被锁定的操作记录数为0 if(lockedCountActive ==0){ //实际库存有余量大于0 if(inventory.getQuantityActual()>0){ //如果有减库存的操作,则不允许冻结(已减过库存的订单不允许再冻结库存) if(reduceCount = 0){ ...... //operation表添加冻结操作记录 } }else { retValue = false; } } return retValue; }
-
解冻库存:
主要的判断逻辑 lockedCountActive==1,
public Boolean unLockInventory(Long orderId,java.util.Date effectDate, Integer storeId, Integer quantity) { Boolean retValue =true; Inventory inventory = getInventory(effectDate,dateyMd,storeId); List<InventoryOperation> operations = getInventoryOperation(orderId); //有效的冻结操作记录条数 Integer lockedCountActive = 0; //有效的解冻操作记录条数 Integer unLockedCountActive = 0; //减库存操作记录条数 Integer reduceCount =0; for(InventoryOperation operation :operations){ //0:冻结,1:解冻,2:减库存,3:还库存 if(operation.getOperateType()==0){ lockedCountActive =lockedCountActive +1; } if(operation.getOperateType()==1){ unLockedCountActive = unLockedCountActive +1; } } //如果已冻结过库存则允许解冻 如果未解冻过则允许解冻 且未有过有效库存减少 if(lockedCountActive == 1&&unLockedCountActive == 0&& reduceCount==0){ //真实库存小于库存上限的情况下才允许解冻库存 if(inventory.getQuantityActual()< inventory.getQuantity()){ ...... //添加解冻操作记录,删除冻结操作记录 } } return retValue; }
-
减库存:
主要的判断逻辑lockedCountActive==1
public Boolean storeInventoryReduce(Long orderId,java.util.Date effectDate, Integer storeId, Integer quantity) { Boolean retValue = true; Inventory inventory = getInventoryByStoreIdDate(dateyMd,storeId); List<InventoryOperation> operations = getInventoryOperationByOrderId(orderId); //减过库存的记录数据 Integer reduceCount=0; //冻结库存 记录数 Integer lockedCountActive = 0; for(InventoryOperation inventoryOperation:operations){ //0:冻结,1:解冻,2:减库存,3:还库存 if(inventoryOperation.getOperateType()==2){ reduceCount = reduceCount+1; } if(inventoryOperation.getOperateType()==0){ lockedCountActive =lockedCountActive +1; } } //未减过库存才允许减库存,冻结过库存才允许 减库存 if(reduceCount == 0 &&lockedCountActive==1){ //可扣减库存不够了 if(inventory.getQuantity()>inventory.getQuantityUsed()){ ...... //添加减库存操作记录,删除冻结库存操作记录 }else{ retValue =false; } } return retValue; }
-
还库存:
主要的判断逻辑reduceCount==1
public Boolean storeInventoryAdd(Long orderId,Date effectDate, Integer storeId, Integer quantity) { Boolean retValue = true; Inventory inventory = getInventory(effectDate,dateyMd,storeId); List<InventoryOperation> operations = getInventoryOperation(orderId); //减过库存的记录条数 Integer reduceCount=0; //还过库存的记录条数 Integer addCount=0; for(InventoryOperation inventoryOperation:operations){ //2:减库存,3:还库存 if(inventoryOperation.getOperateType()==2){ reduceCount = reduceCount+1; } if(inventoryOperation.getOperateType()==3){ addCount =addCount+1; } } //未还过库存则允许还库存,且有过扣库存才允许还库存 if(addCount == 0&&reduceCount==1){ if(inventory.getQuantityActual()<inventory.getQuantity()){ ...... //添加还库存的操作,删除减库存的操作 } } return retValue; }