这篇文章主要讲解了“Redis的分布式锁应该怎么打开”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Redis的分布式锁应该怎么打开”吧!

要求

基于Redis实现分布式锁需要满足如下几点要求:

在分布式集群中,被分布式锁控制的方法或代码段同一时刻只能被一个客户端上面的一个线程执行,也就是互斥

锁信息需要设置过期时间,避免一个线程长期占有(比如在做解锁操作前异常退出)而导致死锁

加锁与解锁必须一致,谁加的锁,就由谁来解(或过期超时),一个客户端不能解开另一个客户端加的锁

加锁与解锁的过程必须保证原子性

实现1. 加锁实现

基于Redis的分布式锁加锁操作一般使用SETNX命令,其含义是“将key的值设为value,当且仅当key不存在。若给定的key已经存在,则SETNX不做任何动作”。
在 Spring Boot 中,可以使用 StringRedisTemplate 来实现,如下,一行代码即可实现加锁过程。(下列代码给出两种调用形式——立即返回加锁结果与给定超时时间获取加锁结果)

/***尝试获取锁(立即返回)*@paramkey锁的rediskey*@paramvalue锁的value*@paramexpire过期时间/秒*@return是否获取成功*/publicbooleanlock(Stringkey,Stringvalue,longexpire){returnstringRedisTemplate.opsForValue().setIfAbsent(key,value,expire,TimeUnit.SECONDS);}/***尝试获取锁,并至多等待timeout时长**@paramkey锁的rediskey*@paramvalue锁的value*@paramexpire过期时间/秒*@paramtimeout超时时长*@paramunit时间单位*@return是否获取成功*/publicbooleanlock(Stringkey,Stringvalue,longexpire,longtimeout,TimeUnitunit){longwaitMillis=unit.toMillis(timeout);longwaitAlready=0;while(!stringRedisTemplate.opsForValue().setIfAbsent(key,value,expire,TimeUnit.SECONDS)&&waitAlready<waitMillis){try{Thread.sleep(waitMillisPer);}catch(InterruptedExceptione){log.error("Interruptedwhentryingtogetalock.key:{}",key,e);}waitAlready+=waitMillisPer;}if(waitAlready<waitMillis){returntrue;}log.warn("<======lock{}failedafterwaitingfor{}ms",key,waitAlready);returnfalse;}

上述实现如何满足前面提到的几点要求:

客户端互斥: 可以将expire过期时间设置为大于同步代码的执行时间,比如同步代码块执行时间为1s,则可将expire设置为3s或5s。避免同步代码执行过程中expire时间到,其它客户端又可以获取锁执行同步代码块。

通过设置过期时间expire来避免某个客户端长期占有锁。

通过value来控制谁加的锁,由谁解的逻辑,比如可以使用requestId作为value,requestId唯一标记一次请求。

setIfAbsent方法 底层通过调用 Redis 的SETNX命令,操作具备原子性。

错误示例:

网上有如下实现,

publicbooleanlock(Stringkey,Stringvalue,longexpire){booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,value);if(result){stringRedisTemplate.expire(key,expire,TimeUnit.SECONDS);}returnresult;}

该实现的问题是如果在result为true,但还没成功设置expire时,程序异常退出了,将导致该锁一直被占用而导致死锁,不满足第二点要求。

2. 解锁实现

解锁也需要满足前面所述的四个要求,实现代码如下:

privatestaticfinalStringRELEASE_LOCK_LUA_SCRIPT="ifredis.call('get',KEYS[1])==ARGV[1]thenreturnredis.call('del',KEYS[1])elsereturn0end";privatestaticfinalLongRELEASE_LOCK_SUCCESS_RESULT=1L;/***释放锁*@paramkey锁的rediskey*@paramvalue锁的value*/publicbooleanunLock(Stringkey,Stringvalue){DefaultRedisScript<Long>redisScript=newDefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT,Long.class);longresult=stringRedisTemplate.execute(redisScript,Collections.singletonList(key),value);returnObjects.equals(result,RELEASE_LOCK_SUCCESS_RESULT);}

这段实现使用一个Lua脚本来实现解锁操作,保证操作的原子性。传入的value值需与该线程加锁时的value一致,可以使用requestId(具体实现下面给出)。

错误示例:

