Memcached 作用与使用 基本介绍

1,对于缓存的存取方式,简言之,就是以键值对的形式将数据保存在内存中。在日常业务中涉及的操作无非就是增删改查。加入缓存机制后,查询的时候,对数据进行缓存,增删改的时候,清除缓存即可。这其中对于缓存的闭合就非常重要,如果缓存没有及时得到更新,那用户就会获取到过期数据,就会产生问题。

2,对于单一业务的缓存管理(数据库中只操作单表),只需生成一个key,查询时,使用key,置入缓存;增删改时,使用key,清除缓存。将key与表绑定,操作相对简单。

3,但是在现实业务中,更多的是对关联表的增删改查(数据库多表操作),业务之间互相关联,数据库中的某张表不止一个业务再进行操作,将缓存拦截在service层,对业务进行缓存,对多表进行缓存。

4,业务层缓存实现策略:

  4.1,在缓存中建立一个key为"union_query",value为“hashmap<prefix,uqversion>(‘简称uqmap’)“的缓存,prefix保存的是当前业务操作涉及到的数据库表名的组合(数据库表名的唯一性),使用‘|’分隔(例 prefix=“A|B”,此次业务将操作A表与B表),uqversion是业务版本号,从0开始递增。

  4.2,调用一个查询业务时,对数据进行缓存,设置operation为1,告诉cache对象,这是一个缓存操作,例如调用 queryAB(args[])方法时,cache对象切入,将prefix(即”A|B“)与uqversion(初始化为0),存入uqmap中进行缓存。

  4.3,将prefix,uqversion,方法明+参数,进行拼接,使用md5进行加密后作为一个key,将方法的结果集作为value,进行缓存。至此缓存成功。

  4.4,当第二个请求来调用queryAB时,cache对象切入,首先,查询uqmap对象,使用prefix找到对应的uqversion,然后,通过拼接加密获取key,最后取得结果集进行返回。

  4.5,当有一个updateA方法被调用时,设置operation为4,告诉cache对象,这是一个删除缓存的操作,此时prefix的值为“A”,cache对象切入,获取全局的uqmap,遍历其中的prefix,是否包含了表A的名称:如果包含,则更新此prefix的uqversion进行自增,uqversion一旦发生变化,4.3中组合的key将不复存在,业务缓存也就消失了。(对于复杂的updateAB方法,遍历prefix要复杂一点,可以实现)

  4.6,当第三个请求来调用queryAB时,可以获取到uqversion,组合成key后,但是没有对应的value。此时确定缓存不存在时,继续正常执行方法,获取结果集,返回给客户的同时,将结果集进行缓存。

5,对于缓存的操作,网上有三种api可以选择(memcached client forjava、spymemcached、xmemcached),具体的好坏,本人在这就不做分析。本人使用的是XMemcached api。

具体实现细节:

1,新建@interface Annotation{ }定义一个注解 @Annotation,一个注解是一个类。定义缓存策略。

importjava.lang.annotation.Documented;importjava.lang.annotation.ElementType;importjava.lang.annotation.Inherited;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;/***用于查找的时候,放置缓存信息*@authorshufeng*/@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documented@Inheritedpublic@interfaceXmemCache{/***值为当前操作的表名,表名唯一*涉及到多表操作,使用|分隔*/Stringprefix()default"";/**缓存有效期设置,单位为秒*指定间隔时间,默认值为3600秒(1小时)**/intinterval()default3600;/***1从cache里取值,如果未置入cache,则置入*2replacecachevalue未扩展*3replacecachevalue,并返回旧值未扩展*4removecachekey从cache里删除对应的缓存        *5removecachekey从cache里删除对应的缓存,并返回未删除之前的值未扩展**/intoperation()default1;}

2,memcache基础操作类,对一些常用方法进行封装,对memcachedclient进行配置

