Spring怎么管理控制mybatis事务
Spring怎么管理控制mybatis事务?针对这个问题,今天小编总结这篇有关mybatis事务管理的文章,希望能帮助更多想解决这个问题的朋友找到更加简单易行的办法。
一、 XMLMapperBuilder、mapperProxy 与 mapperMethodmapper 文件是怎么解析的,SqlSessionFactory
这个重要的对象,是的就是我们经常需要配置的:
@Bean@ConditionalOnMissingBeanpublicSqlSessionFactorysqlSessionFactory(DataSourcedataSource)throwsException{//略}
这里面做了很多自动化的配置,当然我们可以通过重写它来自定义我们自己的sqlSessionFactory
,借用一下上篇文章的图片:
spring 借助SqlSessionFactoryBean
来创建sqlSessionFactory
,这可以视作是一个典型的建造者模式,来创建SqlSessionFactory
。
spring 拿到我们配置的 mapper 路径去扫描我们 mapper.xml 然后进行一个循环进行解析(上篇文章第二章节:二、SqlSessionFactory 的初始化与 XMLMapperBuilder):
--代码位于org.mybatis.spring.SqlSessionFactoryBean#buildSqlSessionFactory--if(this.mapperLocations!=null){if(this.mapperLocations.length==0){LOGGER.warn(()->"Property'mapperLocations'wasspecifiedbutmatchingresourcesarenotfound.");}else{for(ResourcemapperLocation:this.mapperLocations){if(mapperLocation==null){continue;}try{XMLMapperBuilderxmlMapperBuilder=newXMLMapperBuilder(mapperLocation.getInputStream(),targetConfiguration,mapperLocation.toString(),targetConfiguration.getSqlFragments());xmlMapperBuilder.parse();}catch(Exceptione){thrownewNestedIOException("Failedtoparsemappingresource:'"+mapperLocation+"'",e);}finally{ErrorContext.instance().reset();}LOGGER.debug(()->"Parsedmapperfile:'"+mapperLocation+"'");}}}else{LOGGER.debug(()->"Property'mapperLocations'wasnotspecified.");}--代码位于org.apache.ibatis.builder.xml.XMLMapperBuilder#parse--publicvoidparse(){if(!configuration.isResourceLoaded(resource)){configurationElement(parser.evalNode("/mapper"));//上篇文章主要说的configuration.addLoadedResource(resource);bindMapperForNamespace();//创建mapperProxy的工厂对象}parsePendingResultMaps();parsePendingCacheRefs();parsePendingStatements();}1.1 从 xml 到 mapperStatement
configurationElement(parser.evalNode("/mapper"));
里面发生的故事,实际上还有后续的步骤,如果对 mybatis 有所了解的,应该知道,mybatis 会为我们的接口创建一个叫做mapperProxy
的代理对象(划重点),其实就是在这后续的步骤bindMapperForNamespace();
做的(不尽然,实际上是创建并绑定了mapperProxyFactory
)。
不贴太多代码,bindMapperForNamespace()
方法里核心做的主要就是调用configuration.addMapper()
方法
if(boundType!=null){if(!configuration.hasMapper(boundType)){//Springmaynotknowtherealresourcenamesowesetaflag//topreventloadingagainthisresourcefromthemapperinterface//lookatMapperAnnotationBuilder#loadXmlResourceconfiguration.addLoadedResource("namespace:"+namespace);configuration.addMapper(boundType);}}
这个boundType
就是我们在 mapper 文件里面指定的namespace
,比如:
<mappernamespace="com.anur.mybatisdemo.test.TrackerConfigMapper">XXXXXXXXXXXXXXXXXX里面写的sql语句,resultMap等等,略</mapper>
在configuration.addMapper()
中调用了mapperRegistry.addMapper()
,看到knowMappers
,这个就是存储我们生产MapperProxy
的工厂映射 map,我们稍微再讲,先继续往下看。
public<T>voidaddMapper(Class<T>type){if(type.isInterface()){if(hasMapper(type)){thrownewBindingException("Type"+type+"isalreadyknowntotheMapperRegistry.");}booleanloadCompleted=false;try{knownMappers.put(type,newMapperProxyFactory<>(type));//It'simportantthatthetypeisaddedbeforetheparserisrun//otherwisethebindingmayautomaticallybeattemptedbythe//mapperparser.Ifthetypeisalreadyknown,itwon'ttry.MapperAnnotationBuilderparser=newMapperAnnotationBuilder(config,type);parser.parse();loadCompleted=true;}finally{if(!loadCompleted){knownMappers.remove(type);}}}}1.2 从注解到 mapperStatement
看到MapperAnnotationBuilder#parse()
,parse()
中主要是对这个接口里面定义的方法做了parseStatement
这件事
for(Methodmethod:methods){try{//issue#237if(!method.isBridge()){parseStatement(method);}}catch(IncompleteElementExceptione){configuration.addIncompleteMethod(newMethodResolver(this,method));}}
parseStatement()
就是解析注解语句的地方,如果说我们没有写 xml,将语句以注解的形式写在方法上,则会在这里进行语句解析。它和我们上篇文章讲到的解析xml很像,就是拿到一大堆属性,比如resultMap
,keyGenerator
等等,生成一个MappedStatement
对象,这里就不赘述了。
voidparseStatement(Methodmethod){Class<?>parameterTypeClass=getParameterType(method);LanguageDriverlanguageDriver=getLanguageDriver(method);SqlSourcesqlSource=getSqlSourceFromAnnotations(method,parameterTypeClass,languageDriver);if(sqlSource!=null){//解析注解式的sql语句,略}}1.3 如果写了 xml,也写了注解会怎么样
我们知道承载mapperStatement
的是一个 map 映射,通过我们在上篇文章中反复强调的id
来作为 key,那么重复添加会出现什么呢?
答案在这里,mybatis
的这个 map 被重写了,同时写这两者的话,会抛出...already contains value for...
的异常
--代码位置org.apache.ibatis.session.Configuration.StrictMap#put--@Override@SuppressWarnings("unchecked")publicVput(Stringkey,Vvalue){if(containsKey(key)){thrownewIllegalArgumentException(name+"alreadycontainsvaluefor"+key+(conflictMessageProducer==null?"":conflictMessageProducer.apply(super.get(key),value)));}if(key.contains(".")){finalStringshortKey=getShortName(key);if(super.get(shortKey)==null){super.put(shortKey,value);}else{super.put(shortKey,(V)newAmbiguity(shortKey));}}returnsuper.put(key,value);}1.4 回到 MapperProxy1.4.1 MapperProxy 的创建
刚才在1.1中我们提到了,mapperProxy
,也就是刚才org.apache.ibatis.binding.MapperRegistry#addMapper
的代码:knownMappers.put(type, new MapperProxyFactory<>(type));
看到MapperProxyFactory
的内部:
--有删减--publicclassMapperProxyFactory<T>{privatefinalClass<T>mapperInterface;privatefinalMap<Method,MapperMethod>methodCache=newConcurrentHashMap<>();publicMapperProxyFactory(Class<T>mapperInterface){this.mapperInterface=mapperInterface;}@SuppressWarnings("unchecked")protectedTnewInstance(MapperProxy<T>mapperProxy){return(T)Proxy.newProxyInstance(mapperInterface.getClassLoader(),newClass[]{mapperInterface},mapperProxy);}publicTnewInstance(SqlSessionsqlSession){finalMapperProxy<T>mapperProxy=newMapperProxy<>(sqlSession,mapperInterface,methodCache);returnnewInstance(mapperProxy);}}
了解JDK动态代理的小伙伴应该很清楚了,newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
意为,为接口创建一个实现了InvocationHandler
的代理对象。我们在调用接口方法的时候,实际上要看代理类是如何实现的。
那么看看 mapperProxy 的内部的invoke
是如何实现的,这里有三类方法,
一种是一些Object
对象带来的方法,这里不进行代理,直接invoke
,
一种是default方法,一种比较蛋疼的写法,把接口当抽象类写,里面可以放一个default方法写实现,这种代理了也没太大意义
最后一种也就是我们准备代理的方法, 它会为每个非上面两者的方法,懒加载一个MapperMethod
对象,并调用MapperMethod#execute
来执行真正的 mybatis 逻辑。
--有删减--publicclassMapperProxy<T>implementsInvocationHandler,Serializable{publicMapperProxy(SqlSessionsqlSession,Class<T>mapperInterface,Map<Method,MapperMethod>methodCache){this.sqlSession=sqlSession;this.mapperInterface=mapperInterface;this.methodCache=methodCache;}@OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{try{if(Object.class.equals(method.getDeclaringClass())){//来自Object的方法,比如toString()returnmethod.invoke(this,args);}elseif(method.isDefault()){//静态方法,我们可以直接忽略if(privateLookupInMethod==null){returninvokeDefaultMethodJava8(proxy,method,args);}else{returninvokeDefaultMethodJava9(proxy,method,args);}}}catch(Throwablet){throwExceptionUtil.unwrapThrowable(t);}finalMapperMethodmapperMethod=cachedMapperMethod(method);returnmapperMethod.execute(sqlSession,args);}privateMapperMethodcachedMapperMethod(Methodmethod){returnmethodCache.computeIfAbsent(method,k->newMapperMethod(mapperInterface,method,sqlSession.getConfiguration()));}}
MapperMethod
的逻辑是怎么样的,也很好猜到,它的构造函数中创建了两个对象,
publicclassMapperMethod{privatefinalSqlCommandcommand;privatefinalMethodSignaturemethod;publicMapperMethod(Class<?>mapperInterface,Methodmethod,Configurationconfig){this.command=newSqlCommand(config,mapperInterface,method);this.method=newMethodSignature(config,mapperInterface,method);}
sqlCommand
sqlCommand
实际上就是从configuration
里面把它对应的MappedStatement
取出来,持有它的唯一id
和执行类型。
publicstaticclassSqlCommand{privatefinalStringname;privatefinalSqlCommandTypetype;publicSqlCommand(Configurationconfiguration,Class<?>mapperInterface,Methodmethod){finalStringmethodName=method.getName();finalClass<?>declaringClass=method.getDeclaringClass();MappedStatementms=resolveMappedStatement(mapperInterface,methodName,declaringClass,configuration);if(ms==null){if(method.getAnnotation(Flush.class)!=null){name=null;type=SqlCommandType.FLUSH;}else{thrownewBindingException("Invalidboundstatement(notfound):"+mapperInterface.getName()+"."+methodName);}}else{name=ms.getId();type=ms.getSqlCommandType();if(type==SqlCommandType.UNKNOWN){thrownewBindingException("Unknownexecutionmethodfor:"+name);}}}
MethodSignatureMethodSignature
是针对接口返回值、参数等值的解析,比如我们的@Param
注解,就是在new ParamNameResolver(configuration, method);
里面解析的,比较简单,在之前的文章简单概括的mybatis sqlSession 源码解析里也提到过,这里就不多说了。
publicMethodSignature(Configurationconfiguration,Class<?>mapperInterface,Methodmethod){TyperesolvedReturnType=TypeParameterResolver.resolveReturnType(method,mapperInterface);if(resolvedReturnTypeinstanceofClass<?>){this.returnType=(Class<?>)resolvedReturnType;}elseif(resolvedReturnTypeinstanceofParameterizedType){this.returnType=(Class<?>)((ParameterizedType)resolvedReturnType).getRawType();}else{this.returnType=method.getReturnType();}this.returnsVoid=void.class.equals(this.returnType);this.returnsMany=configuration.getObjectFactory().isCollection(this.returnType)||this.returnType.isArray();this.returnsCursor=Cursor.class.equals(this.returnType);this.returnsOptional=Optional.class.equals(this.returnType);this.mapKey=getMapKey(method);this.returnsMap=this.mapKey!=null;this.rowBoundsIndex=getUniqueParamIndex(method,RowBounds.class);this.resultHandlerIndex=getUniqueParamIndex(method,ResultHandler.class);this.paramNameResolver=newParamNameResolver(configuration,method);}1.4.3 MapperMethod 的执行
mapperMethod
就是sqlSession
与mappedStatement
的一个整合。它的执行是一个策略模式:
publicObjectexecute(SqlSessionsqlSession,Object[]args){Objectresult;switch(command.getType()){caseINSERT:{Objectparam=method.convertArgsToSqlCommandParam(args);result=rowCountResult(sqlSession.insert(command.getName(),param));break;}caseUPDATE:{Objectparam=method.convertArgsToSqlCommandParam(args);result=rowCountResult(sqlSession.update(command.getName(),param));break;}caseDELETE:{Objectparam=method.convertArgsToSqlCommandParam(args);result=rowCountResult(sqlSession.delete(command.getName(),param));break;}caseSELECT://略..}
具体是怎么执行的在文章简单概括的mybatis sqlSession 源码解析提到过,这里也不过多赘述。
这里对MapperProxy
在初始化与调用过程中的关系做一下罗列:
为了避免有小伙伴对sqlSession
完全没有概念,这里将接口代码贴出,可以看出sqlSession
是执行语句的一个入口,同时也提供了事务的一些操作,实际上就是如此:
publicinterfaceSqlSessionextendsCloseable{<T>TselectOne(Stringstatement);<T>TselectOne(Stringstatement,Objectparameter);<E>List<E>selectList(Stringstatement);<E>List<E>selectList(Stringstatement,Objectparameter);<E>List<E>selectList(Stringstatement,Objectparameter,RowBoundsrowBounds);<K,V>Map<K,V>selectMap(Stringstatement,StringmapKey);<K,V>Map<K,V>selectMap(Stringstatement,Objectparameter,StringmapKey);<K,V>Map<K,V>selectMap(Stringstatement,Objectparameter,StringmapKey,RowBoundsrowBounds);<T>Cursor<T>selectCursor(Stringstatement);<T>Cursor<T>selectCursor(Stringstatement,Objectparameter);<T>Cursor<T>selectCursor(Stringstatement,Objectparameter,RowBoundsrowBounds);voidselect(Stringstatement,Objectparameter,ResultHandlerhandler);voidselect(Stringstatement,ResultHandlerhandler);voidselect(Stringstatement,Objectparameter,RowBoundsrowBounds,ResultHandlerhandler);intinsert(Stringstatement);intinsert(Stringstatement,Objectparameter);intupdate(Stringstatement);intupdate(Stringstatement,Objectparameter);intdelete(Stringstatement);intdelete(Stringstatement,Objectparameter);voidcommit();voidcommit(booleanforce);voidrollback();voidrollback(booleanforce);List<BatchResult>flushStatements();voidclose();voidclearCache();ConfigurationgetConfiguration();<T>TgetMapper(Class<T>type);ConnectiongetConnection();}3.1 sqlSession 的创建3.1.1 Environment 与 Transaction
首先忘掉 spring 为我们提供的便利,看一下基础的,脱离了 spring 托管的 mybatis 是怎么进行 sql 操作的:
SqlSessionsqlSession=sqlSessionFactory.openSession();TrackerConfigMappermapper=sqlSession.getMapper(TrackerConfigMapper.class);TrackerConfigDOone=mapper.getOne(1);
SqlSessionFactory
有两个子类实现:DefaultSqlSessionFactory
和SqlSessionManager
,SqlSessionManager
使用动态代理 + 静态代理对DefaultSqlSessionFactory
进行了代理,不过不用太在意这个SqlSessionManager
,后面会说明原因。
上面不管怎么代理,实际逻辑的执行者都是DefaultSqlSessionFactory
,我们看看它的创建方法,也就是openSession()
实际执行的方法:
privateSqlSessionopenSessionFromDataSource(ExecutorTypeexecType,TransactionIsolationLevellevel,booleanautoCommit){Transactiontx=null;try{finalEnvironmentenvironment=configuration.getEnvironment();finalTransactionFactorytransactionFactory=getTransactionFactoryFromEnvironment(environment);tx=transactionFactory.newTransaction(environment.getDataSource(),level,autoCommit);finalExecutorexecutor=configuration.newExecutor(tx,execType);returnnewDefaultSqlSession(configuration,executor,autoCommit);}catch(Exceptione){closeTransaction(tx);//mayhavefetchedaconnectionsoletscallclose()throwExceptionFactory.wrapException("Erroropeningsession.Cause:"+e,e);}finally{ErrorContext.instance().reset();}}
environment
可用于数据源切换,那么提到数据源切换,就很容易想到了,连接的相关信息是这货维持的。 所以看到我们的代码:tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
,TransactionFactory
有三个实现,它们分别是JdbcTransactionFactory
、ManagedTransactionFactory
和SpringManagedTransactionFactory
。
JdbcTransactionFactory
和ManagedTransactionFactory
最大的区别就在于ManagedTransactionFactory
实现了空的 commit 与 rollback,源码中这样说道:付与容器来管理transaction
的生命周期,这个博主不是特别熟悉,因为没这么用过,tomcat、jetty 等容器实现了对 jdbc 的代理。要注意,不管如何都是使用的 jdbc 这套接口规范进行数据库操作的。
/***{@linkTransaction}thatletsthecontainermanagethefulllifecycleofthetransaction.*DelaysconnectionretrievaluntilgetConnection()iscalled.*Ignoresallcommitorrollbackrequests.*Bydefault,itclosestheconnectionbutcanbeconfigurednottodoit.**@authorClintonBegin**@seeManagedTransactionFactory*/
Transaction
是 mybatis 创建的一个对象,它实际上是对jdbc
connection
对象的一个封装:
--代码位于org.apache.ibatis.transaction.jdbc.JdbcTransaction--@OverridepublicConnectiongetConnection()throwsSQLException{if(connection==null){openConnection();}returnconnection;}@Overridepublicvoidcommit()throwsSQLException{if(connection!=null&&!connection.getAutoCommit()){if(log.isDebugEnabled()){log.debug("CommittingJDBCConnection["+connection+"]");}connection.commit();}}@Overridepublicvoidrollback()throwsSQLException{if(connection!=null&&!connection.getAutoCommit()){if(log.isDebugEnabled()){log.debug("RollingbackJDBCConnection["+connection+"]");}connection.rollback();}}3.1.2 Executor 与 SqlSession
我们知道 sqlSession 的 四大对象之一,Executor,负责统领全局,从语句获取(从 mappedStatement),到参数拼装(parameterHandler),再到执行语句(statementHandler),最后结果集封装(resultHandler),都是它负责“指挥”的。
我们看到它使用Transaction
进行初始化,另外的一个参数是它的类型,这里不多说,REUSE 是带语句缓存的,和普通的 SimpleExecutor 没有特别大的区别,BATCH 类型则是通过 jdbc 提供的批量提交来对网络请求进行优化。
publicenumExecutorType{SIMPLE,REUSE,BATCH}
最后将持有Transaction
的 Executor 置入SqlSession
,完成一个SqlSession
对象的创建。
可以看到,我们的确是一个SqlSession
对应一个连接(Transaction
),MapperProxy
这个业务接口的动态代理对象又持有一个SqlSession
对象,那么总不可能一直用同一个连接吧?
当然有疑问是好的,而且通过对 SqlSession 初始化过程的剖析,我们已经完善了我们对 mybatis 的认知:
接下来就是来打消这个疑问,MapperProxy
持有的sqlSession
和SqlSessionFactory
创建的这个有什么关系?
实际上答案就在SqlSessionTemplate
,SqlSessionTemplate
是 spring 对 mybatisSqlSessionFactory
的封装,同时,它还是SqlSession
的代理。
SqlSessionTemplate
和 mybatis 提供的SqlSessionManager
(SqlSessionFactory
的另一个实现类,也是DefaultSqlSessionFactory
的代理类,可以细想一下,业务是否共用同一个sqlSession
还要在业务里面去传递,去控制是不是很麻烦) 是一样的思路,不过 spring 直接代理了sqlSession
:
--代码位于org.mybatis.spring.SqlSessionTemplate--privatefinalSqlSessionFactorysqlSessionFactory;privatefinalExecutorTypeexecutorType;privatefinalSqlSessionsqlSessionProxy;privatefinalPersistenceExceptionTranslatorexceptionTranslator;/***ConstructsaSpringmanaged{@codeSqlSession}withthegiven*{@codeSqlSessionFactory}and{@codeExecutorType}.*Acustom{@codeSQLExceptionTranslator}canbeprovidedasan*argumentsoany{@codePersistenceException}thrownbyMyBatis*canbecustomtranslatedtoa{@codeRuntimeException}*The{@codeSQLExceptionTranslator}canalsobenullandthusno*exceptiontranslationwillbedoneandMyBatisexceptionswillbe*thrown**@paramsqlSessionFactoryafactoryofSqlSession*@paramexecutorTypeanexecutortypeonsession*@paramexceptionTranslatoratranslatorofexception*/publicSqlSessionTemplate(SqlSessionFactorysqlSessionFactory,ExecutorTypeexecutorType,PersistenceExceptionTranslatorexceptionTranslator){notNull(sqlSessionFactory,"Property'sqlSessionFactory'isrequired");notNull(executorType,"Property'executorType'isrequired");this.sqlSessionFactory=sqlSessionFactory;this.executorType=executorType;this.exceptionTranslator=exceptionTranslator;this.sqlSessionProxy=(SqlSession)newProxyInstance(SqlSessionFactory.class.getClassLoader(),newClass[]{SqlSession.class},newSqlSessionInterceptor());}
还是熟悉的配方,就是 jdk 的动态代理,SqlSessionTemplate
在初始化时创建了一个SqlSession
代理,也内置了ExecutorType
,SqlSessionFactory
等defaultSqlSession
初始化的必要组件。
想必看到这里,已经有很多小伙伴知道这里是怎么回事了,是的,我们对SqlSession
的操作都是经由这个代理来完成,代理的内部,实现了真正SqlSession
的创建与销毁,回滚与提交等,我们先纵览以下它的代理实现。
对于这种jdk动态代理,我们看到SqlSessionInterceptor#invoke
方法就明了了。我们先过一遍常规的流程,也就是没有使用 spring 事务功能支持,执行完 sql 就直接提交事务的常规操作:
1、getSqlSession()
创建sqlSession
2、执行MapperProxy
,也就是前面讲了一大堆的,MapperProxy
中,通过MapperMethod
来调用sqlSession
和我们生成好的mappedStatement
操作 sql 语句。
3、提交事务
4、关闭事务
注:代码有很大删减
privateclassSqlSessionInterceptorimplementsInvocationHandler{@OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{SqlSessionsqlSession=getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,SqlSessionTemplate.this.executorType,SqlSessionTemplate.this.exceptionTranslator);//创建或者获取真正需要的SqlSessiontry{Objectresult=method.invoke(sqlSession,args);//执行原本想对SqlSession做的事情if(!isSqlSessionTransactional(sqlSession,SqlSessionTemplate.this.sqlSessionFactory)){//forcecommitevenonnon-dirtysessionsbecausesomedatabasesrequire//acommit/rollbackbeforecallingclose()sqlSession.commit(true);//如非spring管理事务,则直接提交}finally{if(sqlSession!=null){closeSqlSession(sqlSession,SqlSessionTemplate.this.sqlSessionFactory);}}}}
注意:注释掉的代码在此类型的操作中没有什么意义,getSqlSession()
在这里只是简单通过sessionFactory
创建了一个sqlSession
:
publicstaticSqlSessiongetSqlSession(SqlSessionFactorysessionFactory,ExecutorTypeexecutorType,PersistenceExceptionTranslatorexceptionTranslator){//SqlSessionHolderholder=(SqlSessionHolder)TransactionSynchronizationManager.getResource(sessionFactory);//SqlSessionsession=sessionHolder(executorType,holder);//if(session!=null){//returnsession;//}LOGGER.debug(()->"CreatinganewSqlSession");session=sessionFactory.openSession(executorType);//registerSessionHolder(sessionFactory,executorType,exceptionTranslator,session);returnsession;}3.2.2 sqlSession 借助 TransactionSynchronizationManager 代理流程赏析
看完前面的实现,有小伙伴会好奇,我的@Transactional注解呢?我的事务传播等级呢?
实际上,除去上述常规流程,更多的是要借助TransactionSynchronizationManager
这个对象来完成,比如刚才步骤一,getSqlSession()
我暂时注释掉的代码里面,有一个很重要的操作:
我们把刚才getSqlSession()
中注释掉的代码再拿回来看看:
SqlSessionHolderholder=(SqlSessionHolder)TransactionSynchronizationManager.getResource(sessionFactory);SqlSessionsession=sessionHolder(executorType,holder);if(session!=null){returnsession;}session=sessionFactory.openSession(executorType);registerSessionHolder(sessionFactory,executorType,exceptionTranslator,session);returnsession;
我们可以看到首先获取一个叫做SqlSessionHolder
的东西,如果里面没有sqlSession
则调用sessionFactory.openSession(executorType);
创建一个,并把它注册到 TransactionSynchronizationManager。
sqlSessionHolder 没什么可说的,它就只是个纯粹的容器,里面主要就是装着一个SqlSession
:
publicSqlSessionHolder(SqlSessionsqlSession,ExecutorTypeexecutorType,PersistenceExceptionTranslatorexceptionTranslator){notNull(sqlSession,"SqlSessionmustnotbenull");notNull(executorType,"ExecutorTypemustnotbenull");this.sqlSession=sqlSession;this.executorType=executorType;this.exceptionTranslator=exceptionTranslator;}
所以说我们只需要把目光焦距在TransactionSynchronizationManager
就可以了,它的内部持有了很多个元素为Map<Object, Object>
的ThreadLocal
(代码示例中只贴出了resources
这一个ThreadLocal
):
publicabstractclassTransactionSynchronizationManager{privatestaticfinalLoglogger=LogFactory.getLog(TransactionSynchronizationManager.class);privatestaticfinalThreadLocal<Map<Object,Object>>resources=newNamedThreadLocal<>("Transactionalresources");@NullablepublicstaticObjectgetResource(Objectkey){ObjectactualKey=TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Objectvalue=doGetResource(actualKey);if(value!=null&&logger.isTraceEnabled()){logger.trace("Retrievedvalue["+value+"]forkey["+actualKey+"]boundtothread["+Thread.currentThread().getName()+"]");}returnvalue;}@NullableprivatestaticObjectdoGetResource(ObjectactualKey){Map<Object,Object>map=resources.get();if(map==null){returnnull;}Objectvalue=map.get(actualKey);//TransparentlyremoveResourceHolderthatwasmarkedasvoid...if(valueinstanceofResourceHolder&&((ResourceHolder)value).isVoid()){map.remove(actualKey);//RemoveentireThreadLocalifempty...if(map.isEmpty()){resources.remove();}value=null;}returnvalue;}
也就是说,spring 的事务,是借助TransactionSynchronizationManager
+SqlSessionHolder
对sqlSession
的控制来实现的。
那么这样就很清晰了,如下总结,也如下图:
MapperProxy
内置的sqlSession
是sqlSessiontemplate
sqlSessiontemplate
通过持有SqlSessionFactory
来创建真正的SqlSession
TransactionSynchronizationManager
+SqlSessionHolder
则扮演着SqlSession
管理的角色
上一个小节只是讲了是什么,没有讲为什么,到了这里如果有好奇宝宝一定会好奇诸如 spring 的一系列事务控制是怎么实现的,当然本文不会讲太多 spring 事务管理相关的太多东西,以后会有后续文章专门剖析事务管理。
我们可以简单看下TransactionInterceptor
,这是@Transactional
注解的代理类。
/***AOPAllianceMethodInterceptorfordeclarativetransaction*managementusingthecommonSpringtransactioninfrastructure*({@linkorg.springframework.transaction.PlatformTransactionManager}/*{@linkorg.springframework.transaction.ReactiveTransactionManager}).**<p>Derivesfromthe{@linkTransactionAspectSupport}classwhich*containstheintegrationwithSpring'sunderlyingtransactionAPI.*TransactionInterceptorsimplycallstherelevantsuperclassmethods*suchas{@link#invokeWithinTransaction}inthecorrectorder.**<p>TransactionInterceptorsarethread-safe.**@authorRodJohnson*@authorJuergenHoeller*@seeTransactionProxyFactoryBean*@seeorg.springframework.aop.framework.ProxyFactoryBean*@seeorg.springframework.aop.framework.ProxyFactory*/@SuppressWarnings("serial")publicclassTransactionInterceptorextendsTransactionAspectSupportimplementsMethodInterceptor,Serializable{/***CreateanewTransactionInterceptor.*<p>Transactionmanagerandtransactionattributesstillneedtobeset.*@see#setTransactionManager*@see#setTransactionAttributes(java.util.Properties)*@see#setTransactionAttributeSource(TransactionAttributeSource)*/publicTransactionInterceptor(){}@Override@NullablepublicObjectinvoke(MethodInvocationinvocation)throwsThrowable{//Workoutthetargetclass:maybe{@codenull}.//TheTransactionAttributeSourceshouldbepassedthetargetclass//aswellasthemethod,whichmaybefromaninterface.Class<?>targetClass=(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);//AdapttoTransactionAspectSupport'sinvokeWithinTransaction...returninvokeWithinTransaction(invocation.getMethod(),targetClass,invocation::proceed);}
可以看到它的代理方法invoke()
的执行逻辑在invokeWithinTransaction()
里:
--代码位于org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction--@NullableprotectedObjectinvokeWithinTransaction(Methodmethod,@NullableClass<?>targetClass,finalInvocationCallbackinvocation)throwsThrowable{//Ifthetransactionattributeisnull,themethodisnon-transactional.TransactionAttributeSourcetas=getTransactionAttributeSource();finalTransactionAttributetxAttr=(tas!=null?tas.getTransactionAttribute(method,targetClass):null);finalTransactionManagertm=determineTransactionManager(txAttr);if(this.reactiveAdapterRegistry!=null&&tminstanceofReactiveTransactionManager){//响应式事务相关}PlatformTransactionManagerptm=asPlatformTransactionManager(tm);finalStringjoinpointIdentification=methodIdentification(method,targetClass,txAttr);if(txAttr==null||!(ptminstanceofCallbackPreferringPlatformTransactionManager)){//StandardtransactiondemarcationwithgetTransactionandcommit/rollbackcalls.TransactionInfotxInfo=createTransactionIfNecessary(ptm,txAttr,joinpointIdentification);ObjectretVal;try{//Thisisanaroundadvice:Invokethenextinterceptorinthechain.//Thiswillnormallyresultinatargetobjectbeinginvoked.retVal=invocation.proceedWithInvocation();}catch(Throwableex){//targetinvocationexceptioncompleteTransactionAfterThrowing(txInfo,ex);throwex;}finally{cleanupTransactionInfo(txInfo);}if(vavrPresent&&VavrDelegate.isVavrTry(retVal)){//Setrollback-onlyincaseofVavrfailurematchingourrollbackrules...TransactionStatusstatus=txInfo.getTransactionStatus();if(status!=null&&txAttr!=null){retVal=VavrDelegate.evaluateTryFailure(retVal,txAttr,status);}}commitTransactionAfterReturning(txInfo);returnretVal;}else{//CallbackPreferringPlatformTransactionManager的处理逻辑}}
invokeWithinTransaction()
的代码虽然长,我们还是把它分段来看:
第一部分,准备阶段
也就是这部分代码:
//Ifthetransactionattributeisnull,themethodisnon-transactional.TransactionAttributeSourcetas=getTransactionAttributeSource();finalTransactionAttributetxAttr=(tas!=null?tas.getTransactionAttribute(method,targetClass):null);finalTransactionManagertm=determineTransactionManager(txAttr);PlatformTransactionManagerptm=asPlatformTransactionManager(tm);finalStringjoinpointIdentification=methodIdentification(method,targetClass,txAttr);
获取TransactionAttribute
(TransactionDefinition
(底层接口),这里面装载了事务传播等级,隔离级别等属性。TransactionAttribute
的创建依据配置,或者我们的事务传播等级注解,对什么异常进行回滚等,后续会继续对它的应用做说明,PlatformTransactionManager
则是进行事务管理的主要操作者。
第二部分,事务开启或者获取与准备,也就是我们执行逻辑的第一行代码createTransactionIfNecessary()
(是不是和前面说到的 SqlSession的创建或者获取很像?)
我们可以看到createTransactionIfNecessary()
的实现就做了两件事,其一是获取一个叫做TransactionStatus
的东西,另外则是调用prepareTransactionInfo()
,获取一个TransactionInfo
:
//StandardtransactiondemarcationwithgetTransactionandcommit/rollbackcalls.TransactionInfotxInfo=createTransactionIfNecessary(ptm,txAttr,joinpointIdentification);--代码位于org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary--protectedTransactionInfocreateTransactionIfNecessary(@NullablePlatformTransactionManagertm,@NullableTransactionAttributetxAttr,finalStringjoinpointIdentification){TransactionStatusstatus=tm.getTransaction(txAttr);returnprepareTransactionInfo(tm,txAttr,joinpointIdentification,status);}
先看看第一件事,也就是获取TransactionStatus
,它保存了事务的savePoint
,是否新事物等。删减掉一些判断方法,代码如下:
publicfinalTransactionStatusgetTransaction(@NullableTransactionDefinitiondefinition)throwsTransactionException{//Usedefaultsifnotransactiondefinitiongiven.TransactionDefinitiondef=(definition!=null?definition:TransactionDefinition.withDefaults());Objecttransaction=doGetTransaction();booleandebugEnabled=logger.isDebugEnabled();if(isExistingTransaction(transaction)){//Existingtransactionfound->checkpropagationbehaviortofindouthowtobehave.returnhandleExistingTransaction(def,transaction,debugEnabled);}if(def.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRED||def.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW||def.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){SuspendedResourcesHoldersuspendedResources=suspend(null);try{booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER);DefaultTransactionStatusstatus=newTransactionStatus(def,transaction,true,newSynchronization,debugEnabled,suspendedResources);doBegin(transaction,def);prepareSynchronization(status,def);returnstatus;}catch(RuntimeException|Errorex){resume(null,suspendedResources);throwex;}}else{//Create"empty"transaction:noactualtransaction,butpotentiallysynchronization.if(def.getIsolationLevel()!=TransactionDefinition.ISOLATION_DEFAULT&&logger.isWarnEnabled()){logger.warn("Customisolationlevelspecifiedbutnoactualtransactioninitiated;"+"isolationlevelwilleffectivelybeignored:"+def);}booleannewSynchronization=(getTransactionSynchronization()==SYNCHRONIZATION_ALWAYS);returnprepareTransactionStatus(def,null,true,newSynchronization,debugEnabled,null);}}
代码很长,但是不急,我们可以简单看出它分为两个部分:
第一部分是获取事务doGetTransaction()
第二部分则是判断是否新事物,
则TransactionDefinition.PROPAGATION_REQUIRED
、TransactionDefinition.PROPAGATION_REQUIRES_NEW
、TransactionDefinition.PROPAGATION_NESTED
是一种逻辑
其余是另一种逻辑,信息量有点大,但是慢慢来:
如果不是新事物,则执行handleExistingTransaction
,
如果是新事物
4.2.1 doGetTransactionprotectedObjectdoGetTransaction(){DataSourceTransactionObjecttxObject=newDataSourceTransactionObject();txObject.setSavepointAllowed(isNestedTransactionAllowed());ConnectionHolderconHolder=(ConnectionHolder)TransactionSynchronizationManager.getResource(obtainDataSource());txObject.setConnectionHolder(conHolder,false);returntxObject;}
doGetTransaction
获取我们的事务对象,这里也使用了TransactionSynchronizationManager
(前面说到的SqlSession
的管理类),事务对象会尝试获取本事务所使用的连接对象,这个和事务传播等级有关,先立个 flag。
我们可以看到这里面主要逻辑就是去获取ConnectionHolder
,实际上很简单,只要能获取到,就是已经存在的事务,获取不到(或者事务已经关闭)就是新事物。
如果说前面无法从TransactionSynchronizationManager
获取到conHolder
,或者说,我们的线程中并没有ConnectionHolder
那么将会进入此分支,此分支的支持的三个事务传播等级TransactionDefinition.PROPAGATION_REQUIRED
、TransactionDefinition.PROPAGATION_REQUIRES_NEW
、TransactionDefinition.PROPAGATION_NESTED
都是需要创建新事务的,所以它们在同一个分支里面:
SuspendedResourcesHoldersuspendedResources=suspend(null);booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER);DefaultTransactionStatusstatus=newTransactionStatus(def,transaction,true,newSynchronization,debugEnabled,suspendedResources);doBegin(transaction,def);prepareSynchronization(status,def);returnstatus;
SuspendedResourcesHolder
与事务的挂起相关,doBegin()
则是对连接对象connection
的获取和配置,prepareSynchronization()
则是对新事物的一些初始化操作。我们一点点看:
/***Thisimplementationsetstheisolationlevelbutignoresthetimeout.*/@OverrideprotectedvoiddoBegin(Objecttransaction,TransactionDefinitiondefinition){DataSourceTransactionObjecttxObject=(DataSourceTransactionObject)transaction;Connectioncon=null;if(!txObject.hasConnectionHolder()||txObject.getConnectionHolder().isSynchronizedWithTransaction()){ConnectionnewCon=obtainDataSource().getConnection();if(logger.isDebugEnabled()){logger.debug("AcquiredConnection["+newCon+"]forJDBCtransaction");}txObject.setConnectionHolder(newConnectionHolder(newCon),true);}txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con=txObject.getConnectionHolder().getConnection();IntegerpreviousIsolationLevel=DataSourceUtils.prepareConnectionForTransaction(con,definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);txObject.setReadOnly(definition.isReadOnly());//Switchtomanualcommitifnecessary.ThisisveryexpensiveinsomeJDBCdrivers,//sowedon'twanttodoitunnecessarily(forexampleifwe'veexplicitly//configuredtheconnectionpooltosetitalready).if(con.getAutoCommit()){txObject.setMustRestoreAutoCommit(true);if(logger.isDebugEnabled()){logger.debug("SwitchingJDBCConnection["+con+"]tomanualcommit");}con.setAutoCommit(false);}prepareTransactionalConnection(con,definition);txObject.getConnectionHolder().setTransactionActive(true);//Bindtheconnectionholdertothethread.if(txObject.isNewConnectionHolder()){TransactionSynchronizationManager.bindResource(obtainDataSource(),txObject.getConnectionHolder());}}}
可以看到,ConnectionHolder
的创建和连接的打开就是在这里进行的,创建后,设置其隔离级别,取消connection
的自动提交,将提交操作纳入到 spring 管理,并且将其存到TransactionSynchronizationManager
使得4.2.1 提到的 doGetTransaction()
可以拿到此ConnectionHolder
。
做完连接的获取与配置后,下一步就是对事物的一些初始化:
/***Initializetransactionsynchronizationasappropriate.*/protectedvoidprepareSynchronization(DefaultTransactionStatusstatus,TransactionDefinitiondefinition){if(status.isNewSynchronization()){TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel()!=TransactionDefinition.ISOLATION_DEFAULT?definition.getIsolationLevel():null);TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());TransactionSynchronizationManager.initSynchronization();}}
这个代码都是代码字面意义的简单设置,就不赘述了。
4.2.3 新事物的处理之创建一个虚假的事务对象刚才讲的是 “无法从TransactionSynchronizationManager
获取到conHolder
”,并且属于一些需要创建新事物的传播等级的情况。
如果说方才没有事务,也不需要创建新的事务,则会进入此分支,创建一个空的TransactionStatus
,内部的事务对象为空,代码很简单就不贴了,有兴趣可以去看看org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
的最后一个分支。
刚才说的都是无法获取到conHolder
的情况,如果获取到了,则又是另一套代码了,handleExistingTransaction
很长,它的第一个部分是对传播等级的控制,有兴趣的小伙伴可以去看看源码,我这里只挑一个简单的传播等级PROPAGATION_NESTED_NEW
做说明(其他的会在专门的事务一期做讲解):
--代码位于org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction--privateTransactionStatushandleExistingTransaction(TransactionDefinitiondefinition,Objecttransaction,booleandebugEnabled)throwsTransactionException{if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW){if(debugEnabled){logger.debug("Suspendingcurrenttransaction,creatingnewtransactionwithname["+definition.getName()+"]");}SuspendedResourcesHoldersuspendedResources=suspend(transaction);try{booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER);DefaultTransactionStatusstatus=newTransactionStatus(definition,transaction,true,newSynchronization,debugEnabled,suspendedResources);doBegin(transaction,definition);prepareSynchronization(status,definition);returnstatus;}catch(RuntimeException|ErrorbeginEx){resumeAfterBeginException(transaction,suspendedResources,beginEx);throwbeginEx;}}...略}
我们可以发现和4.2.2 新事物的处理
代码是一样的,唯一的区别就是此TransactionStatus
对象会真正内嵌一个事务挂起对象SuspendedResourcesHolder
。
拿到TransactionStatus
之后,prepareTransactionInfo()
里简单的将刚才那些PlatformTransactionManager
、TransactionAttribute
、TransactionStatus
包装成一个TransactionInfo
对象,并将其保存在ThreadLocal
中,这个bindToThread()
还会将当前已经持有的TransactionInfo
对象暂存。
protectedTransactionInfoprepareTransactionInfo(@NullablePlatformTransactionManagertm,@NullableTransactionAttributetxAttr,StringjoinpointIdentification,@NullableTransactionStatusstatus){TransactionInfotxInfo=newTransactionInfo(tm,txAttr,joinpointIdentification);if(txAttr!=null){//Thetransactionmanagerwillflaganerrorifanincompatibletxalreadyexists.txInfo.newTransactionStatus(status);}//WealwaysbindtheTransactionInfotothethread,evenifwedidn'tcreate//anewtransactionhere.ThisguaranteesthattheTransactionInfostack//willbemanagedcorrectlyevenifnotransactionwascreatedbythisaspect.txInfo.bindToThread();returntxInfo;}
到这里思路就很清晰了,代理为我们做的事情就是生成了一个叫做TransactionInfo
的东西,里面的TransactionManager
可以使得 spring 去对最底层的connection
对象做一些回滚,提交操作。TransactionStatus
则保存挂起的事务的信息,以及当前事务的一些状态,如下图:
让我们回到第四节开头的那段很长的代码,到这里是不是很明了了:
protectedObjectinvokeWithinTransaction(Methodmethod,@NullableClass<?>targetClass,finalInvocationCallbackinvocation)throwsThrowable{//Ifthetransactionattributeisnull,themethodisnon-transactional.TransactionAttributeSourcetas=getTransactionAttributeSource();finalTransactionAttributetxAttr=(tas!=null?tas.getTransactionAttribute(method,targetClass):null);finalTransactionManagertm=determineTransactionManager(txAttr);PlatformTransactionManagerptm=asPlatformTransactionManager(tm);finalStringjoinpointIdentification=methodIdentification(method,targetClass,txAttr);if(txAttr==null||!(ptminstanceofCallbackPreferringPlatformTransactionManager)){//StandardtransactiondemarcationwithgetTransactionandcommit/rollbackcalls.TransactionInfotxInfo=createTransactionIfNecessary(ptm,txAttr,joinpointIdentification);ObjectretVal;try{//Thisisanaroundadvice:Invokethenextinterceptorinthechain.//Thiswillnormallyresultinatargetobjectbeinginvoked.retVal=invocation.proceedWithInvocation();}catch(Throwableex){//targetinvocationexceptioncompleteTransactionAfterThrowing(txInfo,ex);throwex;}finally{cleanupTransactionInfo(txInfo);}if(vavrPresent&&VavrDelegate.isVavrTry(retVal)){//Setrollback-onlyincaseofVavrfailurematchingourrollbackrules...TransactionStatusstatus=txInfo.getTransactionStatus();if(status!=null&&txAttr!=null){retVal=VavrDelegate.evaluateTryFailure(retVal,txAttr,status);}}commitTransactionAfterReturning(txInfo);returnretVal;}}
1、获取 TransactionInfo
2、执行切面
3、将之前挂起的TransactionInfo
找回:
privatevoidbindToThread(){//ExposecurrentTransactionStatus,preservinganyexistingTransactionStatus//forrestorationafterthistransactioniscomplete.this.oldTransactionInfo=transactionInfoHolder.get();transactionInfoHolder.set(this);}privatevoidrestoreThreadLocalStatus(){//UsestacktorestoreoldtransactionTransactionInfo.//Willbenullifnonewasset.transactionInfoHolder.set(this.oldTransactionInfo);}
4、如果需要,则提交当前事务
5、返回切面值
4.5 最后一块拼图,spring 如何与 sqlSession 产生关联:我们在第三章讲到,mybatis有一个叫做defualtSqlSessionFactory
的类,负责创建sqlSession
,但是它和 spring 又是怎么产生关联的呢?
答案就在于,spring 实现了自己的TransactionFactory
,以及自己的Transaction
对象SpringManagedTransaction
。回顾一下SqlSession
的创建过程:
privateSqlSessionopenSessionFromDataSource(ExecutorTypeexecType,TransactionIsolationLevellevel,booleanautoCommit){Transactiontx=null;try{finalEnvironmentenvironment=configuration.getEnvironment();finalTransactionFactorytransactionFactory=getTransactionFactoryFromEnvironment(environment);tx=transactionFactory.newTransaction(environment.getDataSource(),level,autoCommit);finalExecutorexecutor=configuration.newExecutor(tx,execType);returnnewDefaultSqlSession(configuration,executor,autoCommit);}catch(Exceptione){closeTransaction(tx);//mayhavefetchedaconnectionsoletscallclose()throwExceptionFactory.wrapException("Erroropeningsession.Cause:"+e,e);}finally{ErrorContext.instance().reset();}}
看一下SpringManagedTransaction
是如何管理connection
的:
privatevoidopenConnection()throwsSQLException{this.connection=DataSourceUtils.getConnection(this.dataSource);this.autoCommit=this.connection.getAutoCommit();this.isConnectionTransactional=DataSourceUtils.isConnectionTransactional(this.connection,this.dataSource);LOGGER.debug(()->"JDBCConnection["+this.connection+"]will"+(this.isConnectionTransactional?"":"not")+"bemanagedbySpring");}
DataSourceUtils.getConnection(this.dataSource);
划重点,里面的实现不用我多说了,我们可以看到熟悉的身影,也就是ConnectionHolder
,连接是从这里(优先)拿的:
ConnectionHolderconHolder=(ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource);if(conHolder!=null&&(conHolder.hasConnection()||conHolder.isSynchronizedWithTransaction())){conHolder.requested();if(!conHolder.hasConnection()){logger.debug("FetchingresumedJDBCConnectionfromDataSource");conHolder.setConnection(fetchConnection(dataSource));}returnconHolder.getConnection();}
更新整套体系图:
我们整体简单过一次:
mybatis 启动时根据xml、注解创建了mapperedStatement
,用于sql执行,创建了SqlSessionFactory
用于创建SqlSession
对象。
mybatis 启动时创建了MapperProxyFactory
用于创建接口的代理对象MapperProxy
在创建MapperProxy
时,spring 为其注入了一个sqlSession
用于 sql执行,但是这个sqlSession
是一个代理对象,叫做sqlSessionTemplate
,它会自动选择我们该使用哪个sqlSession
去执行
在执行时,spring 切面在执行事务之前,会创建一个叫做TransactionInfo
的对象,此对象会根据事务传播等级来控制是否创建新连接,是否挂起上一个连接,将信息保存在TransactionSynchronizationManager
到了真正需要创建或者获取sqlSession
时,spring 重写的TransactionFactory
会优先去TransactionSynchronizationManager
中拿连接对象。
看完上述内容,你们对mybatis事务的管理控制有进一步的了解吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。