publicbooleanunLock(Stringkey,Stringvalue){StringoldValue=stringRedisTemplate.opsForValue().get(key);if(value.equals(oldValue)){stringRedisTemplate.delete(key);}}

该实现先获取锁的当前值,判断两值相等则删除。考虑一种极端情况,如果在判断为true时,刚好该锁过期时间到,另一个客户端加锁成功,则接下来的delete将不管三七二十一将别人加的锁直接删掉了,不满足第三点要求。该示例主要是因为没有保证解锁操作的原子性导致。

3. 注解支持

为了方便使用,添加一个注解,可以放于方法上控制方法在分布式环境中的同步执行。

/***标注在方法上的分布式锁注解*/@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)public@interfaceDistributedLockable{Stringkey();Stringprefix()default"disLock:";longexpire()default10L;//默认10s过期}

添加一个切面来解析注解的处理,

/***分布式锁注解处理切面*/@Aspect@Slf4jpublicclassDistributedLockAspect{privateDistributedLocklock;publicDistributedLockAspect(DistributedLocklock){this.lock=lock;}/***在方法上执行同步锁*/@Around(value="@annotation(lockable)")publicObjectdistLock(ProceedingJoinPointpoint,DistributedLockablelockable)throwsThrowable{booleanlocked=false;Stringkey=lockable.prefix()+lockable.key();try{locked=lock.lock(key,WebUtil.getRequestId(),lockable.expire());if(locked){returnpoint.proceed();}else{log.info("Didnotgetalockforkey{}",key);returnnull;}}catch(Exceptione){throwe;}finally{if(locked){if(!lock.unLock(key,WebUtil.getRequestId())){log.warn("Unlock{}failed,maybelockedbyanotherclientalready.",lockable.key());}}}}}

RequestId 的实现如下,通过注册一个Filter,在请求开始时生成一个uuid存于ThreadLocal中,在请求返回时清除。

publicclassWebUtil{publicstaticfinalStringREQ_ID_HEADER="Req-Id";privatestaticfinalThreadLocal<String>reqIdThreadLocal=newThreadLocal<>();publicstaticvoidsetRequestId(StringrequestId){reqIdThreadLocal.set(requestId);}publicstaticStringgetRequestId(){StringrequestId=reqIdThreadLocal.get();if(requestId==null){requestId=ObjectId.next();reqIdThreadLocal.set(requestId);}returnrequestId;}publicstaticvoidremoveRequestId(){reqIdThreadLocal.remove();}}publicclassRequestIdFilterimplementsFilter{@OverridepublicvoiddoFilter(ServletRequestservletRequest,ServletResponseservletResponse,FilterChainfilterChain)throwsIOException,ServletException{HttpServletRequesthttpServletRequest=(HttpServletRequest)servletRequest;StringreqId=httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);//没有则生成一个if(StringUtils.isEmpty(reqId)){reqId=ObjectId.next();}WebUtil.setRequestId(reqId);try{filterChain.doFilter(servletRequest,servletResponse);}finally{WebUtil.removeRequestId();}}}//在配置类中注册Filter/***添加RequestId*@return*/@BeanpublicFilterRegistrationBeanrequestIdFilter(){RequestIdFilterreqestIdFilter=newRequestIdFilter();FilterRegistrationBeanregistrationBean=newFilterRegistrationBean();registrationBean.setFilter(reqestIdFilter);List<String>urlPatterns=Collections.singletonList("/*");registrationBean.setUrlPatterns(urlPatterns);registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE+1);returnregistrationBean;}4. 使用注解

@DistributedLockable(key="test",expire=10)publicvoidtest(){System.out.println("线程-"+Thread.currentThread().getName()+"开始执行..."+LocalDateTime.now());try{Thread.sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("线程-"+Thread.currentThread().getName()+"结束执行..."+LocalDateTime.now());}总结

本文给出了基于Redis的分布式锁的实现方案与常见的错误示例。要保障分布式锁的正确运行,需满足本文所提的四个要求,尤其注意保证加锁解锁操作的原子性,设置过期时间,及对同一个锁的加锁解锁线程一致。

感谢各位的阅读,以上就是“Redis的分布式锁应该怎么打开”的内容了,经过本文的学习后,相信大家对Redis的分布式锁应该怎么打开这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!