项目中用到的分布式锁

2019/05/01 分布式

实际项目中用到的分布式锁

1.通过Jedis客户端实现的Redis分布式锁

  • 定义一个注解,在需要加锁的方法上使用;

    @Documented
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RedisLock {
      
        String value();   //锁的资源,redis的key
      
        int expireSecs() default 3;  //持锁时间,单位秒
      
        int sleepMills() default 100;   //重试的间隔时间,isRetry设置false忽略此项
      
        boolean isRetry() default true;     //是否重试
    }
    
  • 使用了注解后会进行的操作,也就是进行加锁与解锁;

    @Aspect
    @Component
    public class DistributedLock {
      
        private static final String UNLOCKLUA = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";
      
        @Resource(name = "redisStringTemplate")
        private RedisTemplate<String, String> redisTemplate;
      
        @Around("@annotation(com.lcb.soa.manager.aspect.lock.RedisLock)")
        public Object doPoint(ProceedingJoinPoint joinPoint) throws Throwable {
      
            Method method = joinPoint.getTarget().getClass().getDeclaredMethod(joinPoint.getSignature().getName(),
                    ((MethodSignature) joinPoint.getSignature()).getMethod().getParameterTypes());
            RedisLock lock = method.getAnnotation(RedisLock.class);
            String value = UUID.randomUUID().toString();
            ValueOperations<String, String> ops = redisTemplate.opsForValue();
      
            //异步执行redis的回调函数,进行加锁操作
            boolean flag = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
                JedisCommands commands = (JedisCommands) redisConnection.getNativeConnection();
                String result = commands.set(lock.value(), value, "NX", "EX", lock.expireSecs());
      
                //result为空表示加锁失败,循环内进行重试
                while (lock.isRetry() && StringUtils.isBlank(result)) {
                    try {
                        Thread.sleep(lock.sleepMills());
                    } catch (InterruptedException e) {
                    }
                    result = commands.set(lock.value(), value, "NX", "EX", lock.expireSecs());
                }
                return StringUtils.isNotBlank(result);
            });
      
            if (flag) {
                try {
                    //执行目标方法
                    return joinPoint.proceed();
                } finally {
                    //目标方法执行完,finally块中执行解锁操作
                    redisTemplate.execute((RedisCallback<Long>) redisConnection-> {
                        //判断是否为redis集群
                        if (redisConnection.getNativeConnection() instanceof JedisCluster) {
                            JedisCluster jedisCluster = (JedisCluster) redisConnection.getNativeConnection();
                            jedisCluster.eval(UNLOCKLUA, Arrays.asList(lock.value()), Arrays.asList(value));
                        } else {
                            Jedis jedis = (Jedis) redisConnection.getNativeConnection();
                            jedis.eval(UNLOCKLUA, Arrays.asList(lock.value()), Arrays.asList(value));
                        }
                        if (StringUtils.isNotBlank(ops.get(lock.value()))){
                            log.warn("lock release failure, manual processing, please.");
                        }
                        return 0L;
                    });
                }
            }
            return null;
        }
    }
    
  • 在需要加锁的方法上使用。

    获取微信小程序二维码接口,加Redis分布式锁,保证共享资源在某一时刻的独占性。

    @RedisLock("com.lcb.soa.manager.service.impl.base.BaseInfoService.getQrCode")
    public String getQrCode(MicroQrCodeRequest request) {
        ......
    }
    