importjava.io.IOException;importjava.io.InputStream;importjava.util.HashMap;importjava.util.Iterator;importjava.util.Map;importjava.util.Properties;importjava.util.Set;importjava.util.concurrent.TimeoutException;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.DisposableBean;importcom.node.hlhw.rbac.api.constant.Constant;importnet.rubyeye.xmemcached.GetsResponse;importnet.rubyeye.xmemcached.MemcachedClient;importnet.rubyeye.xmemcached.MemcachedClientBuilder;importnet.rubyeye.xmemcached.XMemcachedClientBuilder;importnet.rubyeye.xmemcached.command.BinaryCommandFactory;importnet.rubyeye.xmemcached.exception.MemcachedException;importnet.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;importnet.rubyeye.xmemcached.transcoders.SerializingTranscoder;importnet.rubyeye.xmemcached.utils.AddrUtil;/***@authorMelodyshufeng*对memcachedclient进行封装,添加一下常用方法*/publicclassMemcachedOperateimplementsDisposableBean{/**timeout-Operationtimeout,ifthemethodisnotreturnedinthis*time,throwTimeoutExceptiontimeout-operationtimeout,inmilliseconds*exp-Anexpirationtime,inseconds.Canbeupto30days.After30*days,istreatedasaunixtimestampofanexactdate.value-stored*data*/privatestaticfinalLoggerlogger=LoggerFactory.getLogger(MemcachedOperate.class);privatestaticPropertiesPROPERTIES=newProperties();privatestaticStringMEMCACHED_SETTING="memcached.properties";privatestaticMemcachedClientmemcachedClient;publicstaticMemcachedClientgetClient(){returnmemcachedClient;}/***静态代码块,类加载时,初始化缓存客户端*确保只创建一个client实例*authorshufeng*/static{InputStreamin=Object.class.getResourceAsStream("/"+MEMCACHED_SETTING);try{PROPERTIES.load(in);}catch(IOExceptione){e.printStackTrace();}Stringservers=PROPERTIES.getProperty("memcached.servers","");if(null!=servers&&!"".equals(servers)){try{logger.debug("启动memcached连接");MemcachedClientBuilderbuilder=newXMemcachedClientBuilder(AddrUtil.getAddresses(servers));builder.setConnectionPoolSize(100);builder.setFailureMode(true);builder.setCommandFactory(newBinaryCommandFactory());builder.setSessionLocator(newKetamaMemcachedSessionLocator());builder.setTranscoder(newSerializingTranscoder());memcachedClient=builder.build();memcachedClient.setEnableHeartBeat(false);//关闭心跳memcachedClient.flushAll();//清空缓存}catch(IOExceptione){e.printStackTrace();}catch(TimeoutExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}catch(MemcachedExceptione){e.printStackTrace();}catch(Exceptione){e.printStackTrace();}}}/***@paramkey*@return获取value*/publicstaticObjectget(Stringkey){Objectobject=null;try{object=memcachedClient.get(key);}catch(TimeoutExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}catch(MemcachedExceptione){e.printStackTrace();}returnobject;}publicstaticvoidsetWithNoReply(Stringkey,intexp,Objectvalue){try{memcachedClient.setWithNoReply(key,exp,value);}catch(InterruptedExceptione){e.printStackTrace();}catch(MemcachedExceptione){e.printStackTrace();}}/***查询联表的业务版本号如果为空,则初始化**@paramprefix*@return*/@SuppressWarnings("unchecked")publicstaticLonggetUnionQueryVersion(Stringprefix){try{Map<String,Long>uqmap=null;GetsResponse<Object>getsresponse=memcachedClient.gets(Constant.UNION_QUERY);if(getsresponse==null){uqmap=newHashMap<String,Long>();Longuqversion=newLong(1);//初始化版本号uqmap.put(prefix,uqversion);if(memcachedClient.cas(Constant.UNION_QUERY,0,uqmap,0)){//检测插入之前是否被修改过returnuqversion;//插入成功}else{//插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询returngetUnionQueryVersion(prefix);}}else{longcas=getsresponse.getCas();Objectuqobj=getsresponse.getValue();if(uqobj==null){//不存在对象uqmap=newHashMap<String,Long>();Longuqversion=newLong(1);//初始化版本号uqmap.put(prefix,uqversion);if(memcachedClient.cas(Constant.UNION_QUERY,0,uqmap,cas)){//检测插入之前是否被修改过returnuqversion;//插入成功}else{//插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询returngetUnionQueryVersion(prefix);}}else{uqmap=(Map<String,Long>)uqobj;Longuqversion=uqmap.get(prefix);if(uqversion==null){//不存在此业务版本uqversion=newLong(1);//初始化版本号uqmap.put(prefix,uqversion);if(memcachedClient.cas(Constant.UNION_QUERY,0,uqmap,cas)){//检测插入之前是否被修改过returnuqversion;//插入成功}else{//插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询returngetUnionQueryVersion(prefix);}}else{returnuqversion;}}}}catch(TimeoutException|InterruptedException|MemcachedExceptione){e.printStackTrace();System.err.println("getUnionQueryVersion---Exception");}return1L;}/***查询单表的业务版本号如果为空,则初始化**@return*/publicstaticLonggetVersion(Stringprefix){try{GetsResponse<Object>getsresponse=memcachedClient.gets(prefix);if(getsresponse==null){Longpfversion=newLong(1);if(memcachedClient.cas(prefix,0,pfversion,0)){returnpfversion;}else{returngetVersion(prefix);}}else{Objectpfobj=getsresponse.getValue();longcas=getsresponse.getCas();if(pfobj==null){Longpfversion=newLong(1);if(memcachedClient.cas(prefix,0,pfversion,cas)){returnpfversion;}else{returngetVersion(prefix);}}else{return(Long)pfobj;}}}catch(TimeoutException|InterruptedException|MemcachedExceptione){e.printStackTrace();System.err.println("getVersion---Exception");}return1L;}/***shufeng更新多表版本号*由于存在线程安全问题,会覆盖uqmap,更新unionquery业务版本号*使用cas方法解决线程安全问题*更新unionquery中key包含p1或p2或p3的version*@paramprefix*/@SuppressWarnings("unchecked")publicstaticvoidupdateUnionQueryVersion(Stringprefix){try{Map<String,Long>uqmap=null;GetsResponse<Object>getsresponse=memcachedClient.gets(Constant.UNION_QUERY);if(getsresponse==null){return;}else{Objectuqobj=getsresponse.getValue();longcas=getsresponse.getCas();if(uqobj==null){return;}else{uqmap=(HashMap<String,Long>)uqobj;Set<String>uqset=uqmap.keySet();//遍历unionquery中的keyIterator<String>quit=uqset.iterator();Stringuqkey="";booleanuqflag=false;while(quit.hasNext()){uqkey=quit.next();if(("|"+uqkey+"|").contains("|"+prefix+"|")){//key中包含prefixuqmap.put(uqkey,uqmap.get(uqkey)+1);//更新mapuqflag=true;}}if(uqflag){if(!memcachedClient.cas(Constant.UNION_QUERY,0,uqmap,cas)){updateUnionQueryVersion(prefix);}}}}}catch(TimeoutException|InterruptedException|MemcachedExceptione){e.printStackTrace();System.err.println("updateUnionQueryVersion---Exception");}}/***更新单表版本号**@paramprefix*@return*/publicstaticvoidupdateVersion(Stringprefix){try{GetsResponse<Object>getsresponse;getsresponse=memcachedClient.gets(prefix);if(getsresponse==null){return;}else{Objectpfobj=getsresponse.getValue();longcas=getsresponse.getCas();if(pfobj==null){return;}else{Longpfversion=(Long)pfobj;pfversion+=1;if(!memcachedClient.cas(prefix,0,pfversion,cas)){updateVersion(prefix);}}}}catch(TimeoutException|InterruptedException|MemcachedExceptione){e.printStackTrace();System.err.println("updateVersion---Exception");}}publicvoidshutdown(){try{memcachedClient.shutdown();}catch(IOExceptione){e.printStackTrace();}}@Overridepublicvoiddestroy()throwsException{shutdown();}}

3,结合spring aop 配置缓存,使用spring aop来切入业务层加入缓存,与业务进行解耦。使用注解进行方便配置。

importjava.lang.reflect.Method;importorg.aspectj.lang.ProceedingJoinPoint;importorg.aspectj.lang.Signature;importorg.aspectj.lang.annotation.Around;importorg.aspectj.lang.annotation.Aspect;importorg.aspectj.lang.annotation.Before;importorg.aspectj.lang.annotation.Pointcut;importorg.aspectj.lang.reflect.MethodSignature;importorg.springframework.stereotype.Component;importcom.alibaba.fastjson.JSON;importcom.node.hlhw.common.cache.XmemCache;importcom.node.hlhw.common.digest.Md5Utils;@Component@AspectpublicclassMemcachedAop{@Pointcut("execution(*com.node.hlhw.*.service.impl.*.*(..))")publicvoidpointcut(){}//方法执行前调用@Before("pointcut()")publicvoidbefore(){}//方法执行的前后调用/****改进建议:使用uuid作为版本号,减少版本号的读取,直接生成uuid,进行缓存*线程安全问题:存在线程安全问题,但是针对于缓存,问题不大。*多线程同一时间重复覆盖一个业务id,还是可以更新缓存**@paramcall*@throwsThrowable*/@Around("pointcut()")publicObjectdoAround(ProceedingJoinPointcall)throwsThrowable{Objectresult=null;//检测是否存在memcached客户端实例if(MemcachedOperate.getClient()==null){System.err.println("memcachedclientnotexist");result=call.proceed();returnresult;}Signaturesignature=call.getSignature();MethodSignaturemethodSignature=(MethodSignature)signature;Methodmethod=methodSignature.getMethod();if(!method.isAnnotationPresent(XmemCache.class)){result=call.proceed();returnresult;}XmemCachexmemcache=method.getAnnotation(XmemCache.class);//获取操作方法intoperation=xmemcache.operation();//获取注解前缀,实际使用是为各个业务包名称,一般为表名Stringprefix=xmemcache.prefix();//无前缀if(prefix==null||"".equals(prefix)){result=call.proceed();returnresult;}//获取注解配置memcached死亡时间秒单位intinterval=xmemcache.interval();switch(operation){case1://1从cache里取值,如果未置入cache,则置入//判断prefix是否涉及多表,查看是否包含|if(prefix.contains("|")){Longuqversion=MemcachedOperate.getUnionQueryVersion(prefix);Stringcombinedkey=generCombinedKey(prefix,uqversion,method,call.getArgs());Objectresultobj=MemcachedOperate.get(combinedkey);if(resultobj==null){result=call.proceed();MemcachedOperate.setWithNoReply(combinedkey,interval,JSON.toJSONString(result));//缓存数据}else{Class<?>returnType=((MethodSignature)signature).getReturnType();result=JSON.parseObject(resultobj.toString(),returnType);}}else{//单表操作Longpfversion=MemcachedOperate.getVersion(prefix);Stringcombinedkey=generCombinedKey(prefix,pfversion,method,call.getArgs());Objectresultobj=MemcachedOperate.get(combinedkey);if(resultobj==null){result=call.proceed();MemcachedOperate.setWithNoReply(combinedkey,interval,JSON.toJSONString(result));//缓存数据}else{Class<?>returnType=((MethodSignature)signature).getReturnType();result=JSON.parseObject(resultobj.toString(),returnType);}}break;case2://2replacecachevaluebreak;case3:break;case4://4removecachekey从cache里删除对应业务版本的缓存/**更新unionquery业务版本号*0,切割prefix为p1、p2、p3*1,更新prefix为p1或p2或p3的version*2,更新unionquery中key包含p1或p2或p3的version*/if(prefix.contains("|")){//表示涉及到多表,需要清除单表的缓存,与联表中包含当前部分的缓存String[]prefixs=prefix.split("\\|");//0.切割prefix为p1、p2、p3for(Stringpf:prefixs){MemcachedOperate.updateVersion(pf);//1,更新prefix为p1或p2或p3的versionMemcachedOperate.updateUnionQueryVersion(pf);}}else{//没有涉及到多表的时候MemcachedOperate.updateVersion(prefix);MemcachedOperate.updateUnionQueryVersion(prefix);}result=call.proceed();break;default:result=call.proceed();break;}returnresult;}/***组装key值*@paramkey*@paramversion*@parammethod*@paramargs*@return*/privateStringgenerCombinedKey(Stringkey,Longversion,Methodmethod,Object[]args){StringBuffersb=newStringBuffer();//获取方法名StringmethodName=method.getName();//获取参数类型Object[]classTemps=method.getParameterTypes();//存入方法名sb.append(methodName);for(inti=0;i<args.length;i++){sb.append(classTemps[i]+"&");if(null==args[i]){sb.append("null");}elseif("".equals(args[i])){sb.append("*");}else{Stringtt=JSON.toJSONString(args[i]);sb.append(tt);}}sb.append(key);sb.append(version.toString());Stringtemp=Md5Utils.getMD5(sb.toString());returntemp;}}

4,properties文件中配置memcached服务器地址

#host1:port1,host2:port2memcached.servers=192.168.1.1:11211,192.168.1.2:11211

5,修改spring配置文件,声明自动为spring容器中那些配置@aspectJ切面的bean创建代理,织入切面。

<aop:aspectj-autoproxyproxy-target-class="true"/>

6,service层使用注解方式切入缓存

importjava.util.Arrays;importjava.util.Date;importjava.util.List;importjava.util.Map;importorg.apache.ibatis.session.RowBounds;importorg.apache.log4j.Logger;importorg.springframework.beans.factory.annotation.Autowired;importcom.alibaba.dubbo.common.utils.StringUtils;importcom.alibaba.dubbo.config.annotation.Service;importcom.node.hlhw.common.cache.XmemCache;importcom.node.hlhw.common.digest.ApplicationUtils;importcom.node.hlhw.core.service.BaseService;importcom.node.hlhw.core.store.IBaseStore;importcom.node.hlhw.core.store.PageParam;importcom.node.hlhw.rbac.api.dao.UserRoleDao;importcom.node.hlhw.rbac.api.entity.UserRole;importcom.node.hlhw.rbac.api.service.UserRoleService;/***@authorMelody*处理用户角色*/@Service(version="1.0.0")publicclassUserRoleServiceImplextendsBaseService<UserRole>implementsUserRoleService{privatestaticfinalLoggerlogger=Logger.getLogger(UserRoleServiceImpl.class);@AutowiredpublicUserRoleDaouserRoleDao;@OverrideprotectedIBaseStore<UserRole>getBaseDao(){returnuserRoleDao;}/**单表操作,prefix为表名,operation为4,只进行缓存的删除操作*/@XmemCache(prefix="userrole",operation=4)publicvoidinsertUserRole(UserRoleuserRole)throwsException{userRoleDao.insertUserRole(userRole);logger.info("插入用户角色数据");}/*(non-Javadoc)*此方法操作了两个表,role与userole,使用‘|’进行分隔*operation为1,表示缓存操作,对结果集进行缓存*interval表示缓存时间默认不填为3600秒,也可指定具体时长*/@Override@XmemCache(prefix="role|userrole",interval=3600,operation=1)publicList<Map<String,Object>>selectUserRoleList(UserRoleuserrole,PageParampageParam)throwsException{RowBoundsrowBounds=newRowBounds(pageParam.getOffset(),pageParam.getLimit());List<Map<String,Object>>list=userRoleDao.selectUserRoleList(userrole,rowBounds);returnlist;}@Override@XmemCache(prefix="userrole",operation=4)publicvoidmodifyUserRole(UserRoleuserrole,String[]roleids)throwsException{//删除所包含的角色userRoleDao.deleteByUserRole(userrole);for(Stringroleid:roleids){if(!StringUtils.isEmpty(roleid)){userrole.setCreatetime(newDate());userrole.setRoleid(roleid);userrole.setUuid(ApplicationUtils.getUUID());userRoleDao.insertUserRole(userrole);}}}@Override@XmemCache(prefix="userrole",operation=1)publicbooleanexistsRef(Stringroleids)throwsException{String[]roleid=roleids.split(",");List<String>roleidlist=Arrays.asList(roleid);returnuserRoleDao.existsRef(roleidlist)>0?true:false;}}