这期内容当中小编将会给大家带来有关Redis中怎样实现一个分布式锁,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

方案

使用 SETNX 和 EXPIRE 命令

SETNXkeyvalueEXPIREkeysecondsDELkeyif(setnx("item_1_lock",1)){expire("item_1_lock",30);try{...逻辑}catch{...}finally{del("item_1_lock");}}

这种方法看起来可以解决问题,但是有一定的风险,因为 SETNX 和 EXPIRE 这波操作是非原子性的,如果 SETNX 成功之后,出现错误,导致 EXPIRE 没有执行,导致锁没有设置超时时间形成死锁。

针对这种情况,我们可以使用 lua 脚本来保持操作原子性,保证 SETNX 和 EXPIRE 两个操作要么都成功,要么都不成功。

if(redis.call('setnx',KEYS[1],ARGV[1])<1)thenreturn0;end;redis.call('expire',KEYS[1],tonumber(ARGV[2]));return1;

通过这样的方法,我们初步解决了竞争锁的原子性问题,虽然其他功能还未实现,但是应该不会造成死锁???。

Redis 2.6.12 以上可灵活使用 SET 命令

SETkeyvalueNXEX30DELkeyif(set("item_1_lock",1,"NX","EX",30)){try{...逻辑}catch{...}finally{del("item_1_lock");}}

改进后的方法不需要借助 lua 脚本就解决了 SETNX 和 EXPIRE 的原子性问题。现在我们再仔细琢磨琢磨,如果 A 拿到了锁顺利进入代码块执行逻辑,但是由于各种原因导致超时自动释放锁。在这之后 B 成功拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑完毕再来释放锁,就会把 B 刚获得的锁释放了。就好比用自己家的钥匙开了别家的门,这是不可接受的。

为了解决这个问题我们可以尝试在 SET 的时候设置一个锁标识,然后在 DEL 的时候验证当前锁是否为自己的锁。

Stringvalue=UUID.randomUUID().toString().replaceAll("-","");if(set("item_1_lock",value,"NX","EX",30)){try{...逻辑}catch{...}finally{...lua脚本保证原子性}}if(redis.call('get',KEYS[1])==ARGV[1])thenreturnredis.call('del',KEYS[1])elsereturn0end

到这里,我们终于解决了竞争锁的原子性问题和误删锁问题。但是锁一般还需要支持可重入、循环等待和超时自动续约等功能点。下面我们学习使用一个非常好用的包来解决这些问题。

入门 Redisson

Redission 的锁,实现了可重入和超时自动续约功能,它都帮我们封装好了,我们只要按照自己的需求调用它的 API 就可以轻松实现上面所提到的几个功能点。详细功能可以查看 Redisson 文档

在项目中安装 Redisson

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.2</version></dependency>

implementation'org.redisson:redisson:3.13.2'

用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也可以在这里 Redisson 找到你需要的版本。

简单尝试

RedissonClientredissonClient=Redisson.create();RLocklock=redissonClient.getLock("lock");booleanres=lock.lock();if(res){try{...逻辑}finally{lock.unlock();}}

Redisson 将底层逻辑全部做了一个封装 ?,我们无需关心具体实现,几行代码就能使用一把完美的锁。下面我们简单折腾折腾源码 ???。

加锁

privatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptibly)throwsInterruptedException{longthreadId=Thread.currentThread().getId();Longttl=tryAcquire(leaseTime,unit,threadId);if(ttl==null){return;}RFuture<RedissonLockEntry>future=subscribe(threadId);if(interruptibly){commandExecutor.syncSubscriptionInterrupted(future);}else{commandExecutor.syncSubscription(future);}try{while(true){ttl=tryAcquire(leaseTime,unit,threadId);if(ttl==null){break;}if(ttl>=0){try{future.getNow().getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}catch(InterruptedExceptione){if(interruptibly){throwe;}future.getNow().getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}}else{if(interruptibly){future.getNow().getLatch().acquire();}else{future.getNow().getLatch().acquireUninterruptibly();}}}}finally{unsubscribe(future,threadId);}}

获取锁

private<T>RFuture<Long>tryAcquireAsync(longleaseTime,TimeUnitunit,longthreadId){if(leaseTime!=-1){returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);}RFuture<Long>ttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining,e)->{if(e!=null){return;}if(ttlRemaining==null){scheduleExpirationRenewal(threadId);}});returnttlRemainingFuture;}<T>RFuture<T>tryLockInnerAsync(longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommand<T>command){internalLockLeaseTime=unit.toMillis(leaseTime);returnevalWriteAsync(getName(),LongCodec.INSTANCE,command,"if(redis.call('exists',KEYS[1])==0)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+"returnredis.call('pttl',KEYS[1]);",Collections.singletonList(getName()),internalLockLeaseTime,getLockName(threadId));}

删除锁

publicRFuture<Void>unlockAsync(longthreadId){RPromise<Void>result=newRedissonPromise<Void>();RFuture<Boolean>future=unlockInnerAsync(threadId);future.onComplete((opStatus,e)->{cancelExpirationRenewal(threadId);if(e!=null){result.tryFailure(e);return;}if(opStatus==null){IllegalMonitorStateExceptioncause=newIllegalMonitorStateException("attempttounlocklock,notlockedbycurrentthreadbynodeid:"+id+"thread-id:"+threadId);result.tryFailure(cause);return;}result.trySuccess(null);});returnresult;}protectedRFuture<Boolean>unlockInnerAsync(longthreadId){returnevalWriteAsync(getName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+"returnnil;"+"end;"+"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+"if(counter>0)then"+"redis.call('pexpire',KEYS[1],ARGV[2]);"+"return0;"+"else"+"redis.call('del',KEYS[1]);"+"redis.call('publish',KEYS[2],ARGV[1]);"+"return1;"+"end;"+"returnnil;",Arrays.asList(getName(),getChannelName()),LockPubSub.UNLOCK_MESSAGE,internalLockLeaseTime,getLockName(threadId));}

上述就是小编为大家分享的Redis中怎样实现一个分布式锁了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。