本篇内容主要讲解“redis中的分布式锁有哪些特点”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redis中的分布式锁有哪些特点”吧!

分布式锁的特点

1.独占性

不论在任何情况下都只能有一个线程持有锁。

2.高可用

redis集群环境不能因为某一个节点宕机而出现获取锁或释放锁失败。

3.防死锁

必须有超时控制机制或者撤销操作。

4.不乱抢

自己加锁,自己释放。不能释放别人加的锁。

5.重入性

同一线程可以多次加锁。

redis单机怎么实现

一般情况下都是使用setnx+lua脚本实现。

直接贴代码

packagecom.fandf.test.redis;importcn.hutool.core.util.IdUtil;importcn.hutool.core.util.RandomUtil;importlombok.extern.slf4j.Slf4j;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.data.redis.core.script.DefaultRedisScript;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;importjava.util.Collections;importjava.util.concurrent.TimeUnit;/***redis单机锁**@authorfandongfeng*@date2023/3/2906:52*/@Slf4j@ServicepublicclassRedisLock{@ResourceRedisTemplate<String,Object>redisTemplate;privatestaticfinalStringSELL_LOCK="kill:";/***模拟秒杀**@return是否成功*/publicStringkill(){StringproductId="123";Stringkey=SELL_LOCK+productId;//锁value,解锁时用来判断当前锁是否是自己加的Stringvalue=IdUtil.fastSimpleUUID();//加锁十秒钟过期防死锁Booleanflag=redisTemplate.opsForValue().setIfAbsent(key,value,10,TimeUnit.SECONDS);if(!flag){return"加锁失败";}try{StringproductKey="good123";//获取商品库存Integerstock=(Integer)redisTemplate.opsForValue().get(productKey);if(stock==null){//模拟录入数据,实际应该加载时从数据库读取redisTemplate.opsForValue().set(productKey,100);stock=100;}if(stock<=0){return"卖完了,下次早点来吧";}//扣减库存,模拟随机卖出数量intrandomInt=RandomUtil.randomInt(1,10);redisTemplate.opsForValue().decrement(productKey,randomInt);//修改db,可以丢到队列里慢慢处理return"成功卖出"+randomInt+"个,库存剩余"+redisTemplate.opsForValue().get(productKey)+"个";}finally{////这种方法会存在删除别人加的锁的可能//redisTemplate.delete(key);//if(value.equals(redisTemplate.opsForValue().get(key))){////因为if条件的判断和delete不是原子性的,////if条件判断成功后,恰好锁到期自己解锁////此时别的线程如果持有锁了,就会把别人的锁删除掉//redisTemplate.delete(key);//}//使用lua脚本保证判断和删除的原子性StringluaScript="if(redis.call('get',KEYS[1])==ARGV[1])then"+"returnredis.call('del',KEYS[1])"+"else"+"return0"+"end";redisTemplate.execute(newDefaultRedisScript<>(luaScript,Boolean.class),Collections.singletonList(key),value);}}}

进行单元测试,模拟一百个线程同时进行秒杀

packagecom.fandf.test.redis;importorg.junit.jupiter.api.DisplayName;importorg.junit.jupiter.api.RepeatedTest;importorg.junit.jupiter.api.Test;importorg.junit.jupiter.api.parallel.Execution;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;importstaticorg.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;/***@Description:*@author:fandongfeng*@date:2023-3-2416:45*/@SpringBootTestclassSignServiceTest{@ResourceRedisLockredisLock;@RepeatedTest(100)@Execution(CONCURRENT)publicvoidredisLock(){Stringresult=redisLock.kill();if("加锁失败".equals(result)){}else{System.out.println(result);}}}

只有三个线程抢到了锁

成功卖出5个,库存剩余95个成功卖出8个,库存剩余87个成功卖出7个,库存剩余80个

redis锁有什么问题?

总的来说有两个:

1.无法重入。

2.我们为了防止死锁,加锁时都会加上过期时间,这个时间大部分情况下都是根据经验对现有业务评估得出来的,但是万一程序阻塞或者异常,导致执行了很长时间,锁过期就会自动释放了。此时如果别的线程拿到锁,执行逻辑,就有可能出现问题。

那么这两个问题有没有办法解决呢?有,接下来我们就来讲讲Redisson

Redisson实现分布式锁Redisson是什么?

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

springboot集成Redisson

集成很简单,只需两步

pom引入依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId></dependency>

application.yml增加redis配置

spring:application:name:testredis:host:127.0.0.1port:6379

使用也很简单,只需要注入RedissonClient即可

packagecom.fandf.test.redis;importlombok.extern.slf4j.Slf4j;importorg.redisson.api.RLock;importorg.redisson.api.RedissonClient;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/***@authorfandongfeng*/@Component@Slf4jpublicclassRedissonTest{@ResourceRedissonClientredissonClient;publicvoidtest(){RLockrLock=redissonClient.getLock("anyKey");//rLock.lock(10,TimeUnit.SECONDS);rLock.lock();try{//dosomething}catch(Exceptione){log.error("业务异常",e);}finally{rLock.unlock();}}}

可能不了解redisson的小伙伴会不禁发出疑问。
what?加锁时不需要加过期时间吗?这样会不会导致死锁啊。解锁不需要判断是不是自己持有吗?
哈哈,别着急,我们接下来一步步揭开redisson的面纱。

Redisson lock()源码跟踪

我们来一步步跟着lock()方法看下源码(本地redisson版本为3.20.0)

//RedissonLock.class@Overridepublicvoidlock(){try{lock(-1,null,false);}catch(InterruptedExceptione){thrownewIllegalStateException();}}

查看lock(-1, null, false);方法

privatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptibly)throwsInterruptedException{//获取当前线程idlongthreadId=Thread.currentThread().getId();//加锁代码块,返回锁的失效时间Longttl=tryAcquire(-1,leaseTime,unit,threadId);//lockacquiredif(ttl==null){return;}CompletableFuture<RedissonLockEntry>future=subscribe(threadId);pubSub.timeout(future);RedissonLockEntryentry;if(interruptibly){entry=commandExecutor.getInterrupted(future);}else{entry=commandExecutor.get(future);}try{while(true){ttl=tryAcquire(-1,leaseTime,unit,threadId);//lockacquiredif(ttl==null){break;}//waitingformessageif(ttl>=0){try{entry.getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}catch(InterruptedExceptione){if(interruptibly){throwe;}entry.getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}}else{if(interruptibly){entry.getLatch().acquire();}else{entry.getLatch().acquireUninterruptibly();}}}}finally{unsubscribe(entry,threadId);}//get(lockAsync(leaseTime,unit));}

我们看下它是怎么上锁的,也就是tryAcquire方法

privateLongtryAcquire(longwaitTime,longleaseTime,TimeUnitunit,longthreadId){//真假加锁方法tryAcquireAsyncreturnget(tryAcquireAsync(waitTime,leaseTime,unit,threadId));}

publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){super(commandExecutor,name);this.commandExecutor=commandExecutor;this.internalLockLeaseTime=commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();this.pubSub=commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}private<T>RFuture<Long>tryAcquireAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId){RFuture<Long>ttlRemainingFuture;if(leaseTime>0){ttlRemainingFuture=tryLockInnerAsync(waitTime,leaseTime,unit,threadId,RedisCommands.EVAL_LONG);}else{//waitTime和leaseTime都是-1,所以走这里//过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();//跟进去源码发现默认值是30秒,privatelonglockWatchdogTimeout=30*1000;ttlRemainingFuture=tryLockInnerAsync(waitTime,internalLockLeaseTime,TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);}CompletionStage<Long>s=handleNoSync(threadId,ttlRemainingFuture);ttlRemainingFuture=newCompletableFutureWrapper<>(s);//加锁成功,开启子线程进行续约CompletionStage<Long>f=ttlRemainingFuture.thenApply(ttlRemaining->{//lockacquiredif(ttlRemaining==null){if(leaseTime>0){//如果指定了过期时间,则不续约internalLockLeaseTime=unit.toMillis(leaseTime);}else{//没指定过期时间,或者小于0,在这里实现锁自动续约scheduleExpirationRenewal(threadId);}}returnttlRemaining;});returnnewCompletableFutureWrapper<>(f);}

上面代码里面包含加锁和锁续约的逻辑,我们先来看看加锁的代码

<T>RFuture<T>tryLockInnerAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommand<T>command){returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,command,"if((redis.call('exists',KEYS[1])==0)"+"or(redis.call('hexists',KEYS[1],ARGV[2])==1))then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"returnredis.call('pttl',KEYS[1]);",Collections.singletonList(getRawName()),unit.toMillis(leaseTime),getLockName(threadId));}

这里就看的很明白了吧,redisson使用了lua脚本来保证了命令的原子性。
redis.call('hexists', KEYS[1], ARGV[2]) 查看 key value 是否存在。

Redis Hexists 命令用于查看哈希表的指定字段是否存在。
如果哈希表含有给定字段,返回 1 。 如果哈希表不含有给定字段,或 key 不存在,返回 0 。

127.0.0.1:6379>hexists123uuid(integer)0127.0.0.1:6379>hincrby123uuid1(integer)1127.0.0.1:6379>hincrby123uuid1(integer)2127.0.0.1:6379>hincrby123uuid1(integer)3127.0.0.1:6379>hexists123uuid(integer)1127.0.0.1:6379>hgetall1231)"uuid"2)"3"127.0.0.1:6379>

当key不存在,或者已经含有给定字段(也就是已经加过锁了,这里是为了实现重入性),直接对字段的值+1
这个字段的值,也就是ARGV[2], 取得是getLockName(threadId)方法,我们再看看这个字段的值是什么

protectedStringgetLockName(longthreadId){returnid+":"+threadId;}publicRedissonBaseLock(CommandAsyncExecutorcommandExecutor,Stringname){super(commandExecutor,name);this.commandExecutor=commandExecutor;this.id=commandExecutor.getServiceManager().getId();this.internalLockLeaseTime=commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();this.entryName=id+":"+name;}//commandExecutor.getServiceManager()的id默认值privatefinalStringid=UUID.randomUUID().toString();

这里就明白了,字段名称是 uuid + : + threadId

接下来我们看看锁续约的代码scheduleExpirationRenewal(threadId);

protectedvoidscheduleExpirationRenewal(longthreadId){ExpirationEntryentry=newExpirationEntry();//判断该实例是否加过锁ExpirationEntryoldEntry=EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),entry);if(oldEntry!=null){//重入次数+1oldEntry.addThreadId(threadId);}else{//第一次加锁entry.addThreadId(threadId);try{//锁续约核心代码renewExpiration();}finally{if(Thread.currentThread().isInterrupted()){//如果线程异常终止,则关闭锁续约线程cancelExpirationRenewal(threadId);}}}}

我们看看renewExpiration()方法

privatevoidrenewExpiration(){ExpirationEntryee=EXPIRATION_RENEWAL_MAP.get(getEntryName());if(ee==null){return;}//新建一个线程执行Timeouttask=commandExecutor.getServiceManager().newTimeout(newTimerTask(){@Overridepublicvoidrun(Timeouttimeout)throwsException{ExpirationEntryent=EXPIRATION_RENEWAL_MAP.get(getEntryName());if(ent==null){return;}LongthreadId=ent.getFirstThreadId();if(threadId==null){return;}//设置锁过期时间为30秒CompletionStage<Boolean>future=renewExpirationAsync(threadId);future.whenComplete((res,e)->{if(e!=null){log.error("Can'tupdatelock{}expiration",getRawName(),e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}//检查锁是还否存在if(res){//rescheduleitself10后调用自己renewExpiration();}else{//关闭续约cancelExpirationRenewal(null);}});}},internalLockLeaseTime/3,TimeUnit.MILLISECONDS);//注意上行代码internalLockLeaseTime/3,//internalLockLeaseTime默认30s,那么也就是10s检查一次ee.setTimeout(task);}//设置锁过期时间为internalLockLeaseTime也就是30slua脚本保证原子性protectedCompletionStage<Boolean>renewExpirationAsync(longthreadId){returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"return1;"+"end;"+"return0;",Collections.singletonList(getRawName()),internalLockLeaseTime,getLockName(threadId));}

OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。

我们写个测试类看看

packagecom.fandf.test.redis;importorg.junit.jupiter.api.Test;importorg.redisson.api.RLock;importorg.redisson.api.RedissonClient;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;/***@Description:*@author:fandongfeng*@date:2023-3-2416:45*/@SpringBootTestclassRedissonTest{@ResourceprivateRedissonClientredisson;@TestpublicvoidwatchDog()throwsInterruptedException{RLocklock=redisson.getLock("123");lock.lock();Thread.sleep(1000000);}}

查看锁的过期时间,及是否续约

127.0.0.1:6379>keys*1)"123"127.0.0.1:6379>ttl123(integer)30127.0.0.1:6379>ttl123(integer)26127.0.0.1:6379>ttl123(integer)24127.0.0.1:6379>ttl123(integer)22127.0.0.1:6379>ttl123(integer)21127.0.0.1:6379>ttl123(integer)20127.0.0.1:6379>ttl123(integer)30127.0.0.1:6379>ttl123(integer)28127.0.0.1:6379>

我们再改改代码,看看是否可重入和字段名称是否和我们预期一致

packagecom.fandf.test.redis;importorg.junit.jupiter.api.Test;importorg.redisson.api.RLock;importorg.redisson.api.RedissonClient;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;/***@Description:*@author:fandongfeng*@date:2023-3-2416:45*/@SpringBootTestclassRedissonTest{@ResourceprivateRedissonClientredisson;@TestpublicvoidwatchDog()throwsInterruptedException{RLocklock=redisson.getLock("123");lock.lock();lock.lock();lock.lock();//加了三次锁,此时重入次数为3Thread.sleep(3000);//解锁一次,此时重入次数变为3lock.unlock();Thread.sleep(1000000);}}

127.0.0.1:6379>keys*1)"123"127.0.0.1:6379>127.0.0.1:6379>ttl123(integer)24127.0.0.1:6379>hgetall1231)"df7f4c71-b57b-455f-acee-936ad8475e01:12"2)"3"127.0.0.1:6379>127.0.0.1:6379>hgetall1231)"df7f4c71-b57b-455f-acee-936ad8475e01:12"2)"2"127.0.0.1:6379>

我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。

Redlock算法

redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?

假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤

1.客户端记录当前系统时间,以毫秒为单位

2.依次尝试从5个redis实例中,使用相同key获取锁
当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。

3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功

4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间

5.如果获取锁失败,所有的redis实例都会进行解锁
防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题

这里有下面几个点需要注意:

1.我们都知道单机的redis是cp的,但是集群情况下redis是ap的,所以运行Redisson的节点必须是主节点,不能有从节点,防止主节点加锁成功未同步从节点就宕机,而客户端却收到加锁成功,导致数据不一致问题。

2.为了提高redis节点宕机的容错率,可以使用公式2N(n指宕机数量)+1,假设宕机一台,Redisson还要继续运行,那么至少要部署2*1+1=3台主节点。

到此,相信大家对“redis中的分布式锁有哪些特点”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!