本篇内容介绍了“PostgreSQL中函数AssignTransactionId的实现逻辑是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、数据结构

静态变量
当前事务状态CurrentTransactionState

/**CurrentTransactionStatealwayspointstothecurrenttransactionstate*block.ItwillpointtoTopTransactionStateDatawhennotina*transactionatall,orwheninatop-leveltransaction.*CurrentTransactionState通常指向当前事务块.*如不处于事务中或者处于顶层事务中,则指向TopTransactionStateData*/staticTransactionStateDataTopTransactionStateData={.state=TRANS_DEFAULT,.blockState=TBLOCK_DEFAULT,};/**unreportedXidsholdsXIDsofallsubtransactionsthathavenotyetbeen*reportedinanXLOG_XACT_ASSIGNMENTrecord.*unreportedXids保存所有尚未在XLOG_XACT_ASSIGNMENT记录的子事务.*/staticintnUnreportedXids;staticTransactionIdunreportedXids[PGPROC_MAX_CACHED_SUBXIDS];staticTransactionStateCurrentTransactionState=&TopTransactionStateData;/**ThesubtransactionIDandcommandIDassignmentcountersareglobal*toawholetransaction,sowedonotkeeptheminthestatestack.*subtransactionID和commandID全局计数器,对事务可见,在state栈中不记录这些信息.*/staticSubTransactionIdcurrentSubTransactionId;staticCommandIdcurrentCommandId;staticboolcurrentCommandIdUsed;

TransactionState
事务状态结构体