2.通过Redisson客户端实现Redis分布式锁

  • 分布式锁回调接口

    /**
     * 分布式锁回调接口
     *
     * @param <T>
     */
    public interface DistributedLockCallback<T> {
       /**
        * 需要加分布式锁的业务代码块
        * @return
        */
       T process();
         
       /**
        * 获取分布式锁名称
        * @return
        */
       String getLockName();
    }
    
  • 定义分布式锁模板接口

    /**
     * 分布式锁操作模板
     *
     */
    public interface DistributedLockTemplate {
       /**
        * 使用分布式锁,使用锁默认超时时间。
        * @param callback
        * @return
        */
       <T> T lock(DistributedLockCallback<T> callback);
         
       /**
        * 自定义锁的超时时间
        * @param callback
        * @param leaseTime
        * @param timeUnit
        * @return
        */
       <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit);
    }
    
  • 单例的分布式锁模板

    /**
     * Single Instance mode 分布式锁模板
     *
     */
    public class SingleDistributedLockTemplate implements DistributedLockTemplate {
         
       private static final long     DEFAULT_TIMEOUT   = 5;
        private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
          
        private RedissonClient redisson;
          
       public SingleDistributedLockTemplate(String server, String port) {
          Config config = new Config();
          config.useSingleServer().setAddress(server + ":" + port);
          redisson = Redisson.create(config);
       }
      
       @Override
       public <T> T lock(DistributedLockCallback<T> callback) {
          return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT);
       }
      
       @Override
       public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit) {
          RLock lock = null;
          try {
             lock = redisson.getLock(callback.getLockName());
             lock.lock(leaseTime, timeUnit);
             return callback.process();
          } finally {
             if (lock != null) {
                lock.unlock();
             }
          }
       }
         
       public void destroy() {
          if(redisson != null && !redisson.isShutdown()){
             redisson.shutdown();
          }     
       }
    }
    
  • 在需要加锁的时候,调用接口中的加锁方法,并实现回调接口。

    获取访问权限的token,需要保证共享资源token的独占性,加分布式锁。

    @Autowired
    private IAccessTokenService accessTokenService;
      
    @Autowired
    private DistributedLockTemplate dLockTemplate;
      
    @Override
    public AccessToken getAccessToken() {
       AccessToken accessToken = accessTokenService.getAccessTokenFromRemote();
       if (accessToken != null) {
          return accessToken;
       }
      
       return dLockTemplate.lock(new DistributedLockCallback<AccessToken>() {
      
          @Override
          public AccessToken process() {
             return accessTokenService.getAccessTokenFromRemote();
          }
      
          @Override
          public String getLockName() {
             return "com.lcb.soa.wechat.service.IAccessTokenService.getAccessTokenFromRemote()";
          }
       });
    }
    

3.数据库乐观锁实现分布式锁

创建订单时,对商品库存的操作,采用数据库乐观锁实现了分布式锁。

shop_inventory表:

列名 描述
quantity 库存的原始数量
quantity_freezed 被冻结的库存数量
quantity_used 成功下单后被使用的库存数量
quantity_actual 剩余的真实的库存数量

shop_inventory_operation表:

列名 描述
inventory_id 库存表主键id
operate_type 操作类型:0:冻结,1:解冻,2:减库存,3:还库存
quantity 一个订单锁定的库存数量,目前都为1

流程描述:每种操作类型,operation表都增加一条记录。创建订单,冻结库存;支付完成,减库存;未支付取消订单,解冻库存;支付完成后取消订单,还库存。

创建订单时,对库存还没有进行过任何操作,所以只需要判断当前剩余的库存量大于0即可创建成功;

对乐观锁的实现,主要是对创建成功后这条记录的判断。支付完成需要减库存,首先判断有一条冻结的操作记录;未支付取消订单需要解冻库存,判断有一条冻结的操作记录;支付完成后取消订单需要还库存,需要判断有一条减库存的操作记录。

