这篇文章主要介绍了Redis怎么实现分布式锁和等待序列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁synchronized 、cas 、ReentrankLock 这些锁的作用范围都是JVM ,说白了在集群下没啥用。这时我们就需要能在多台JVM 之间决定执行顺序的锁了,现在分布式锁主要有redis 、Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。

背景

最近在做一个消费Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了Redis 的实现方式(因为网上例子多)

分析

redis 实现的分布式锁,实现原理是set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式

丢弃

等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用redis 的List 类型实现等待序列的作用

代码

直接上代码 其实直接redis的工具类就可以解决了

packagecom.testimportredis.clients.jedis.Jedis;importjava.util.Collections;importjava.util.List;/***@descredis队列实现方式*@anthor*@date**/publicclassRedisUcUitl{privatestaticfinalStringLOCK_SUCCESS="OK";privatestaticfinalStringSET_IF_NOT_EXIST="NX";privatestaticfinalStringSET_WITH_EXPIRE_TIME="PX";privatestaticfinalLongRELEASE_SUCCESS=1L;privateRedisUcUitl(){}/***logger**//***存储redis队列顺序存储在队列首部存入**@paramkey字节类型*@paramvalue字节类型*/publicstaticLonglpush(Jedisjedis,finalbyte[]key,finalbyte[]value){returnjedis.lpush(key,value);}/***移除列表中最后一个元素并将改元素添加入另一个列表中,当列表为空时将阻塞连接直到等待超时**@paramsrckey*@paramdstkey*@paramtimeout0表示永不超时*@return*/publicstaticbyte[]brpoplpush(Jedisjedis,finalbyte[]srckey,finalbyte[]dstkey,finalinttimeout){returnjedis.brpoplpush(srckey,dstkey,timeout);}/***返回制定的key,起始位置的redis数据*@paramredisKey*@paramstart*@paramend-1表示到最后*@return*/publicstaticList<byte[]>lrange(Jedisjedis,finalbyte[]redisKey,finallongstart,finallongend){returnjedis.lrange(redisKey,start,end);}/***删除key*@paramredisKey*/publicstaticvoiddelete(Jedisjedis,finalbyte[]redisKey){returnjedis.del(redisKey);}/***尝试加锁*@paramlockKeykey名称*@paramrequestId身份标识*@paramexpireTime过期时间*@return*/publicstaticbooleantryGetDistributedLock(Jedisjedis,finalStringlockKey,finalStringrequestId,finalintexpireTime){Stringresult=jedis.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);returnLOCK_SUCCESS.equals(result);}/***释放锁*@paramlockKeykey名称*@paramrequestId身份标识*@return*/publicstaticbooleanreleaseDistributedLock(Jedisjedis,finalStringlockKey,finalStringrequestId){finalStringscript="ifredis.call('get',KEYS[1])==ARGV[1]thenreturnredis.call('del',KEYS[1])elsereturn0end";jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));returnRELEASE_SUCCESS.equals(result);}}

业务逻辑主要代码如下

1.先消耗队列中的

while(true){//消费队列try{//被放入redis队列的数据序列化后的byte[]bytes=RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8),dstKeyStr.getBytes(UTF_8),1);if(bytes==null||bytes.isEmpty()){//队列中没数据时退出break;}//反序列化对象Map<String,Object>singleMap=(Map<String,Object>)ObjectSerialUtil.bytesToObject(bytes);//塞入唯一的值防止被其他线程误解锁StringrequestId=UUID.randomUUID().toString();booleanlockGetFlag=RedisUcUitl.tryGetDistributedLock(keyStr,requestId,100);if(lockGetFlag){//成功获取锁进行业务处理//TODO//处理完毕释放锁booleanfreeLock=RedisUcUitl.releaseDistributedLock(keyStr,requestId);}else{//未能获得锁放入等待队列RedisUcUitl.lpush(keyStr.getBytes(UTF_8),ObjectSerialUtil.objectToBytes(param));}}catch(Exceptione){break;}}

2.处理最新接到的数据

同样是走尝试获取锁,获取不到放入队列的流程

一般序列化用fastJson 之列的就可以了,这里用的是JDK 自带的,工具类如下

publicclassObjectSerialUtil{privateObjectSerialUtil(){//工具类}/***将Object对象序列化为byte[]**@paramobj对象*@returnbyte数组*@throwsException*/publicstaticbyte[]objectToBytes(Objectobj)throwsIOException{ByteArrayOutputStreambos=newByteArrayOutputStream();ObjectOutputStreamoos=newObjectOutputStream(bos);oos.writeObject(obj);byte[]bytes=bos.toByteArray();bos.close();oos.close();returnbytes;}/***将bytes数组还原为对象**@parambytes*@return*@throwsException*/publicstaticObjectbytesToObject(byte[]bytes){try{ByteArrayInputStreambin=newByteArrayInputStream(bytes);ObjectInputStreamois=newObjectInputStream(bin);returnois.readObject();}catch(Exceptione){thrownewBaseException("反序列化出错!",e);}}}

感谢你能够认真阅读完这篇文章,希望小编分享的“Redis怎么实现分布式锁和等待序列”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!