/**transactionstates-transactionstatefromserverperspective*事务状态枚举-服务器视角的事务状态*/typedefenumTransState{TRANS_DEFAULT,/*idle空闲*/TRANS_START,/*transactionstarting事务启动*/TRANS_INPROGRESS,/*insideavalidtransaction进行中*/TRANS_COMMIT,/*commitinprogress提交中*/TRANS_ABORT,/*abortinprogress回滚中*/TRANS_PREPARE/*prepareinprogress准备中*/}TransState;/**transactionblockstates-transactionstateofclientqueries*事务块状态-客户端查询的事务状态**Note:thesubtransactionstatesareusedonlyfornon-topmost*transactions;theothersappearonlyinthetopmosttransaction.*注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.*/typedefenumTBlockState{/*not-in-transaction-blockstates未进入事务块状态*/TBLOCK_DEFAULT,/*idle空闲*/TBLOCK_STARTED,/*runningsingle-querytransaction单个查询事务*//*transactionblockstates事务块状态*/TBLOCK_BEGIN,/*startingtransactionblock开始事务块*/TBLOCK_INPROGRESS,/*livetransaction进行中*/TBLOCK_IMPLICIT_INPROGRESS,/*livetransactionafterimplicitBEGIN隐式事务,进行中*/TBLOCK_PARALLEL_INPROGRESS,/*livetransactioninsideparallelworker并行worker中的事务,进行中*/TBLOCK_END,/*COMMITreceived接收到COMMIT*/TBLOCK_ABORT,/*failedxact,awaitingROLLBACK失败,等待ROLLBACK*/TBLOCK_ABORT_END,/*failedxact,ROLLBACKreceived失败,已接收ROLLBACK*/TBLOCK_ABORT_PENDING,/*livexact,ROLLBACKreceived进行中,接收到ROLLBACK*/TBLOCK_PREPARE,/*livexact,PREPAREreceived进行中,接收到PREPARE*//*subtransactionstates子事务状态*/TBLOCK_SUBBEGIN,/*startingasubtransaction开启*/TBLOCK_SUBINPROGRESS,/*livesubtransaction进行中*/TBLOCK_SUBRELEASE,/*RELEASEreceived接收到RELEASE*/TBLOCK_SUBCOMMIT,/*COMMITreceivedwhileTBLOCK_SUBINPROGRESS进行中,接收到COMMIT*/TBLOCK_SUBABORT,/*failedsubxact,awaitingROLLBACK失败,等待ROLLBACK*/TBLOCK_SUBABORT_END,/*failedsubxact,ROLLBACKreceived失败,已接收ROLLBACK*/TBLOCK_SUBABORT_PENDING,/*livesubxact,ROLLBACKreceived进行中,接收到ROLLBACK*/TBLOCK_SUBRESTART,/*livesubxact,ROLLBACKTOreceived进行中,接收到ROLLBACKTO*/TBLOCK_SUBABORT_RESTART/*failedsubxact,ROLLBACKTOreceived失败,已接收ROLLBACKTO*/}TBlockState;/**transactionstatestructure*事务状态结构体*/typedefstructTransactionStateData{//事务IDTransactionIdtransactionId;/*myXID,orInvalidifnone*///子事务IDSubTransactionIdsubTransactionId;/*mysubxactID*///保存点名称char*name;/*savepointname,ifany*///保存点级别intsavepointLevel;/*savepointlevel*///低级别的事务状态TransStatestate;/*low-levelstate*///高级别的事务状态TBlockStateblockState;/*high-levelstate*///事务嵌套深度intnestingLevel;/*transactionnestingdepth*///GUC上下文嵌套深度intgucNestLevel;/*GUCcontextnestingdepth*///事务生命周期上下文MemoryContextcurTransactionContext;/*myxact-lifetimecontext*///查询资源ResourceOwnercurTransactionOwner;/*myqueryresources*///按XID顺序保存的已提交的子事务IDTransactionId*childXids;/*subcommittedchildXIDs,inXIDorder*///childXids数组大小intnChildXids;/*#ofsubcommittedchildXIDs*///分配的childXids数组空间intmaxChildXids;/*allocatedsizeofchildXids[]*///上一个CurrentUserIdOidprevUser;/*previousCurrentUserIdsetting*///上一个SecurityRestrictionContextintprevSecContext;/*previousSecurityRestrictionContext*///上一事务是否只读?boolprevXactReadOnly;/*entry-timexactr/ostate*///是否处于Recovery?boolstartedInRecovery;/*didwestartinrecovery?*///XID是否已保存在WALRecord中?booldidLogXid;/*hasxidbeenincludedinWALrecord?*///Enter/ExitParallelMode计数器intparallelModeLevel;/*Enter/ExitParallelModecounter*///父事务状态structTransactionStateData*parent;/*backlinktoparent*/}TransactionStateData;//结构体指针typedefTransactionStateData*TransactionState;二、源码解读

AssignTransactionId函数,给定的TransactionState分配一个新的持久化事务号XID,在此函数调用前,不会为事务分配XIDs.

/**AssignTransactionId**AssignsanewpermanentXIDtothegivenTransactionState.*WedonotassignXIDstotransactionsuntil/unlessthisiscalled.*Also,anyparentTransactionStatesthatdon'tyethaveXIDsareassigned*one;thismaintainstheinvariantthatachildtransactionhasanXID*followingitsparent's.*为给定的TransactionState分配一个新的持久化事务号XID.*在此函数调用前,我们不会为事务分配XIDs.*同时,所有尚未获得XIDs的父TransactionStates也会分配事务号,*这可以确保子事务的事务号在父事务之后.*/staticvoidAssignTransactionId(TransactionStates){boolisSubXact=(s->parent!=NULL);ResourceOwnercurrentOwner;boollog_unknown_top=false;/*Assertthatcallerdidn'tscrewup*///确保调用者没有搞砸Assert(!TransactionIdIsValid(s->transactionId));Assert(s->state==TRANS_INPROGRESS);/**Workerssynchronizetransactionstateatthebeginningofeachparallel*operation,sowecan'taccountfornewXIDsatthispoint.*在每个并行操作前,ParallelWorkers同步事务状态,*因此我们不能在这时候请求XIDs*/if(IsInParallelMode()||IsParallelWorker())elog(ERROR,"cannotassignXIDsduringaparalleloperation");/**Ensureparent(s)haveXIDs,sothatachildalwayshasanXIDlater*thanitsparent.Mustn'trecursehere,orwemightgetastackoverflow*ifwe'reatthebottomofahugestackofsubtransactionsnoneofwhich*haveXIDsyet.*确保父事务已分配XIDs,这样可以确保子事务的XID后于父事务.*不能在这里递归执行,否则如果我们在一个未分配XID子事务大栈的底部,可能会遇到栈溢出*/if(isSubXact&&!TransactionIdIsValid(s->parent->transactionId)){TransactionStatep=s->parent;TransactionState*parents;size_tparentOffset=0;parents=palloc(sizeof(TransactionState)*s->nestingLevel);while(p!=NULL&&!TransactionIdIsValid(p->transactionId)){parents[parentOffset++]=p;p=p->parent;}/**Thisistechnicallyarecursivecall,buttherecursionwillnever*bemorethanonelayerdeep.*递归调用,但递归不会超过一层*/while(parentOffset!=0)AssignTransactionId(parents[--parentOffset]);pfree(parents);}/**Whenwal_level=logical,guaranteethatasubtransaction'sxidcanonly*beseenintheWALstreamifitstoplevelxidhasbeenloggedbefore.*Ifnecessaryweloganxact_assignmentrecordwithfewerthan*PGPROC_MAX_CACHED_SUBXIDS.NotethatitisfineifdidLogXidisn'tset*foratransactioneventhoughitappearsinaWALrecord,wejustmight*superfluouslylogsomething.Thatcanhappenwhenanxidisincluded*somewhereinsideawalrecord,butnotinXLogRecord->xl_xid,likein*xl_standby_locks.*如wal_level=logical,确保子事务XID在顶层XID已被"日志"的情况只可以被WALstream看到.*如果有必要,我们将使用小于PGPROC_MAX_CACHED_SUBXIDS的值来记录xact_assignment记录.*请注意,即使didLogXid出现在WAL记录中,如果它没有为事务设置,也没有问题,*我们可能只是多余地记录了一些东西。*当一个xid出现在WALRecord中的某个地方,但不在XLogRecord->xl_xid中*(比如在xl_standby_locks中)时,就会发生这种情况。*/if(isSubXact&&XLogLogicalInfoActive()&&!TopTransactionStateData.didLogXid)log_unknown_top=true;/**GenerateanewXidandrecorditinPG_PROCandpg_subtrans.*生成一个新的XID,并记录在PG_PROC和pg_subtrans中**NB:wemustmakethesubtransentryBEFOREtheXidappearsanywherein*sharedstorageotherthanPG_PROC;becauseifthere'snoroomforitin*PG_PROC,thesubtransentryisneededtoensurethatotherbackendssee*theXidas"running".SeeGetNewTransactionId.*注意:我们必须在Xid出现在除PG_PROC之外的共享存储之前构造subtrans条目.*因为如果在PG_PROC没有空闲空间,子事务条目需要确保其他进程看到该XID正在运行.*参考函数GetNewTransactionId说明.**/s->transactionId=GetNewTransactionId(isSubXact);if(!isSubXact)XactTopTransactionId=s->transactionId;if(isSubXact)SubTransSetParent(s->transactionId,s->parent->transactionId);/**Ifit'satop-leveltransaction,thepredicatelockingsystemneedsto*betoldaboutittoo.*如为顶层事务,谓词锁系统也需要了解此事务.*/if(!isSubXact)RegisterPredicateLockingXid(s->transactionId);/**AcquirelockonthetransactionXID.(Weassumethiscannotblock.)We*havetoensurethatthelockisassignedtothetransaction'sown*ResourceOwner.*请求锁(我们假定这样做不好引起阻塞).我们必须确保锁已被分配给事务自己的ResourceOwner.*/currentOwner=CurrentResourceOwner;CurrentResourceOwner=s->curTransactionOwner;XactLockTableInsert(s->transactionId);CurrentResourceOwner=currentOwner;/**EveryPGPROC_MAX_CACHED_SUBXIDSassignedtransactionidswithineach*top-leveltransactionweissueaWALrecordfortheassignment.We*includethetop-levelxidandallthesubxidsthathavenotyetbeen*reportedusingXLOG_XACT_ASSIGNMENTrecords.*在每个顶级事务中分配的每个PGPROC_MAX_CACHED_SUBXIDS事务id,*我们都会为分配记录一条WAL记录。*该记录包括顶级的xid和所有尚未使用XLOG_XACT_ASSIGNMENT记录报告的子xid。**Thisisrequiredtolimittheamountofsharedmemoryrequiredinahot*standbyservertokeeptrackofin-progressXIDs.Seenotesfor*RecordKnownAssignedTransactionIds().*在跟踪进行中的XIDs的备机上,需要控制共享内存的大小.*参见RecordKnownAssignedTransactionIds()函数说明.**Wedon'tkeeptrackoftheimmediateparentofeachsubxid,onlythe*top-leveltransactionthateachsubxactbelongsto.Thisiscorrectin*recoveryonlybecauseabortedsubtransactionsareseparatelyWAL*logged.*我们不需要跟踪父事务的每个子事务,只需要跟踪子事务归属的顶层事务即可.*这样可行是因为在恢复中,已回滚的子事务是通过WAL单独记录的.**Thisiscorrectevenforthecasewhereseverallevelsaboveusdidn't*haveanxidassignedaswerecurseduptothembeforehand.*即使在我们之前递归到上面的几个级别时没有分配xid的情况下,这也是正确的。*/if(isSubXact&&XLogStandbyInfoActive()){unreportedXids[nUnreportedXids]=s->transactionId;nUnreportedXids++;/**ensurethistestmatchessimilaronein*RecoverPreparedTransactions()*确保在RecoverPreparedTransactions()中可以匹配到相似的.*/if(nUnreportedXids>=PGPROC_MAX_CACHED_SUBXIDS||log_unknown_top){xl_xact_assignmentxlrec;/**xtopisalwayssetbynowbecausewerecurseuptransaction*stacktothehighestunassignedxidandthencomebackdown*xtop现在已经设置好了,因为我们将事务堆栈递归到最高的未分配的xid,然后再返回*/xlrec.xtop=GetTopTransactionId();Assert(TransactionIdIsValid(xlrec.xtop));xlrec.nsubxacts=nUnreportedXids;XLogBeginInsert();XLogRegisterData((char*)&xlrec,MinSizeOfXactAssignment);XLogRegisterData((char*)unreportedXids,nUnreportedXids*sizeof(TransactionId));(void)XLogInsert(RM_XACT_ID,XLOG_XACT_ASSIGNMENT);nUnreportedXids=0;/*marktop,notcurrentxactashavingbeenlogged*///标记为最顶层,而不是当前已记录日志的xactTopTransactionStateData.didLogXid=true;}}}三、跟踪分析

执行txid_current,触发函数调用

11:10:36(xdb@[local]:5432)testdb=#begin;BEGIN11:40:20(xdb@[local]:5432)testdb=#*selecttxid_current_if_assigned();txid_current_if_assigned--------------------------(1row)11:40:43(xdb@[local]:5432)testdb=#*selecttxid_current();

启动gdb,设置断点

(gdb)bAssignTransactionIdBreakpoint5at0x546a4c:filexact.c,line491.(gdb)cContinuing.Breakpoint5,AssignTransactionId(s=0xf9c720<TopTransactionStateData>)atxact.c:491491boolisSubXact=(s->parent!=NULL);(gdb)

查看调用栈

(gdb)bt#0AssignTransactionId(s=0xf9c720<TopTransactionStateData>)atxact.c:491#10x000000000054693dinGetTopTransactionId()atxact.c:392#20x00000000009fe1f3intxid_current(fcinfo=0x25835a0)attxid.c:443#30x00000000006cfebdinExecInterpExpr(state=0x25834b8,econtext=0x25831a8,isnull=0x7ffe3d4a31f7)atexecExprInterp.c:654#40x00000000006d1ac6inExecInterpExprStillValid(state=0x25834b8,econtext=0x25831a8,isNull=0x7ffe3d4a31f7)atexecExprInterp.c:1786#50x00000000007140ddinExecEvalExprSwitchContext(state=0x25834b8,econtext=0x25831a8,isNull=0x7ffe3d4a31f7)at../../../src/include/executor/executor.h:303#60x000000000071414binExecProject(projInfo=0x25834b0)at../../../src/include/executor/executor.h:337#70x0000000000714323inExecResult(pstate=0x2583090)atnodeResult.c:136#80x00000000006e4c30inExecProcNodeFirst(node=0x2583090)atexecProcnode.c:445#90x00000000006d9974inExecProcNode(node=0x2583090)at../../../src/include/executor/executor.h:237#100x00000000006dc22dinExecutePlan(estate=0x2582e78,planstate=0x2583090,use_parallel_mode=false,operation=CMD_SELECT,sendTuples=true,numberTuples=0,direction=ForwardScanDirection,dest=0x24cd0a0,execute_once=true)atexecMain.c:1723#110x00000000006d9f5cinstandard_ExecutorRun(queryDesc=0x256b0c8,direction=ForwardScanDirection,count=0,execute_once=true)atexecMain.c:364#120x00000000006d9d7finExecutorRun(queryDesc=0x256b0c8,direction=ForwardScanDirection,count=0,execute_once=true)atexecMain.c:307#130x00000000008ccf5ainPortalRunSelect(portal=0x250c748,forward=true,count=0,dest=0x24cd0a0)atpquery.c:932#140x00000000008ccbf3inPortalRun(portal=0x250c748,count=9223372036854775807,isTopLevel=true,run_once=true,dest=0x24cd0a0,altdest=0x24cd0a0,completionTag=0x7ffe3d4a3570"")atpquery.c:773#150x00000000008c6b1einexec_simple_query(query_string=0x24a6ec8"selecttxid_current();")atpostgres.c:1145#160x00000000008cae70inPostgresMain(argc=1,argv=0x24d2dc8,dbname=0x24d2c30"testdb",username=0x24a3ba8"xdb")atpostgres.c:4182#170x000000000082642binBackendRun(port=0x24c8c00)atpostmaster.c:4361---Type<return>tocontinue,orq<return>toquit---#180x0000000000825b8finBackendStartup(port=0x24c8c00)atpostmaster.c:4033#190x0000000000821f1cinServerLoop()atpostmaster.c:1706#200x00000000008217b4inPostmasterMain(argc=1,argv=0x24a1b60)atpostmaster.c:1379#210x00000000007488efinmain(argc=1,argv=0x24a1b60)atmain.c:228

输入参数TransactionState(全局变量,指向TopTransactionStateData)

(gdb)ps$13=(TransactionState)0xf9c720<TopTransactionStateData>(gdb)p*s$14={transactionId=0,subTransactionId=1,name=0x0,savepointLevel=0,state=TRANS_INPROGRESS,blockState=TBLOCK_INPROGRESS,nestingLevel=1,gucNestLevel=1,curTransactionContext=0x2523850,curTransactionOwner=0x24d4868,childXids=0x0,nChildXids=0,maxChildXids=0,prevUser=10,prevSecContext=0,prevXactReadOnly=false,startedInRecovery=false,didLogXid=false,parallelModeLevel=0,parent=0x0}(gdb)

初始化部分变量并验证

(gdb)n493boollog_unknown_top=false;(gdb)496Assert(!TransactionIdIsValid(s->transactionId));(gdb)497Assert(s->state==TRANS_INPROGRESS);(gdb)

获取事务号

(gdb)503if(IsInParallelMode()||IsParallelWorker())(gdb)512if(isSubXact&&!TransactionIdIsValid(s->parent->transactionId))(gdb)545if(isSubXact&&XLogLogicalInfoActive()&&(gdb)557s->transactionId=GetNewTransactionId(isSubXact);(gdb)558if(!isSubXact)(gdb)ps->transactionId$15=2407(gdb)

注册,并设置其他信息

(gdb)n559XactTopTransactionId=s->transactionId;(gdb)561if(isSubXact)(gdb)568if(!isSubXact)(gdb)569RegisterPredicateLockingXid(s->transactionId);(gdb)576currentOwner=CurrentResourceOwner;(gdb)577CurrentResourceOwner=s->curTransactionOwner;(gdb)579XactLockTableInsert(s->transactionId);(gdb)581CurrentResourceOwner=currentOwner;(gdb)601if(isSubXact&&XLogStandbyInfoActive())(gdb)635}

完成调用

(gdb)GetTopTransactionId()atxact.c:393393returnXactTopTransactionId;(gdb)

“PostgreSQL中函数AssignTransactionId的实现逻辑是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!