这篇文章主要介绍“PostgreSQL中ReceiveXlogStream有什么作用”,在日常操作中,相信很多人在PostgreSQL中ReceiveXlogStream有什么作用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”PostgreSQL中ReceiveXlogStream有什么作用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的BaseBackup中对WAL数据进行备份的实现函数StartLogStreamer->LogStreamerMain及其主要的实现函数ReceiveXlogStream.

一、数据结构

logstreamer_param
WAL data streamer参数.

typedefstruct{////后台连接PGconn*bgconn;//开始位置XLogRecPtrstartptr;//目录或者tar文件,依赖于使用的模式charxlog[MAXPGPATH];/*directoryortarfiledependingonmode*///系统标识符char*sysidentifier;//时间线inttimeline;}logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数

/**Globalparameterswhenreceivingxlogstream.Fordetailsabouttheindividualfields,*seethefunctioncommentforReceiveXlogStream().*接收xlog流数据时的全局参数.*每个域字段的详细解释,参见ReceiveXlogStream()函数注释.*/typedefstructStreamCtl{//streaming的开始位置XLogRecPtrstartpos;/*Startpositionforstreaming*///时间线TimeLineIDtimeline;/*Timelinetostreamdatafrom*///系统标识符char*sysidentifier;/*Validatethissystemidentifierand*timeline*///standby超时信息intstandby_message_timeout;/*Sendstatusmessagesthisoften*///是否同步(写入时是否马上FlushWALdata)boolsynchronous;/*FlushimmediatelyWALdataonwrite*///在已归档的数据中标记segment为已完成boolmark_done;/*Marksegmentasdoneingeneratedarchive*///刷新到磁盘上以确保数据的一致性状态(是否已刷新到磁盘上)booldo_sync;/*Flushtodisktoensureconsistentstateof*data*///在返回T时停止streamingstream_stop_callbackstream_stop;/*Stopstreamingwhenreturnstrue*///如有效,监测该socket中的输入并检查stream_stop()的返回pgsocketstop_socket;/*ifvalid,watchforinputonthissocket*andcheckstream_stop()whenthereisany*///如何写WALWalWriteMethod*walmethod;/*HowtowritetheWAL*///附加到部分接受文件的后缀char*partial_suffix;/*Suffixappendedtopartiallyreceivedfiles*///使用的replicationslot,如无则为NULLchar*replication_slot;/*Replicationslottouse,orNULL*/}StreamCtl;二、源码解读

LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用

staticintLogStreamerMain(logstreamer_param*param){StreamCtlstream;//接收xlog流数据时的全局参数in_log_streamer=true;//初始化StreamCtl结构体MemSet(&stream,0,sizeof(stream));stream.startpos=param->startptr;stream.timeline=param->timeline;stream.sysidentifier=param->sysidentifier;stream.stream_stop=reached_end_position;#ifndefWIN32stream.stop_socket=bgpipe[0];#elsestream.stop_socket=PGINVALID_SOCKET;#endifstream.standby_message_timeout=standby_message_timeout;stream.synchronous=false;stream.do_sync=do_sync;stream.mark_done=true;stream.partial_suffix=NULL;stream.replication_slot=replication_slot;if(format=='p')stream.walmethod=CreateWalDirectoryMethod(param->xlog,0,do_sync);elsestream.walmethod=CreateWalTarMethod(param->xlog,compresslevel,do_sync);//接收数据if(!ReceiveXlogStream(param->bgconn,&stream))/**Anyerrorswillalreadyhavebeenreportedinthefunctionprocess,*butweneedtotelltheparentthatwedidn'tshutdowninanice*way.*在函数执行过程中出现的错误已通过警告的方式发出,*但仍需要告知父进程不能优雅的关闭本进程.*/return1;if(!stream.walmethod->finish()){fprintf(stderr,_("%s:couldnotfinishwritingWALfiles:%s\n"),progname,strerror(errno));return1;}//结束连接PQfinish(param->bgconn);//普通文件格式if(format=='p')FreeWalDirectoryMethod();elseFreeWalTarMethod();//是否内存pg_free(stream.walmethod);return0;}

ReceiveXlogStream
在指定的开始位置接收log stream

/**Receivealogstreamstartingatthespecifiedposition.*在指定的开始位置接收logstream**IndividualparametersarepassedthroughtheStreamCtlstructure.*通过StreamCtl结构体传递参数.**Ifsysidentifierisspecified,validatethatboththesystem*identifierandthetimelinematchesthespecifiedones*(bysendinganextraIDENTIFY_SYSTEMcommand)*如指定了系统标识符,验证系统标识符和timeline是否匹配指定的信息.*(通过发送额外的IDENTIFY_SYSTEM命令)**Allreceivedsegmentswillbewrittentothedirectory*specifiedbybasedir.Thiswillalsofetchanymissingtimelinehistory*files.*所有接收到的segments会写入到basedir中.*这同时会提前所有缺失的timelinehistory文件.**Thestream_stopcallbackwillbecalledeverytimedata*isreceived,andwheneverasegmentiscompleted.Ifitreturns*true,thestreamingwillstopandthefunction*return.Aslongasitreturnsfalse,streamingwillcontinue*indefinitely.*stream_stop回调函数在每次接收到数据以及segment完成传输后调用.*如返回T,streaming会停止,函数返回.*如返回F,streaming会一直继续.**Ifstream_stop()checksforexternalinput,stop_socketshouldbesetto*theFDitchecks.Thiswillallowsuchinputtobedetectedpromptly*ratherthanafterstandby_message_timeout(whichmightbeindefinite).*Notethatsignalswillinterruptwaitsforinputaswell,butthatis*race-ysinceasignalreceivedwhilebusywon'tinterruptthewait.*如stream_stop()用于检测额外的输入,stop_socket变量应设置为该函数需检查的FD.*这会允许立即检测此类输入,而不是在standby_message_timeout之后(可能会无限循环).*注意信号也会中断输入等待,但这是存在竞争的,因为在忙时接收到信号不会中断等待.**standby_message_timeoutcontrolshowoftenwesendamessage*backtothemasterlettingitknowourprogress,inmilliseconds.*Zeromeansnomessagesaresent.*Thismessagewillonlycontainthewritelocation,andnever*flushorreplay.*standby_message_timeout控制发送进度消息回master的频度,单位为ms.*0意味着没有消息会发送.*该消息只保存写入位置,永远不会flush或replay.**If'partial_suffix'isnotNULL,filesareinitiallycreatedwiththe*givensuffix,andthesuffixisremovedoncethefileisfinished.That*allowsyoutotellthedifferencebetweenpartialandcompletedfiles,*sothatyoucancontinuelaterwhereyouleft.*如'partial_suffix'不为NULL,文件已通过给定的suffix创建,*一旦文件完成传输,则suffix会被清除.*这是部分和完整完成文件的异同,以便在离开后可以继续.**If'synchronous'istrue,thereceivedWALisflushedassoonaswritten,*otherwiseonlywhentheWALfileisclosed.*如'synchronous'为T,接收到的WAL会刷新为写入,否则的话只会在WALfile关闭时才写入.**Note:TheWALlocation*must*beatalogsegmentstart!*注意:WAL位置必须是logsegment的起始位置.*/boolReceiveXlogStream(PGconn*conn,StreamCtl*stream){charquery[128];charslotcmd[128];PGresult*res;XLogRecPtrstoppos;/**Thecallershould'vecheckedtheserverversionalready,butdoesn'tdo*anyharmtocheckitheretoo.*调用者已完成版本校验,但这里重复校验并没有什么问题.*/if(!CheckServerVersionForStreaming(conn))returnfalse;/**Decidewhetherwewanttoreporttheflushposition.Ifwereportthe*flushposition,theprimarywillknowwhatWALwe'llpossibly*re-request,anditcanthenremoveolderWALsafely.Wemustalwaysdo*thatwhenweareusingslots.*确定是否需要报告flush位置.*如果我们报告了flush位置,主服务器将会知道可能重复请求的WALfile,*这样可以安全的移除更老的WAL.*如使用slots,应经常执行该操作.**Reportingtheflushpositionmakesoneeligibleasasynchronous*replica.Peopleshouldn'tincludegenericnamesin*synchronous_standby_names,butwe'veprotectedthemagainstitsofar,*solet'scontinuetodosounlessspecificallyrequested.*报告flush位置使其符合同步副本的条件.*DBA不应该在synchronous_standby_names中包含常规的名称,但我们截止目前位置已很好的保护了它们,*因此可以继续这样执行除非特别请求.*/if(stream->replication_slot!=NULL){//存在slotreportFlushPosition=true;sprintf(slotcmd,"SLOT\"%s\"",stream->replication_slot);}else{if(stream->synchronous)reportFlushPosition=true;//同步elsereportFlushPosition=false;//异步slotcmd[0]=0;//ASCII0}if(stream->sysidentifier!=NULL){//系统标识符不为NULL/*Validatesystemidentifierhasn'tchanged*///验证系统标识符没有改变//发送IDENTIFY_SYSTEM命令res=PQexec(conn,"IDENTIFY_SYSTEM");if(PQresultStatus(res)!=PGRES_TUPLES_OK){fprintf(stderr,_("%s:couldnotsendreplicationcommand\"%s\":%s"),progname,"IDENTIFY_SYSTEM",PQerrorMessage(conn));PQclear(res);returnfalse;}if(PQntuples(res)!=1||PQnfields(res)<3){fprintf(stderr,_("%s:couldnotidentifysystem:got%drowsand%dfields,expected%drowsand%drmorefields\n"),progname,PQntuples(res),PQnfields(res),1,3);PQclear(res);returnfalse;}if(strcmp(stream->sysidentifier,PQgetvalue(res,0,0))!=0){fprintf(stderr,_("%s:systemidentifierdoesnotmatchbetweenbasebackupandstreamingonnection\n"),progname);PQclear(res);returnfalse;}if(stream->timeline>atoi(PQgetvalue(res,0,1))){fprintf(stderr,_("%s:startingtimeline%uisnotpresentintheserver\n"),progname,stream->timeline);PQclear(res);returnfalse;}PQclear(res);}/**initializeflushpositiontostartingpoint,it'sthecaller's*responsibilitythatthat'ssane.*初始化flush位置为开始点,这是调用者的责任.*/lastFlushPosition=stream->startpos;while(1){/**Fetchthetimelinehistoryfileforthistimeline,ifwedon'thave*italready.Whenstreaminglogtotar,thiswillalwaysreturn*false,asweareneverstreamingintoanexistingfileand*thereforetherecanbenopre-existingtimelinehistoryfile.*为该timeline提前timelinehistory,如我们已不需要.*如streaming日志为tar格式,这通常会返回F,这如同从来没有streaming到已存在的文件中,*因此没有已存在的timelinehistory文件.*/if(!existsTimeLineHistoryFile(stream)){//如不存在history文件snprintf(query,sizeof(query),"TIMELINE_HISTORY%u",stream->timeline);//发送TIMELINE_HISTORY命令res=PQexec(conn,query);if(PQresultStatus(res)!=PGRES_TUPLES_OK){/*FIXME:wemightsenditok,butgetanerror*/fprintf(stderr,_("%s:couldnotsendreplicationcommand\"%s\":%s"),progname,"TIMELINE_HISTORY",PQresultErrorMessage(res));PQclear(res);returnfalse;}/**TheresponsetoTIMELINE_HISTORYisasinglerowresultset*withtwofields:filenameandcontent*TIMELINE_HISTORY的响应是一个单行结果集,有两个字段:filename和content*/if(PQnfields(res)!=2||PQntuples(res)!=1){fprintf(stderr,_("%s:unexpectedresponsetoTIMELINE_HISTORYcommand:got%drowsand%dields,expected%drowsand%dfields\n"),progname,PQntuples(res),PQnfields(res),1,2);}/*Writethehistoryfiletodisk*///写入history文件到磁盘上writeTimeLineHistoryFile(stream,PQgetvalue(res,0,0),PQgetvalue(res,0,1));PQclear(res);}/**Beforewestartstreamingfromtherequestedlocation,checkifthe*callbacktellsustostophere.*从请求的位置开始streaming前,检查回调函数告诉我们在哪停止*/if(stream->stream_stop(stream->startpos,stream->timeline,false))returntrue;/*Initiatethereplicationstreamatspecifiedlocation*///在指定的位置初始化复制流snprintf(query,sizeof(query),"START_REPLICATION%s%X/%XTIMELINE%u",slotcmd,(uint32)(stream->startpos>>32),(uint32)stream->startpos,stream->timeline);//发送START_REPLICATION命令res=PQexec(conn,query);if(PQresultStatus(res)!=PGRES_COPY_BOTH){fprintf(stderr,_("%s:couldnotsendreplicationcommand\"%s\":%s"),progname,"START_REPLICATION",PQresultErrorMessage(res));PQclear(res);returnfalse;}PQclear(res);/*StreamtheWAL*///流化WALres=HandleCopyStream(conn,stream,&stoppos);if(res==NULL)gotoerror;/**Streamingfinished.**Therearetwopossiblereasonsforthat:acontrolledshutdown,or*wereachedtheendofthecurrenttimeline.Incaseof*end-of-timeline,theserversendsaresultsetafterCopyhas*finished,containinginformationaboutthenexttimeline.Read*that,andrestartstreamingfromthenexttimeline.Incaseof*controlledshutdown,stophere.*Streaming完成.*这里有两个可能的原因:可控的shutdown或者到达了当前时间线的末尾.*在end-of-timeline这种情况下,服务器在Copy完成后发送结果集,*含有关于下一个时间线的相关信息.*读取这些信息,在下一个时间线开始重新启动streaming.*如为可控的关闭,可以停止了.*/if(PQresultStatus(res)==PGRES_TUPLES_OK){/**End-of-timeline.Readthenexttimeline'sIDandstarting*position.Usually,thestartingpositionwillmatchtheendof*theprevioustimeline,buttherearecornercaseslikeifthe*serverhadsentushalfofaWALrecord,whenitwaspromoted.*Thenewtimelinewillbeginattheendofthelastcomplete*recordinthatcase,overlappingthepartialWALrecordonthe*oldtimeline.*这是End-of-timeline的情况.*读取下一个时间线ID和开始位置.通常来说,开始位置将匹配先前时间线的末尾,*但会存在特殊的情况比如服务器已经传输了WALRecord的一部分.*这种情况下,新的时间线会在上次已完成的记录末尾开始,与旧时间线的部分WALRecord重叠.*/uint32newtimeline;//新的时间线boolparsed;//是否解析//读取结果集的末尾parsed=ReadEndOfStreamingResult(res,&stream->startpos,&newtimeline);PQclear(res);if(!parsed)gotoerror;/*Sanitycheckthevaluestheservergaveus*///执行校验和坚持if(newtimeline<=stream->timeline){//新的时间线不可能小于等于stream中的时间线fprintf(stderr,_("%s:serverreportedunexpectednexttimeline%u,followingtimeline%u\n"),progname,newtimeline,stream->timeline);gotoerror;}if(stream->startpos>stoppos){//开始位置大于结束位置fprintf(stderr,_("%s:serverstoppedstreamingtimeline%uat%X/%X,butreportednexttimelineutobeginat%X/%X\n"),progname,stream->timeline,(uint32)(stoppos>>32),(uint32)stoppos,newtimeline,(uint32)(stream->startpos>>32),(uint32)stream->startpos);gotoerror;}/*Readthefinalresult,whichshouldbeCommandComplete.*///读取最后的结果,应为命令结束res=PQgetResult(conn);if(PQresultStatus(res)!=PGRES_COMMAND_OK){fprintf(stderr,_("%s:unexpectedterminationofreplicationstream:%s"),progname,PQresultErrorMessage(res));PQclear(res);gotoerror;}PQclear(res);/**Loopbacktostartstreamingfromthenewtimeline.Always*startstreamingatthebeginningofasegment.*从新时间线开始循环,通常会在segment的开始出开始streaming*/stream->timeline=newtimeline;stream->startpos=stream->startpos-XLogSegmentOffset(stream->startpos,WalSegSz);continue;//继续循环}elseif(PQresultStatus(res)==PGRES_COMMAND_OK){PQclear(res);/**Endofreplication(ie.controlledshutdownoftheserver).*replication完成(比如服务器关闭了复制)**Checkifthecallbackthinksit'sOKtostophere.Ifnot,*complain.*检查是否回调函数认为在这里停止就OK了,如果不是,则报警.*/if(stream->stream_stop(stoppos,stream->timeline,false))returntrue;else{fprintf(stderr,_("%s:replicationstreamwasterminatedbeforestoppoint\n"),progname);gotoerror;}}else{/*Serverreturnedanerror.*///返回错误fprintf(stderr,_("%s:unexpectedterminationofreplicationstream:%s"),progname,PQresultErrorMessage(res));PQclear(res);gotoerror;}}error:if(walfile!=NULL&&stream->walmethod->close(walfile,CLOSE_NO_RENAME)!=0)fprintf(stderr,_("%s:couldnotclosefile\"%s\":%s\n"),progname,current_walfile_name,stream->walmethod->getlasterror());walfile=NULL;returnfalse;}/**ThemainloopofReceiveXlogStream.HandlestheCOPYstreamafter*initiatingstreamingwiththeSTART_REPLICATIONcommand.*ReceiveXlogStream中的主循环实现函数.*在使用START_REPLICATION命令初始化streaming后处理COPYstream.**IftheCOPYends(notnecessarilysuccessfully)dueamessagefromthe*server,returnsaPGresultandsets*stoppostothelastbytewritten.*Onanyothersortoferror,returnsNULL.*如COPY由于服务器端的原因终止,返回PGresult并设置*stoppos为最后写入的字节.*如出现错误,则返回NULL.*/staticPGresult*HandleCopyStream(PGconn*conn,StreamCtl*stream,XLogRecPtr*stoppos){char*copybuf=NULL;TimestampTzlast_status=-1;XLogRecPtrblockpos=stream->startpos;still_sending=true;while(1){//循环处理intr;TimestampTznow;//时间戳longsleeptime;/**Checkifweshouldcontinuestreaming,orabortatthispoint.*检查我们是否应该继续streaming,或者在当前就退出*/if(!CheckCopyStreamStop(conn,stream,blockpos,stoppos))gotoerror;now=feGetCurrentTimestamp();/**Ifsynchronousoptionistrue,issuesynccommandassoonasthere*areWALdatawhichhasnotbeenflushedyet.*如同步选项为T,只要存在未flushed的WALdata,马上执行sync命令.*/if(stream->synchronous&&lastFlushPosition<blockpos&&walfile!=NULL){if(stream->walmethod->sync(walfile)!=0){fprintf(stderr,_("%s:couldnotfsyncfile\"%s\":%s\n"),progname,current_walfile_name,stream->walmethod->getlasterror());gotoerror;}lastFlushPosition=blockpos;/**SendfeedbacksothattheserverseesthelatestWALlocations*immediately.*发送反馈以便服务器马上可看到最后的WAL位置.*/if(!sendFeedback(conn,blockpos,now,false))gotoerror;last_status=now;}/**Potentiallysendastatusmessagetothemaster*可能向主服务器发送状态消息*/if(still_sending&&stream->standby_message_timeout>0&&feTimestampDifferenceExceeds(last_status,now,stream->standby_message_timeout)){/*Timetosendfeedback!*///是时候发送反馈了.if(!sendFeedback(conn,blockpos,now,false))gotoerror;last_status=now;}/**Calculatehowlongsend/receiveloopsshouldsleep*计算send/receive循环应该睡眠多长时间*/sleeptime=CalculateCopyStreamSleeptime(now,stream->standby_message_timeout,last_status);//拷贝stream中接收到的内容r=CopyStreamReceive(conn,sleeptime,stream->stop_socket,©buf);while(r!=0){if(r==-1)gotoerror;//出错if(r==-2){//已完结或出错PGresult*res=HandleEndOfCopyStream(conn,stream,copybuf,blockpos,stoppos);if(res==NULL)gotoerror;elsereturnres;}/*Checkthemessagetype.*///检查消息类型if(copybuf[0]=='k'){if(!ProcessKeepaliveMsg(conn,stream,copybuf,r,blockpos,&last_status))gotoerror;}elseif(copybuf[0]=='w'){if(!ProcessXLogDataMsg(conn,stream,copybuf,r,&blockpos))gotoerror;/**Checkifweshouldcontinuestreaming,orabortatthis*point.*检查我们是否应该继续streaming或者在此就停止*/if(!CheckCopyStreamStop(conn,stream,blockpos,stoppos))gotoerror;}else{fprintf(stderr,_("%s:unrecognizedstreamingheader:\"%c\"\n"),progname,copybuf[0]);gotoerror;}/**Processthereceiveddata,andanysubsequentdatawecanread*withoutblocking.*处理接收到的数据,后续的数据可以无阻塞的读取.*/r=CopyStreamReceive(conn,0,stream->stop_socket,©buf);}}error:if(copybuf!=NULL)PQfreemem(copybuf);returnNULL;}/**Checkifweshouldcontinuestreaming,orabortatthispoint.*/staticboolCheckCopyStreamStop(PGconn*conn,StreamCtl*stream,XLogRecPtrblockpos,XLogRecPtr*stoppos){if(still_sending&&stream->stream_stop(blockpos,stream->timeline,false)){if(!close_walfile(stream,blockpos)){/*Potentialerrormessageiswrittenbyclose_walfile*/returnfalse;}if(PQputCopyEnd(conn,NULL)<=0||PQflush(conn)){fprintf(stderr,_("%s:couldnotsendcopy-endpacket:%s"),progname,PQerrorMessage(conn));returnfalse;}still_sending=false;}returntrue;}/**ReceiveCopyDatamessageavailablefromXLOGstream,blockingfor*maximumof'timeout'ms.*接收从XLOGstream中可用的CopyData消息,如超出最大的'timeout'毫秒,需要阻塞.**Ifdatawasreceived,returnsthelengthofthedata.*bufferissetto*pointtoabufferholdingthereceivedmessage.Thebufferisonlyvalid*untilthenextCopyStreamReceivecall.*如接收到数据,则返回数据的大小.*变量*buffer设置为指向含有接收到消息的buffer.buffer在下一个CopyStreamReceive调用才会生效.**Returns0ifnodatawasavailablewithintimeout,orifwaitwas*interruptedbysignalorstop_socketinput.*-1onerror.-2iftheserverendedtheCOPY.*如在timeout时间内没有数据返回,或者如果因为信号等待/stop_socket输入中断,则返回0.*-1:表示出现错误.-2表示服务器完成了COPY*/staticintCopyStreamReceive(PGconn*conn,longtimeout,pgsocketstop_socket,char**buffer){char*copybuf=NULL;intrawlen;if(*buffer!=NULL)PQfreemem(*buffer);*buffer=NULL;/*TrytoreceiveaCopyDatamessage*/rawlen=PQgetCopyData(conn,©buf,1);if(rawlen==0){intret;/**Nodataavailable.Waitforsometoappear,butnotlongerthan*thespecifiedtimeout,sothatwecanpingtheserver.Alsostop*waitingifinputappearsonstop_socket.*/ret=CopyStreamPoll(conn,timeout,stop_socket);if(ret<=0)returnret;/*Nowthereisactuallydataonthesocket*/if(PQconsumeInput(conn)==0){fprintf(stderr,_("%s:couldnotreceivedatafromWALstream:%s"),progname,PQerrorMessage(conn));return-1;}/*Nowthatwe'veconsumedsomeinput,tryagain*/rawlen=PQgetCopyData(conn,©buf,1);if(rawlen==0)return0;}if(rawlen==-1)/*end-of-streamingorerror*/return-2;if(rawlen==-2){fprintf(stderr,_("%s:couldnotreadCOPYdata:%s"),progname,PQerrorMessage(conn));return-1;}/*Returnreceivedmessagestocaller*/*buffer=copybuf;returnrawlen;}三、跟踪分析

备份命令

pg_basebackup-h192.168.26.25-Ureplicator-p5432-D/data/backup-P-Xs-R-v

启动gdb跟踪(跟踪fork的子进程)

[xdb@localhost~]$gdbpg_basebackupGNUgdb(GDB)RedHatEnterpriseLinux7.6.1-100.el7Copyright(C)2013FreeSoftwareFoundation,Inc.LicenseGPLv3+:GNUGPLversion3orlater<http://gnu.org/licenses/gpl.html>Thisisfreesoftware:youarefreetochangeandredistributeit.ThereisNOWARRANTY,totheextentpermittedbylaw.Type"showcopying"and"showwarranty"fordetails.ThisGDBwasconfiguredas"x86_64-redhat-linux-gnu".Forbugreportinginstructions,pleasesee:<http://www.gnu.org/software/gdb/bugs/>...Readingsymbolsfrom/appdb/xdb/pg11.2/bin/pg_basebackup...done.(gdb)setargs-h192.168.26.25-Ureplicator-p5432-D/data/backup-P-Xs-R-v(gdb)setfollow-fork-modechild(gdb)bLogStreamerMainBreakpoint1at0x403c51:filepg_basebackup.c,line490.(gdb)rStartingprogram:/appdb/xdb/pg11.2/bin/pg_basebackup-h192.168.26.25-Ureplicator-p5432-D/data/backup-P-Xs-R-v[Threaddebuggingusinglibthread_dbenabled]Usinghostlibthread_dblibrary"/lib64/libthread_db.so.1".Password:pg_basebackup:initiatingbasebackup,waitingforcheckpointtocompletepg_basebackup:checkpointcompletedpg_basebackup:write-aheadlogstartpoint:0/5A000028ontimeline16pg_basebackup:startingbackgroundWALreceiverpg_basebackup:createdtemporaryreplicationslot"pg_basebackup_1604"[Newprocess2036][Threaddebuggingusinglibthread_dbenabled]backup/backup_label)Usinghostlibthread_dblibrary"/lib64/libthread_db.so.1".[SwitchingtoThread0x7ffff7fe7840(LWP2036)]Breakpoint1,LogStreamerMain(param=0x629db0)atpg_basebackup.c:490490in_log_streamer=true;305153/305153kB(100%),1/1tablespace)pg_basebackup:write-aheadlogendpoint:0/5A0000F8pg_basebackup:waitingforbackgroundprocesstofinishstreaming...(gdb)

输入参数

(gdb)n492MemSet(&stream,0,sizeof(stream));(gdb)p*param$1={bgconn=0x62a280,startptr=1509949440,xlog="/data/backup/pg_wal",'\000'<repeats1004times>,sysidentifier=0x61f1a0"6666964067616600474",timeline=16}(gdb)

设置StreamCtl结构体

(gdb)n493stream.startpos=param->startptr;(gdb)494stream.timeline=param->timeline;(gdb)495stream.sysidentifier=param->sysidentifier;(gdb)496stream.stream_stop=reached_end_position;(gdb)498stream.stop_socket=bgpipe[0];(gdb)502stream.standby_message_timeout=standby_message_timeout;(gdb)503stream.synchronous=false;(gdb)504stream.do_sync=do_sync;(gdb)505stream.mark_done=true;(gdb)506stream.partial_suffix=NULL;(gdb)507stream.replication_slot=replication_slot;(gdb)509if(format=='p')(gdb)510stream.walmethod=CreateWalDirectoryMethod(param->xlog,0,do_sync);(gdb)

进入ReceiveXlogStream函数

(gdb)514if(!ReceiveXlogStream(param->bgconn,&stream))(gdb)stepReceiveXlogStream(conn=0x62a280,stream=0x7fffffffda30)atreceivelog.c:458458if(!CheckServerVersionForStreaming(conn))(gdb)(gdb)n472if(stream->replication_slot!=NULL)(gdb)p*stream$2={startpos=1509949440,timeline=16,sysidentifier=0x61f1a0"6666964067616600474",standby_message_timeout=10000,synchronous=false,mark_done=true,do_sync=true,stream_stop=0x403953<reached_end_position>,stop_socket=8,walmethod=0x632b10,partial_suffix=0x0,replication_slot=0x62a1e0"pg_basebackup_1604"}(gdb)

判断系统标识符和时间线

(gdb)n474reportFlushPosition=true;(gdb)475sprintf(slotcmd,"SLOT\"%s\"",stream->replication_slot);(gdb)486if(stream->sysidentifier!=NULL)(gdb)489res=PQexec(conn,"IDENTIFY_SYSTEM");(gdb)490if(PQresultStatus(res)!=PGRES_TUPLES_OK)(gdb)498if(PQntuples(res)!=1||PQnfields(res)<3)(gdb)506if(strcmp(stream->sysidentifier,PQgetvalue(res,0,0))!=0)(gdb)pPQgetvalue(res,0,0)$3=0x633500"6666964067616600474"(gdb)n514if(stream->timeline>atoi(PQgetvalue(res,0,1)))(gdb)522PQclear(res);(gdb)pPQgetvalue(res,0,1)$4=0x633514"16"(gdb)

不存在时间线history文件,生成history文件

(gdb)n529lastFlushPosition=stream->startpos;(gdb)539if(!existsTimeLineHistoryFile(stream))(gdb)541snprintf(query,sizeof(query),"TIMELINE_HISTORY%u",stream->timeline);(gdb)542res=PQexec(conn,query);(gdb)543if(PQresultStatus(res)!=PGRES_TUPLES_OK)(gdb)556if(PQnfields(res)!=2||PQntuples(res)!=1)(gdb)564writeTimeLineHistoryFile(stream,(gdb)568PQclear(res);(gdb)

调用START_REPLICATION命令初始化

(gdb)575if(stream->stream_stop(stream->startpos,stream->timeline,false))(gdb)n579snprintf(query,sizeof(query),"START_REPLICATION%s%X/%XTIMELINE%u",(gdb)581(uint32)(stream->startpos>>32),(uint32)stream->startpos,(gdb)579snprintf(query,sizeof(query),"START_REPLICATION%s%X/%XTIMELINE%u",(gdb)581(uint32)(stream->startpos>>32),(uint32)stream->startpos,(gdb)579snprintf(query,sizeof(query),"START_REPLICATION%s%X/%XTIMELINE%u",(gdb)583res=PQexec(conn,query);(gdb)584if(PQresultStatus(res)!=PGRES_COPY_BOTH)(gdb)591PQclear(res);(gdb)

执行命令,处理stream WAL,完成调用

595if(res==NULL)(gdb)p*res$5={ntups=0,numAttributes=0,attDescs=0x0,tuples=0x0,tupArrSize=0,numParameters=0,paramDescs=0x0,resultStatus=PGRES_COMMAND_OK,cmdStatus="START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001",'\000'<repeats19times>,"\200\000\000",binary=0,noticeHooks={noticeRec=0x7ffff7b9eaa4<defaultNoticeReceiver>,noticeRecArg=0x0,noticeProc=0x7ffff7b9eaf9<defaultNoticeProcessor>,noticeProcArg=0x0},events=0x0,nEvents=0,client_encoding=0,errMsg=0x0,errFields=0x0,errQuery=0x0,null_field="",curBlock=0x0,curOffset=0,spaceLeft=0}(gdb)n608if(PQresultStatus(res)==PGRES_TUPLES_OK)(gdb)666elseif(PQresultStatus(res)==PGRES_COMMAND_OK)(gdb)668PQclear(res);(gdb)676if(stream->stream_stop(stoppos,stream->timeline,false))(gdb)677returntrue;(gdb)702}(gdb)LogStreamerMain(param=0x629db0)atpg_basebackup.c:523523if(!stream.walmethod->finish())(gdb)

到此,关于“PostgreSQL中ReceiveXlogStream有什么作用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!