具体代码:

  • 冻结库存:

    public Boolean lockInventory(Long orderId,java.util.Date effectDate, Integer storeId, Integer quantity ) {
    		Boolean retValue = true;
    		Inventory inventory = getInventory(effectDate,dateyMd,storeId);
    		List<InventoryOperation> operations = getInventoryOperation(orderId);
    		//有效的冻结操作记录条数
    		Integer lockedCountActive = 0;
    		//减库存的 操作有效记录
    		Integer reduceCount = 0;
    		for(InventoryOperation operation :operations){
    			//0:冻结,1:解冻,2:减库存,3:还库存
    			if(operation.getOperateType()==0){
    				lockedCountActive =lockedCountActive +1;
    			}
    			if(operation.getOperateType() ==2){
    				reduceCount = reduceCount + 1;
    			}
    		}
    		//被锁定的操作记录数为0
    		if(lockedCountActive ==0){
    			//实际库存有余量大于0
    			if(inventory.getQuantityActual()>0){
    				//如果有减库存的操作,则不允许冻结(已减过库存的订单不允许再冻结库存)
    				if(reduceCount = 0){
                        ...... //operation表添加冻结操作记录
    				}
    			}else {
    				retValue = false;
    			}
    		}
    		return retValue;
    	}
    
  • 解冻库存:

    主要的判断逻辑 lockedCountActive==1

    public Boolean unLockInventory(Long orderId,java.util.Date effectDate, Integer storeId, Integer quantity) {
       Boolean retValue =true;
       Inventory inventory = getInventory(effectDate,dateyMd,storeId);
       List<InventoryOperation> operations = getInventoryOperation(orderId);
       //有效的冻结操作记录条数
       Integer lockedCountActive = 0;
       //有效的解冻操作记录条数
       Integer unLockedCountActive = 0;
       //减库存操作记录条数
       Integer reduceCount =0;
       for(InventoryOperation operation :operations){
          //0:冻结,1:解冻,2:减库存,3:还库存
          if(operation.getOperateType()==0){
             lockedCountActive =lockedCountActive +1;
          }
          if(operation.getOperateType()==1){
             unLockedCountActive = unLockedCountActive +1;
          }
       }
       //如果已冻结过库存则允许解冻 如果未解冻过则允许解冻 且未有过有效库存减少
       if(lockedCountActive == 1&&unLockedCountActive == 0&& reduceCount==0){
          //真实库存小于库存上限的情况下才允许解冻库存
          if(inventory.getQuantityActual()< inventory.getQuantity()){
             ...... //添加解冻操作记录,删除冻结操作记录
          }
       }
       return retValue;
    }
    
  • 减库存:

    主要的判断逻辑lockedCountActive==1

    public Boolean storeInventoryReduce(Long orderId,java.util.Date effectDate, Integer storeId, Integer quantity) {
       Boolean retValue = true;
         
       Inventory inventory = getInventoryByStoreIdDate(dateyMd,storeId);
       List<InventoryOperation> operations = getInventoryOperationByOrderId(orderId);
       //减过库存的记录数据
       Integer reduceCount=0;
       //冻结库存 记录数
       Integer lockedCountActive = 0;
       for(InventoryOperation inventoryOperation:operations){
          //0:冻结,1:解冻,2:减库存,3:还库存
          if(inventoryOperation.getOperateType()==2){
             reduceCount = reduceCount+1;
          }
          if(inventoryOperation.getOperateType()==0){
             lockedCountActive =lockedCountActive +1;
          }
       }
       //未减过库存才允许减库存,冻结过库存才允许 减库存
       if(reduceCount == 0 &&lockedCountActive==1){
          //可扣减库存不够了
          if(inventory.getQuantity()>inventory.getQuantityUsed()){
              ...... //添加减库存操作记录,删除冻结库存操作记录
          }else{
             retValue =false;
          }
       }
       return retValue;
    }
    
  • 还库存:

    主要的判断逻辑reduceCount==1

    public Boolean storeInventoryAdd(Long orderId,Date effectDate, Integer storeId, Integer quantity) {
       Boolean retValue = true;
       Inventory inventory = getInventory(effectDate,dateyMd,storeId);
       List<InventoryOperation> operations = getInventoryOperation(orderId);
       //减过库存的记录条数
       Integer reduceCount=0;
       //还过库存的记录条数
       Integer addCount=0;
       for(InventoryOperation inventoryOperation:operations){
          //2:减库存,3:还库存
          if(inventoryOperation.getOperateType()==2){
             reduceCount = reduceCount+1;
          }
          if(inventoryOperation.getOperateType()==3){
             addCount =addCount+1;
          }
       }
       //未还过库存则允许还库存,且有过扣库存才允许还库存
       if(addCount == 0&&reduceCount==1){
          if(inventory.getQuantityActual()<inventory.getQuantity()){
              ...... //添加还库存的操作,删除减库存的操作
          }
       }
       return retValue;
    }
    

Search

    Table of Contents