PostgreSQL的后台进程walsender分析
这篇文章主要介绍“PostgreSQL的后台进程walsender分析”,在日常操作中,相信很多人在PostgreSQL的后台进程walsender分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”PostgreSQL的后台进程walsender分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
该进程实质上是streaming replication环境中master节点上普通的backend进程,在standby节点启动时,standby节点向master发送连接请求,master节点的postmaster进程接收到请求后,启动该进程与standby节点的walreceiver进程建立通讯连接,用于传输WAL Record.
walsender启动后,使用gdb跟踪此进程,其调用栈如下:
(gdb)bt#00x00007fb6e6390903in__epoll_wait_nocancel()from/lib64/libc.so.6#10x000000000088e668inWaitEventSetWaitBlock(set=0x10ac808,cur_timeout=29999,occurred_events=0x7ffd634441b0,nevents=1)atlatch.c:1048#20x000000000088e543inWaitEventSetWait(set=0x10ac808,timeout=29999,occurred_events=0x7ffd634441b0,nevents=1,wait_event_info=83886092)atlatch.c:1000#30x000000000088dcecinWaitLatchOrSocket(latch=0x7fb6dcbfc4d4,wakeEvents=27,sock=10,timeout=29999,wait_event_info=83886092)atlatch.c:385#40x000000000085405binWalSndLoop(send_data=0x8547fe<XLogSendPhysical>)atwalsender.c:2229#50x0000000000851c93inStartReplication(cmd=0x10ab750)atwalsender.c:684#60x00000000008532f0inexec_replication_command(cmd_string=0x101dd78"START_REPLICATION0/5D000000TIMELINE16")atwalsender.c:1539#70x00000000008c0170inPostgresMain(argc=1,argv=0x1049cb8,dbname=0x1049ba8"",username=0x1049b80"replicator")atpostgres.c:4178#80x000000000081e06cinBackendRun(port=0x103fb50)atpostmaster.c:4361#90x000000000081d7dfinBackendStartup(port=0x103fb50)atpostmaster.c:4033#100x0000000000819bd9inServerLoop()atpostmaster.c:1706#110x000000000081948finPostmasterMain(argc=1,argv=0x1018a50)atpostmaster.c:1379#120x0000000000742931inmain(argc=1,argv=0x1018a50)atmain.c:228
本节首先介绍调用栈中PostgresMain函数.
一、数据结构StringInfo
StringInfoData结构体保存关于扩展字符串的相关信息.
/*-------------------------*StringInfoDataholdsinformationaboutanextensiblestring.*StringInfoData结构体保存关于扩展字符串的相关信息.*dataisthecurrentbufferforthestring(allocatedwithpalloc).*data通过palloc分配的字符串缓存*lenisthecurrentstringlength.Thereisguaranteedtobe*aterminating'\0'atdata[len],althoughthisisnotvery*usefulwhenthestringholdsbinarydataratherthantext.*len是当前字符串的长度.保证以ASCII0(\0)结束(data[len]='\0').*虽然如果存储的是二进制数据而不是文本时不太好使.*maxlenistheallocatedsizeinbytesof'data',i.e.themaximum*stringsize(includingtheterminating'\0'char)thatwecan*currentlystorein'data'withouthavingtoreallocate*morespace.Wemustalwayshavemaxlen>len.*maxlen以字节为单位已分配的'data'的大小,限定了最大的字符串大小(包括结尾的ASCII0)*小于此尺寸的数据可以直接存储而无需重新分配.*cursorisinitializedtozerobymakeStringInfoorinitStringInfo,*butisnototherwisetouchedbythestringinfo.croutines.*SomeroutinesuseittoscanthroughaStringInfo.*cursor通过makeStringInfo或initStringInfo初始化为0,但不受stringinfo.c例程的影响.*某些例程使用该字段扫描StringInfo*-------------------------*/typedefstructStringInfoData{char*data;intlen;intmaxlen;intcursor;}StringInfoData;typedefStringInfoData*StringInfo;二、源码解读
PostgresMain
后台进程postgres的主循环入口 — 所有的交互式或其他形式的后台进程在这里启动.
其主要逻辑如下:
1.初始化相关变量
2.初始化进程信息,设置进程状态,初始化GUC参数
3.解析命令行参数并作相关校验
4.如为walsender进程,则调用WalSndSignals初始化,否则执行其他信号初始化
5.初始化BlockSig/UnBlockSig/StartupBlockSig
6.非Postmaster,则检查数据库路径/切换路径/创建锁定文件等操作
7.调用BaseInit执行基本的初始化
8.调用InitProcess/InitPostgres初始化进程
9.重置内存上下文,处理加载库和前后台消息交互等
10.初始化内存上下文
11.进入主循环
11.1切换至MessageContext上下文
11.2初始化输入的消息
11.3给客户端发送可以执行查询等消息
11.4读取命令
11.5根据命令类型执行相关操作
/*----------------------------------------------------------------*PostgresMain*postgresmainloop--allbackends,interactiveorotherwisestarthere*postgres主循环--所有的交互式或其他形式的后台进程在这里启动**argc/argvarethecommandlineargumentstobeused.(Whenbeingforked*bythepostmaster,thesearenottheoriginalargvarrayoftheprocess.)*dbnameisthenameofthedatabasetoconnectto,orNULLifthedatabase*nameshouldbeextractedfromthecommandlineargumentsordefaulted.*usernameisthePostgreSQLusernametobeusedforthesession.*argc/argv是命令行参数(postmasterfork进程时,不存在原有的进程argv数组).*dbname是连接的数据库名称,如需要从命令行参数中解析或者为默认的数据库名称,则为NULL.*username是PostgreSQL会话的用户名.*----------------------------------------------------------------*//*输入:argc/argv-Main函数的输入参数dbname-数据库名称username-用户名输出:无*/voidPostgresMain(intargc,char*argv[],constchar*dbname,constchar*username){intfirstchar;//临时变量,读取输入的CommandStringInfoDatainput_message;//字符串增强结构体sigjmp_buflocal_sigjmp_buf;//系统变量volatileboolsend_ready_for_query=true;//booldisable_idle_in_transaction_timeout=false;/*Initializestartupprocessenvironmentifnecessary.*///如需要,初始化启动进程环境if(!IsUnderPostmaster//未初始化?initializedforthebootstrap/standalonecaseInitStandaloneProcess(argv[0]);//初始化进程SetProcessingMode(InitProcessing);//设置进程状态为InitProcessing/**Setdefaultvaluesforcommand-lineoptions.*设置命令行选项默认值*/if(!IsUnderPostmaster)InitializeGUCOptions();//初始化GUC参数,GUC=GrandUnifiedConfiguration/**Parsecommand-lineoptions.*解析命令行选项*/process_postgres_switches(argc,argv,PGC_POSTMASTER,&dbname);//解析输入参数/*Musthavegottenadatabasename,orhaveadefault(theusername)*///必须包含数据库名称或者存在默认值if(dbname==NULL)//输入的dbname为空{dbname=username;//设置为用户名if(dbname==NULL)//如仍为空,报错ereport(FATAL,(errcode(ERRCODE_INVALID_PARAMETER_VALUE),errmsg("%s:nodatabasenorusernamespecified",progname)));}/*Acquireconfigurationparameters,unlessinheritedfrompostmaster*///请求配置参数,除非从postmaster中继承if(!IsUnderPostmaster){if(!SelectConfigFiles(userDoption,progname))//读取配置文件conf/hba文件&定位数据目录proc_exit(1);}/**Setupsignalhandlersandmasks.*配置信号handlers和masks.**Notethatpostmasterblockedallsignalsbeforeforkingchildprocess,*sothereisnoraceconditionwherebywemightreceiveasignalbefore*wehavesetupthehandler.*注意在fork子进程前postmaster已阻塞了所有信号,*因此就算接收到信号,但在完成配置handler前不会存在条件争用.**Alsonote:it'sbestnottouseanysignalsthatareSIG_IGNoredinthe*postmaster.Ifsuchasignalarrivesbeforeweareabletochangethe*handlertonon-SIG_IGN,it'llgetdropped.Instead,makeadummy*handlerinthepostmastertoreservethesignal.(Ofcourse,thisisn't*anissueforsignalsthatarelocallygenerated,suchasSIGALRMand*SIGPIPE.)*同时注意:最好不要使用在postmaster中标记为SIG_IGNored的信号.*如果在改变处理器为non-SIG_IGN前,接收到这样的信号,会被清除.*相反,可以在postmaster中创建dummyhandler来保留这样的信号.*(当然,对于本地产生的信号,比如SIGALRM和SIGPIPE,这不会是问题)*/if(am_walsender)//walsender进程?WalSndSignals();//如果是,则调用WalSndSignalselse//不是walsender进程{//设置标记,读取配置文件pqsignal(SIGHUP,PostgresSigHupHandler);/*setflagtoreadconfig*file*///中断信号处理器(中断当前查询)pqsignal(SIGINT,StatementCancelHandler);/*cancelcurrentquery*///终止当前查询并退出pqsignal(SIGTERM,die);/*cancelcurrentqueryandexit*//**Inastandalonebackend,SIGQUITcanbegeneratedfromthekeyboard*easily,whileSIGTERMcannot,sowemakebothsignalsdodie()*ratherthanquickdie().*在standalone进程,SIGQUIT可很容易的通过键盘生成,而SIGTERM则不好生成,*因此让这两个信号执行die()而不是quickdie().*///boolIsUnderPostmaster=falseif(IsUnderPostmaster)//悲催时刻,执行quickdie()pqsignal(SIGQUIT,quickdie);/*hardcrashtime*/else//执行die()pqsignal(SIGQUIT,die);/*cancelcurrentqueryandexit*///建立SIGALRM处理器InitializeTimeouts();/*establishesSIGALRMhandler*//**Ignorefailuretowritetofrontend.Note:iffrontendcloses*connection,wewillnoticeitandexitcleanlywhencontrolnext*returnstoouterloop.Thisseemssaferthanforcingexitinthe*midstofoutputduringwho-knows-whatoperation...*忽略写入前端的错误.*注意:如果前端关闭了连接,会通知并在空中下一次返回给外层循环时退出.*这看起来会比在who-knows-what操作期间强制退出安全一些.*/pqsignal(SIGPIPE,SIG_IGN);pqsignal(SIGUSR1,procsignal_sigusr1_handler);pqsignal(SIGUSR2,SIG_IGN);pqsignal(SIGFPE,FloatExceptionHandler);/**Resetsomesignalsthatareacceptedbypostmasterbutnotby*backend*重置一些postmaster接收而后台进程不会接收的信号*///在某些平台上,system()需要这个信号pqsignal(SIGCHLD,SIG_DFL);/*system()requiresthisonsome*platforms*/}//初始化BlockSig/UnBlockSig/StartupBlockSigpqinitmask();//InitializeBlockSig,UnBlockSig,andStartupBlockSig.if(IsUnderPostmaster){/*WeallowSIGQUIT(quickdie)atalltimes*///放开SIGQUIT(quickdie)sigdelset(&BlockSig,SIGQUIT);}//除了SIGQUIT,阻塞其他PG_SETMASK(&BlockSig);/*blockeverythingexceptSIGQUIT*/if(!IsUnderPostmaster){/**Validatewehavebeengivenareasonable-lookingDataDir(ifunder*postmaster,assumepostmasterdidthisalready).*验证已给出了reasonable-lookingDataDir*(如在postmaster下,假定postmaster已完成了这个事情)*/checkDataDir();//确认数据库路径OK,使用stat命令/*ChangeintoDataDir(ifunderpostmaster,wasdonealready)*///切换至数据库路径,使用chdir命令ChangeToDataDir();/**Createlockfilefordatadirectory.*///创建锁定文件,CreateLockFile(DIRECTORY_LOCK_FILE,amPostmaster,"",true,DataDir);CreateDataDirLockFile(false);/*readcontrolfile(errorcheckingandcontainsconfig)*///读取控制文件(错误检查和包含配置)LocalProcessControlFile(false);//Readthecontrolfile,setrespectiveGUCs./*InitializeMaxBackends(ifunderpostmaster,wasdonealready)*///从配置选项中初始化MaxBackendsInitializeMaxBackends();//InitializeMaxBackendsvaluefromconfigoptions.}/*Earlyinitialization*/BaseInit();//执行基本的初始化/**Createaper-backendPGPROCstructinsharedmemory,exceptinthe*EXEC_BACKENDcasewherethiswasdoneinSubPostmasterMain.Wemustdo*thisbeforewecanuseLWLocks(andintheEXEC_BACKENDcasewealready*hadtodosomestuffwithLWLocks).*在共享内存中创建每个backend都有的PGPROC结构体,除了在SubPostmasterMain的EXEC_BACKEND情况.*在可以使用LWLocks前必须执行该操作.*(在EXEC_BACKEND中,已使用了LWLocks执行这个场景)*///initializeaper-processdatastructureforthisbackend#ifdefEXEC_BACKENDif(!IsUnderPostmaster)InitProcess();#elseInitProcess();#endif/*WeneedtoallowSIGINT,etcduringtheinitialtransaction*///在初始化事务期间需要允许SIGINT等等PG_SETMASK(&UnBlockSig);/**Generalinitialization.*常规的初始化.**NOTE:ifyouaretemptedtoaddcodeinthisvicinity,considerputting*itinsideInitPostgres()instead.Inparticular,anythingthat*involvesdatabaseaccessshouldbethere,nothere.*注意:如果希望在此处添加代码,请考虑将其放入InitPostgres()中.*特别的,任何涉及到数据库访问的内容都应该在InitPostgres中,而不是在这里.*/InitPostgres(dbname,InvalidOid,username,InvalidOid,NULL,false);//InitializePOSTGRES/**IfthePostmasterContextisstillaround,recyclethespace;wedon't*needitanymoreafterInitPostgrescompletes.Notethisdoesnottrash**MyProcPort,becauseConnCreate()allocatedthatspacewithmalloc()*...elsewe'dneedtocopythePortdatafirst.Also,subsidiarydata*suchastheusernameisn'tlosteither;seeProcessStartupPacket().*如果PostmasterContext仍然存在,回收空间;*在InitPostgres完成后,我们不再需要这些空间.*注意:这个操作不会回收*MyProcPort,因为ConnCreate()分配*/if(PostmasterContext){MemoryContextDelete(PostmasterContext);PostmasterContext=NULL;}//完成初始化后,设置进程模式为NormalProcessingSetProcessingMode(NormalProcessing);/**NowallGUCstatesarefullysetup.Reportthemtoclientif*appropriate.*///ReportGUCBeginReportingGUCOptions();/**Alsosetuphandlertologsessionend;wehavetowaittillnowtobe*sureLog_disconnectionshasitsfinalvalue.*///设置处理器,用于记录会话结束;//等待直至确保Log_disconnections最终有值存在if(IsUnderPostmaster&&Log_disconnections)on_proc_exit(log_disconnections,0);//thisfunctionaddsacallbackfunctiontothelistoffunctionsinvokedbyproc_exit()/*PerforminitializationspecifictoaWALsenderprocess.*///为WALsender进程执行特别的初始化if(am_walsender)InitWalSender();//初始化WALsenderprocess/**processanylibrariesthatshouldbepreloadedatbackendstart(this*likewisecan'tbedoneuntilGUCsettingsarecomplete)*处理在后台进程启动时需要提前预装载的库(这个步骤在GUC配置完成后才能够执行)*/process_session_preload_libraries();//加载LIB/**Sendthisbackend'scancellationinfotothefrontend.*发送后端的取消信息到前台*/if(whereToSendOutput==DestRemote){StringInfoDatabuf;pq_beginmessage(&buf,'K');pq_sendint32(&buf,(int32)MyProcPid);pq_sendint32(&buf,(int32)MyCancelKey);pq_endmessage(&buf);/*NeednotflushsinceReadyForQuerywilldoit.*///不需要flush,因为ReadyForQuery会执行该操作}/*Welcomebannerforcase*///standalone的欢迎信息if(whereToSendOutput==DestDebug)printf("\nPostgreSQLstand-alonebackend%s\n",PG_VERSION);/**Createthememorycontextwewilluseinthemainloop.*创建主循环中使用的内存上下文**MessageContextisresetonceperiterationofthemainloop,ie,upon*completionofprocessingofeachcommandmessagefromtheclient.*主循环中,每一次迭代都会重置MessageContext,比如完成了每个命令的处理,已从客户端返回了信息*///初始化内存上下文:MessageContextMessageContext=AllocSetContextCreate(TopMemoryContext,"MessageContext",ALLOCSET_DEFAULT_SIZES);/**CreatememorycontextandbufferusedforRowDescriptionmessages.As*SendRowDescriptionMessage(),viaexec_describe_statement_message(),is*frequentlyexecutedforeversinglestatement,wedon'twantto*allocateaseparatebuffereverytime.*创建RowDescription消息的内存上下文和缓存.*每一条单独的语句执行时都会频繁的通过exec_describe_statement_message()函数*调用SendRowDescriptionMessage(),不希望在每次都分配单独的缓存.*///TODO传输RowDescriptionmessages?row_description_context=AllocSetContextCreate(TopMemoryContext,"RowDescriptionContext",ALLOCSET_DEFAULT_SIZES);MemoryContextSwitchTo(row_description_context);initStringInfo(&row_description_buf);MemoryContextSwitchTo(TopMemoryContext);/**Rememberstand-alonebackendstartuptime*记录stand-alone后台进程的启动时间*/if(!IsUnderPostmaster)PgStartTime=GetCurrentTimestamp();//记录启动时间/**POSTGRESmainprocessingloopbeginshere*POSTGRES的主处理循环在这里开始**Ifanexceptionisencountered,processingresumesheresoweabortthe*currenttransactionandstartanewone.*如果出现了异常,处理过程会在这里恢复因此PG可以回滚当前事务并启动新事务**Youmightwonderwhythisisn'tcodedasaninfinitelooparounda*PG_TRYconstruct.Thereasonisthatthisisthebottomofthe*exceptionstack,andsowithPG_TRYtherewouldbenoexceptionhandler*inforceatallduringtheCATCHpart.Byleavingtheoutermostsetjmp*alwaysactive,wehaveatleastsomechanceofrecoveringfromanerror*duringerrorrecovery.(Ifwegetintoaninfiniteloopthereby,it*willsoonbestoppedbyoverflowofelog.c'sinternalstatestack.)*你可能会对这里不用PG_TRY构造一个无限循环感到困惑.*理由是这是异常栈的底,因此在这里使用PG_TRY会导致在CATCH部分没有任何的异常处理器.*通过让最外层的setjmp始终处于活动状态,我们起码有机会在错误恢复的错误中进行恢复.*(如果进入了无线循环,会很快因为elog's内部状态栈的溢出而停止)**Notethatweusesigsetjmp(...,1),sothatthisfunction'ssignalmask*(towit,UnBlockSig)willberestoredwhenlongjmp'ingtohere.This*isessentialincasewelongjmp'doutofasignalhandleronaplatform*wherethatleavesthesignalblocked.It'snotredundantwiththe*unblockinAbortTransaction()becausethelatterisonlycalledifwe*wereinsideatransaction.*注意我们使用sigsetjmp(...,1),*以便该函数的信号mask(也就是说,UnBlockSig)在longjmp到这里的时候可以被还原.*在某个让信号继续阻塞的平台上通过longjmp跳出信号处理器时这样的处理是需要的.*这与AbortTransaction()设置unblock并不多余因为如果我们在事务中保证只执行了一次.*/if(sigsetjmp(local_sigjmp_buf,1)!=0)//{/**NOTE:ifyouaretemptedtoaddmorecodeinthisif-block,*considerthehighprobabilitythatitshouldbein*AbortTransaction()instead.Theonlystuffdonedirectlyhere*shouldbestuffthatisguaranteedtoapply*only*forouter-level*errorrecovery,suchasadjustingtheFE/BEprotocolstatus.*注意:如果你希望在if-block中添加代码,建议在AbortTransaction()中添加.*直接添加在这里的唯一理由是可以应用对高层的错误恢复,比如调整FE/BE协议状态.*//*SincenotusingPG_TRY,mustreseterrorstackbyhand*///不使用PG_TRY,必须重置错误栈error_context_stack=NULL;/*Preventinterruptswhilecleaningup*///清理时禁止中断HOLD_INTERRUPTS();/**ForgetanypendingQueryCancelrequest,sincewe'rereturningto*theidleloopanyway,andcancelanyactivetimeoutrequests.(In*futurewemightwanttoallowsometimeoutrequeststosurvive,but*atminimumit'dbenecessarytodoreschedule_timeouts(),incase*wegotherebecauseofaquerycancelinterruptingtheSIGALRM*interrupthandler.)Noteinparticularthatwemustclearthe*statementandlocktimeoutindicators,topreventanyfutureplain*querycancelsfrombeingmisreportedastimeoutsincasewe're*forgettingatimeoutcancel.*废弃正在处理中QueryCancel请求,因为进程会返回到空闲循环中,同时取消所有活动的超时请求.*(在未来,我们可能希望运行某些超时请求仍然存活,但最起码有需要执行reschedule_timeouts(),*在这种情况下到达这里的原因是查询取消是通过SIGALRM终端处理器中断的).*注意特别的,必须清理语句和锁超时提示器,已避免在取消超时后后续的普通查询出现超时时没有报告.*/disable_all_timeouts(false);QueryCancelPending=false;/*secondtoavoidracecondition*/stmt_timeout_active=false;/*Notreadingfromtheclientanymore.*///不再从客户端读取信息DoingCommandRead=false;/*Makesurelibpqisinagoodstate*///确保libq状态OKpq_comm_reset();/*Reporttheerrortotheclientand/orserverlog*///向客户端和/或服务器日志报告错误EmitErrorReport();/**Makesuredebug_query_stringgetsresetbeforewepossiblyclobber*thestorageitpointsat.*确保debug_query_string在可能破坏它所指向的存储之前重置*/debug_query_string=NULL;/**Abortthecurrenttransactioninordertorecover.*取消当前事务*/AbortCurrentTransaction();if(am_walsender)//如为walsender,则执行清理工作WalSndErrorCleanup();//错误清理PortalErrorCleanup();SPICleanup();/**Wecan'treleasereplicationslotsinsideAbortTransaction()aswe*needtobeabletostartandaborttransactionswhilehavingaslot*acquired.Butweneverneedtoholdthemacrosstoplevelerrors,*soreleasinghereisfine.There'sanothercleanupinProcKill()*ensuringwe'llcorrectlycleanuponFATALerrorsaswell.*在AbortTransaction()中不能释放replicationslots因为需要在持有slot时启动和回滚事务.*但不限于在顶层出现错误时持有这些slots,因此在这里释放这些slots是OK的.*这里在ProcKill()中存在另外一个清理确保我们可以在FATAL错误中正确的恢复.*/if(MyReplicationSlot!=NULL)ReplicationSlotRelease();/*Wealsowanttocleanuptemporaryslotsonerror.*///出现错误时,清理临时slotsReplicationSlotCleanup();//重置JITjit_reset_after_error();/**Nowreturntonormaltop-levelcontextandclearErrorContextfor*nexttime.*现在可以回到正常的顶层上下文中并为下次循环清理ErrorContext*/MemoryContextSwitchTo(TopMemoryContext);FlushErrorState();/**Ifwewerehandlinganextended-query-protocolmessage,initiate*skiptillnextSync.Thisalsocausesusnottoissue*ReadyForQuery(untilwegetSync).*如果我们正在处理extended-query-protocol消息,启动跳过直至下次Sync.*这同时会导致我们不会触发ReadyForQuery(直至接收到Sync)*/if(doing_extended_query_message)ignore_till_sync=true;/*Wedon'thaveatransactioncommandopenanymore*///不再有打开的事务命令xact_started=false;/**Ifanerroroccurredwhilewewerereadingamessagefromthe*client,wehavepotentiallylosttrackofwheretheprevious*messageendsandthenextonebegins.Eventhoughwehave*otherwiserecoveredfromtheerror,wecannotsafelyreadanymore*messagesfromtheclient,sothereisn'tmuchwecandowiththe*connectionanymore.*如果在读取客户端消息时出现错误,进程可能已经丢失了上一条消息结束和下一条消息开始的位置.*虽然从错误中恢复了,但我们仍然不能安全的从客户端读取消息,因此我们对该连接已无能无力.*/if(pq_is_reading_msg())ereport(FATAL,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("terminatingconnectionbecauseprotocolsynchronizationwaslost")));/*Nowwecanallowinterruptsagain*///允许中断RESUME_INTERRUPTS();}/*Wecannowhandleereport(ERROR)*///现在可以处理ereport(ERROR)了PG_exception_stack=&local_sigjmp_buf;if(!ignore_till_sync)//错误恢复后重新初始化send_ready_for_query=true;/*initially,oraftererror*//**Non-errorqueriesloophere.*世界清净了...*/for(;;)//主循环{/**Attopofloop,resetextended-query-messageflag,sothatany*errorsencounteredin"idle"statedon'tprovokeskip.*在循环的最开始处,重置extended-query-message标记,*以便在"idle"状态遇到错误时不会跳过*/doing_extended_query_message=false;/**Releasestorageleftoverfrompriorquerycycle,andcreateanew*queryinputbufferintheclearedMessageContext.*释放上一个查询周期中残余的存储空间,并在干净的MessageContext中创建新的查询输入缓存*/MemoryContextSwitchTo(MessageContext);//切换至MessageContextMemoryContextResetAndDeleteChildren(MessageContext);initStringInfo(&input_message);//初始化输入的信息/**Alsoconsiderreleasingourcatalogsnapshotifany,sothatit's*notpreventingadvanceofglobalxminwhilewewaitfortheclient.*尝试释放catalogsnapshot,以便在等待客户端返回时不会阻碍全局xmin的增加.*/InvalidateCatalogSnapshotConditionally();/**(1)Ifwe'vereachedidlestate,tellthefrontendwe'rereadyfor*anewquery.*(1)如果是idle状态,告诉前台已准备接受新查询请求了.**Note:thisincludesfflush()'ingthelastoftheprioroutput.*注意:这包含了fflush()'ing前一个输出的最后一个.**Thisisalsoagoodtimetosendcollectedstatisticstothe*collector,andtoupdatethePSstatsdisplay.Weavoiddoing*thoseeverytimethroughthemessageloopbecauseit'dslowdown*processingofbatchedmessages,andbecausewedon'twanttoreport*uncommittedupdates(thatconfusesautovacuum).Thenotification*processorwantsacalltoo,ifwearenotinatransactionblock.*发送收集的统计信息到collector,正当其时!*在每次消息循环时都发送统计信息是需要避免的,*因为我不希望报告未提交的更新(这会让autoacuum出现混乱).*如果我们不在事务块中,那么通知处理器希望调用一次.*/if(send_ready_for_query)//Iamready!{if(IsAbortedTransactionBlockState()){set_ps_display("idleintransaction(aborted)",false);pgstat_report_activity(STATE_IDLEINTRANSACTION_ABORTED,NULL);/*Starttheidle-in-transactiontimer*/if(IdleInTransactionSessionTimeout>0){disable_idle_in_transaction_timeout=true;enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,IdleInTransactionSessionTimeout);}}elseif(IsTransactionOrTransactionBlock()){set_ps_display("idleintransaction",false);pgstat_report_activity(STATE_IDLEINTRANSACTION,NULL);/*Starttheidle-in-transactiontimer*/if(IdleInTransactionSessionTimeout>0){disable_idle_in_transaction_timeout=true;enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,IdleInTransactionSessionTimeout);}}else{ProcessCompletedNotifies();pgstat_report_stat(false);set_ps_display("idle",false);pgstat_report_activity(STATE_IDLE,NULL);}ReadyForQuery(whereToSendOutput);send_ready_for_query=false;}/**(2)Allowasynchronoussignalstobeexecutedimmediatelyifthey*comeinwhilewearewaitingforclientinput.(Thismustbe*conditionalsincewedon'twant,say,readsonbehalfofCOPYFROM*STDINdoingthesamething.)*(2)如果异步信号在等待客户端输入时接收到,那么允许马上执行.*(这是有条件的,因为我们不希望或者说执行COPYFORMSTDIN同样的动作)*/DoingCommandRead=true;/**(3)readacommand(loopblockshere)*(3)读取命令(循环块)*/firstchar=ReadCommand(&input_message);//读取命令/**(4)disableasyncsignalconditionsagain.*(4)再次禁用异步信号条件**Querycancelissupposedtobeano-opwhenthereisnoqueryin*progress,soifaquerycancelarrivedwhilewewereidle,just*resetQueryCancelPending.ProcessInterrupts()hasthateffectwhen*it'scalledwhenDoingCommandReadisset,socheckforinterrupts*beforeresettingDoingCommandRead.*在处理过程中如无查询,那么取消查询被认为是no-op的,*因此如果在空闲状态下接收到查询取消信号,那么重置QueryCancelPending.*ProcessInterrupts()函数在DoingCommandRead设置的时候调用会有类似的影响,*因此在重置DoingCommandRead前重新检查中断.*/CHECK_FOR_INTERRUPTS();DoingCommandRead=false;/**(5)turnofftheidle-in-transactiontimeout*(5)关闭idle-in-transaction超时控制*/if(disable_idle_in_transaction_timeout){disable_timeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,false);disable_idle_in_transaction_timeout=false;}/**(6)checkforanyotherinterestingeventsthathappenedwhilewe*slept.*(6)在休眠时检查感兴趣的事件*/if(ConfigReloadPending){ConfigReloadPending=false;ProcessConfigFile(PGC_SIGHUP);}/**(7)processthecommand.Butignoreitifwe'reskippingtill*Sync.*(7)处理命令.但如果我们设置了ignore_till_sync则忽略之.*/if(ignore_till_sync&&firstchar!=EOF)continue;switch(firstchar){case'Q':/*simplequery*/{//---------简单查询constchar*query_string;/*Setstatement_timestamp()*///设置时间戳SetCurrentStatementStartTimestamp();//SQL语句query_string=pq_getmsgstring(&input_message);pq_getmsgend(&input_message);if(am_walsender){//如为WALsender,执行exec_replication_commandif(!exec_replication_command(query_string))exec_simple_query(query_string);}else//普通的后台进程exec_simple_query(query_string);//执行SQL语句send_ready_for_query=true;}break;case'P':/*parse*/{//----------解析constchar*stmt_name;constchar*query_string;intnumParams;Oid*paramTypes=NULL;forbidden_in_wal_sender(firstchar);/*Setstatement_timestamp()*/SetCurrentStatementStartTimestamp();stmt_name=pq_getmsgstring(&input_message);query_string=pq_getmsgstring(&input_message);numParams=pq_getmsgint(&input_message,2);if(numParams>0){inti;paramTypes=(Oid*)palloc(numParams*sizeof(Oid));for(i=0;i<numParams;i++)paramTypes[i]=pq_getmsgint(&input_message,4);}pq_getmsgend(&input_message);//执行解析exec_parse_message(query_string,stmt_name,paramTypes,numParams);}break;case'B':/*bind*///-------------绑定forbidden_in_wal_sender(firstchar);/*Setstatement_timestamp()*/SetCurrentStatementStartTimestamp();/**thismessageiscomplexenoughthatitseemsbesttoput*thefieldextractionout-of-line*该消息看起来比较复杂,看起来最好的做法是提取字段*/exec_bind_message(&input_message);break;case'E':/*execute*/{//------------执行constchar*portal_name;intmax_rows;forbidden_in_wal_sender(firstchar);/*Setstatement_timestamp()*/SetCurrentStatementStartTimestamp();portal_name=pq_getmsgstring(&input_message);max_rows=pq_getmsgint(&input_message,4);pq_getmsgend(&input_message);exec_execute_message(portal_name,max_rows);}break;case'F':/*fastpathfunctioncall*///-----------函数调用forbidden_in_wal_sender(firstchar);/*Setstatement_timestamp()*/SetCurrentStatementStartTimestamp();/*Reportquerytovariousmonitoringfacilities.*/pgstat_report_activity(STATE_FASTPATH,NULL);set_ps_display("<FASTPATH>",false);/*startanxactforthisfunctioninvocation*/start_xact_command();/**Note:wemayatthispointbeinsideanaborted*transaction.Wecan'tthrowerrorforthatuntilwe've*finishedreadingthefunction-callmessage,so*HandleFunctionRequest()mustcheckforitafterdoingso.*Becarefulnottodoanythingthatassumeswe'reinsidea*validtransactionhere.*//*switchbacktomessagecontext*/MemoryContextSwitchTo(MessageContext);HandleFunctionRequest(&input_message);/*committhefunction-invocationtransaction*/finish_xact_command();send_ready_for_query=true;break;case'C':/*close*/{//----------关闭intclose_type;constchar*close_target;forbidden_in_wal_sender(firstchar);close_type=pq_getmsgbyte(&input_message);close_target=pq_getmsgstring(&input_message);pq_getmsgend(&input_message);switch(close_type){case'S':if(close_target[0]!='\0')DropPreparedStatement(close_target,false);else{/*special-casetheunnamedstatement*/drop_unnamed_stmt();}break;case'P':{Portalportal;portal=GetPortalByName(close_target);if(PortalIsValid(portal))PortalDrop(portal,false);}break;default:ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalidCLOSEmessagesubtype%d",close_type)));break;}if(whereToSendOutput==DestRemote)pq_putemptymessage('3');/*CloseComplete*/}break;case'D':/*describe*/{//-------------描述比如\d等命令intdescribe_type;constchar*describe_target;forbidden_in_wal_sender(firstchar);/*Setstatement_timestamp()(neededforxact)*/SetCurrentStatementStartTimestamp();describe_type=pq_getmsgbyte(&input_message);describe_target=pq_getmsgstring(&input_message);pq_getmsgend(&input_message);switch(describe_type){case'S':exec_describe_statement_message(describe_target);break;case'P':exec_describe_portal_message(describe_target);break;default:ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalidDESCRIBEmessagesubtype%d",describe_type)));break;}}break;case'H':/*flush*///---------flush刷新pq_getmsgend(&input_message);if(whereToSendOutput==DestRemote)pq_flush();break;case'S':/*sync*///----------Sync同步pq_getmsgend(&input_message);finish_xact_command();send_ready_for_query=true;break;/**'X'meansthatthefrontendisclosingdownthesocket.EOF*meansunexpectedlossoffrontendconnection.Eitherway,*performnormalshutdown.*/case'X':caseEOF:/**ResetwhereToSendOutputtopreventereportfromattempting*tosendanymoremessagestoclient.*/if(whereToSendOutput==DestRemote)whereToSendOutput=DestNone;/**NOTE:ifyouaretemptedtoaddmorecodehere,DON'T!*Whateveryouhadinmindtodoshouldbesetupasan*on_proc_exitoron_shmem_exitcallback,instead.Otherwise*itwillfailtobecalledduringotherbackend-shutdown*scenarios.*/proc_exit(0);case'd':/*copydata*/case'c':/*copydone*/case'f':/*copyfail*//**Acceptbutignorethesemessages,perprotocolspec;we*probablygotherebecauseaCOPYfailed,andthefrontend*isstillsendingdata.*/break;default:ereport(FATAL,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("invalidfrontendmessagetype%d",firstchar)));}}/*endofinput-readingloop*/}三、跟踪分析
在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点
[xdb@localhost~]$ps-ef|greppostgrexdb12631014:20pts/000:00:00/appdb/xdb/pg11.2/bin/postgres(gdb)bPostgresMainBreakpoint1at0x8bf9df:filepostgres.c,line3660.(gdb)setfollow-fork-modechild(gdb)cContinuing.[Newprocess1332][Threaddebuggingusinglibthread_dbenabled]Usinghostlibthread_dblibrary"/lib64/libthread_db.so.1".[SwitchingtoThread0x7fb3885d98c0(LWP1332)]Breakpoint1,PostgresMain(argc=1,argv=0x1aa4c78,dbname=0x1aa4b68"",username=0x1aa4b40"replicator")atpostgres.c:36603660volatileboolsend_ready_for_query=true;(gdb)
1.初始化相关变量
注意变量IsUnderPostmaster,如为T则表示该进程为postmaster的子进程
(gdb)p*argv$1=0xc27715"postgres"(gdb)n3661booldisable_idle_in_transaction_timeout=false;(gdb)3664if(!IsUnderPostmaster)(gdb)pIsUnderPostmaster$2=true
2.初始化进程信息,设置进程状态,初始化GUC参数
(gdb)n3667SetProcessingMode(InitProcessing);(gdb)3672if(!IsUnderPostmaster)(gdb)pInitProcessing$3=InitProcessing
3.解析命令行参数并作相关校验
(gdb)n3678process_postgres_switches(argc,argv,PGC_POSTMASTER,&dbname);(gdb)3681if(dbname==NULL)(gdb)pdbname$4=0x1aa4b68""(gdb)pusername$5=0x1aa4b40"replicator"(gdb)n3692if(!IsUnderPostmaster)(gdb)
4.如为walsender进程,则调用WalSndSignals初始化,否则执行其他信号初始化
3712if(am_walsender)(gdb)3713WalSndSignals();(gdb)
5.初始化BlockSig/UnBlockSig/StartupBlockSig
(gdb)3751pqinitmask();(gdb)3753if(IsUnderPostmaster)(gdb)3756sigdelset(&BlockSig,SIGQUIT);(gdb)(gdb)3759PG_SETMASK(&BlockSig);/*blockeverythingexceptSIGQUIT*/(gdb)
6.非子进程(仍为postmaster进程),则检查数据库路径/切换路径/创建锁定文件等操作
N/A
7.调用BaseInit执行基本的初始化
3785BaseInit();(gdb)
8.调用InitProcess/InitPostgres初始化进程
3797InitProcess();(gdb)3801PG_SETMASK(&UnBlockSig);(gdb)3810InitPostgres(dbname,InvalidOid,username,InvalidOid,NULL,false);(gdb)
9.重置内存上下文,处理加载库和前后台消息交互等
(gdb)3819if(PostmasterContext)(gdb)3821MemoryContextDelete(PostmasterContext);(gdb)PPostmasterContext$6=(MemoryContext)0x1a78c60(gdb)P*PostmasterContext$7={type=T_AllocSetContext,isReset=false,allowInCritSection=false,methods=0xc93260<AllocSetMethods>,parent=0x1a73aa0,firstchild=0x1a9a700,prevchild=0x1a7ac70,nextchild=0x1a75ab0,name=0xc2622a"Postmaster",ident=0x0,reset_cbs=0x0}(gdb)n3822PostmasterContext=NULL;(gdb)3825SetProcessingMode(NormalProcessing);(gdb)3831BeginReportingGUCOptions();(gdb)3837if(IsUnderPostmaster&&Log_disconnections)(gdb)pLog_disconnections$8=false(gdb)p$9=false(gdb)n3841if(am_walsender)(gdb)3842InitWalSender();(gdb)3848process_session_preload_libraries();(gdb)3853if(whereToSendOutput==DestRemote)(gdb)3857pq_beginmessage(&buf,'K');(gdb)3858pq_sendint32(&buf,(int32)MyProcPid);(gdb)3859pq_sendint32(&buf,(int32)MyCancelKey);(gdb)3860pq_endmessage(&buf);(gdb)3865if(whereToSendOutput==DestDebug)(gdb)
10.初始化内存上下文
(gdb)3874MessageContext=AllocSetContextCreate(TopMemoryContext,(gdb)3884row_description_context=AllocSetContextCreate(TopMemoryContext,(gdb)3887MemoryContextSwitchTo(row_description_context);(gdb)3888initStringInfo(&row_description_buf);(gdb)3889MemoryContextSwitchTo(TopMemoryContext);(gdb)3894if(!IsUnderPostmaster)(gdb)3919if(sigsetjmp(local_sigjmp_buf,1)!=0)(gdb)4027PG_exception_stack=&local_sigjmp_buf;(gdb)4029if(!ignore_till_sync)(gdb)4030send_ready_for_query=true;/*initially,oraftererror*/(gdb)
11.进入主循环
11.1切换至MessageContext上下文
(gdb)4042doing_extended_query_message=false;(gdb)4048MemoryContextSwitchTo(MessageContext);(gdb)4049MemoryContextResetAndDeleteChildren(MessageContext);
11.2初始化输入的消息
(gdb)4051initStringInfo(&input_message);(gdb)4057InvalidateCatalogSnapshotConditionally();(gdb)pinput_message$10={data=0x1a78d78"",len=0,maxlen=1024,cursor=0}(gdb)
11.3给客户端发送可以执行查询等消息
(gdb)n4072if(send_ready_for_query)(gdb)psend_ready_for_query$12=true(gdb)n4074if(IsAbortedTransactionBlockState())(gdb)4087elseif(IsTransactionOrTransactionBlock())(gdb)4102ProcessCompletedNotifies();(gdb)4103pgstat_report_stat(false);(gdb)4105set_ps_display("idle",false);(gdb)4106pgstat_report_activity(STATE_IDLE,NULL);(gdb)4109ReadyForQuery(whereToSendOutput);(gdb)4110send_ready_for_query=false;(gdb)
11.4读取命令
命令是IDENTIFY_SYSTEM,判断系统标识是否OK
firstchar -> ASCII 81 —> 字母’Q’
(gdb)4119DoingCommandRead=true;(gdb)4124firstchar=ReadCommand(&input_message);(gdb)4135CHECK_FOR_INTERRUPTS();(gdb)pinput_message$13={data=0x1a78d78"IDENTIFY_SYSTEM",len=16,maxlen=1024,cursor=0}(gdb)pfirstchar$14=81(gdb)$15=81(gdb)n4136DoingCommandRead=false;(gdb)4141if(disable_idle_in_transaction_timeout)(gdb)4151if(ConfigReloadPending)(gdb)4161if(ignore_till_sync&&firstchar!=EOF)(gdb)
11.5根据命令类型执行相关操作
walsender —> 执行exec_replication_command命令
(gdb)4164switch(firstchar)(gdb)4171SetCurrentStatementStartTimestamp();(gdb)4173query_string=pq_getmsgstring(&input_message);(gdb)4174pq_getmsgend(&input_message);(gdb)pquery_string$16=0x1a78d78"IDENTIFY_SYSTEM"(gdb)n4176if(am_walsender)(gdb)4178if(!exec_replication_command(query_string))(gdb)4184send_ready_for_query=true;(gdb)4186break;(gdb)4411}/*endofinput-readingloop*/(gdb)
继续循环,接收命令,第二个命令是START_REPLICATION
...(gdb)4124firstchar=ReadCommand(&input_message);(gdb)4135CHECK_FOR_INTERRUPTS();(gdb)pinput_message$18={data=0x1a78d78"START_REPLICATION0/5D000000TIMELINE16",len=41,maxlen=1024,cursor=0}(gdb)pfirstchar$19=81...4164switch(firstchar)(gdb)n4171SetCurrentStatementStartTimestamp();(gdb)4173query_string=pq_getmsgstring(&input_message);(gdb)4174pq_getmsgend(&input_message);(gdb)4176if(am_walsender)(gdb)pquery_string$20=0x1a78d78"START_REPLICATION0/5D000000TIMELINE16"(gdb)pinput_message$21={data=0x1a78d78"START_REPLICATION0/5D000000TIMELINE16",len=41,maxlen=1024,cursor=41}(gdb)n4178if(!exec_replication_command(query_string))(gdb)
开始执行复制,master节点使用psql连接数据库,执行sql语句,子进程会接收到相关信号,执行相关处理
执行脚本
[xdb@localhost~]$psql-dtestdbpsql(11.2)Type"help"forhelp.testdb=#droptablet1;
子进程输出
(gdb)ProgramreceivedsignalSIGUSR1,Userdefinedsignal1.0x00007fb38696c903in__epoll_wait_nocancel()from/lib64/libc.so.6(gdb)Singlesteppinguntilexitfromfunction__epoll_wait_nocancel,whichhasnolinenumberinformation.procsignal_sigusr1_handler(postgres_signal_arg=32766)atprocsignal.c:262262{(gdb)n263intsave_errno=errno;(gdb)ProgramreceivedsignalSIGTRAP,Trace/breakpointtrap.0x00007fb3881eecd0in__errno_location()from/lib64/libpthread.so.0(gdb)Singlesteppinguntilexitfromfunction__errno_location,whichhasnolinenumberinformation.procsignal_sigusr1_handler(postgres_signal_arg=10)atprocsignal.c:265265if(CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT))(gdb)
DONE!
DEBUG退出gdb后,psql会话crash:(
[xdb@localhost~]$psql-dtestdbpsql(11.2)Type"help"forhelp.testdb=#droptablet1;WARNING:terminatingconnectionbecauseofcrashofanotherserverprocessDETAIL:Thepostmasterhascommandedthisserverprocesstorollbackthecurrenttransactionandexit,becauseanotherserverprocessexitedabnormallyandpossiblycorruptedsharedmemory.HINT:Inamomentyoushouldbeabletoreconnecttothedatabaseandrepeatyourcommand.serverclosedtheconnectionunexpectedlyThisprobablymeanstheserverterminatedabnormallybeforeorwhileprocessingtherequest.Theconnectiontotheserverwaslost.Attemptingreset:Failed.!>
到此,关于“PostgreSQL的后台进程walsender分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。