本篇内容主要讲解“PostgreSQL中StartLogStreamer分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中StartLogStreamer分析”吧!

本节简单介绍PostgreSQL备份工具pg_basebackup的源码:在实际执行备份逻辑的BaseBackup函数中,负责备份WAL数据的实现函数StartLogStreamer。

一、数据结构

logstreamer_param
WAL data streamer参数.

typedefstruct{////后台连接PGconn*bgconn;//开始位置XLogRecPtrstartptr;//目录或者tar文件,依赖于使用的模式charxlog[MAXPGPATH];/*directoryortarfiledependingonmode*///系统标识符char*sysidentifier;//时间线inttimeline;}logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数

/**Globalparameterswhenreceivingxlogstream.Fordetailsabouttheindividualfields,*seethefunctioncommentforReceiveXlogStream().*接收xlog流数据时的全局参数.*每个域字段的详细解释,参见ReceiveXlogStream()函数注释.*/typedefstructStreamCtl{//streaming的开始位置XLogRecPtrstartpos;/*Startpositionforstreaming*///时间线TimeLineIDtimeline;/*Timelinetostreamdatafrom*///系统标识符char*sysidentifier;/*Validatethissystemidentifierand*timeline*///standby超时信息intstandby_message_timeout;/*Sendstatusmessagesthisoften*///是否同步(写入时是否马上FlushWALdata)boolsynchronous;/*FlushimmediatelyWALdataonwrite*///在已归档的数据中标记segment为已完成boolmark_done;/*Marksegmentasdoneingeneratedarchive*///刷新到磁盘上以确保数据的一致性状态(是否已刷新到磁盘上)booldo_sync;/*Flushtodisktoensureconsistentstateof*data*///在返回T时停止streamingstream_stop_callbackstream_stop;/*Stopstreamingwhenreturnstrue*///如有效,监测该socket中的输入并检查stream_stop()的返回pgsocketstop_socket;/*ifvalid,watchforinputonthissocket*andcheckstream_stop()whenthereisany*///如何写WALWalWriteMethod*walmethod;/*HowtowritetheWAL*///附加到部分接受文件的后缀char*partial_suffix;/*Suffixappendedtopartiallyreceivedfiles*///使用的replicationslot,如无则为NULLchar*replication_slot;/*Replicationslottouse,orNULL*/}StreamCtl;二、源码解读

StartLogStreamer
StartLogStreamer用于在备份时初始化后台进程用于接收WAL.接收进程将创建自己的数据库连接以并行的方式对文件进行streaming复制.

/**Initiatebackgroundprocessforreceivingxlogduringthebackup.*Thebackgroundstreamwilluseitsowndatabaseconnectionsowecan*streamthelogfileinparallelwiththebackups.*在备份时初始化后台进程用于接收WAL.*后台stream进程将用自己的数据库连接以使以并行的方式stream文件.*/staticvoidStartLogStreamer(char*startpos,uint32timeline,char*sysidentifier){//参数logstreamer_param*param;uint32hi,lo;//高位/低位charstatusdir[MAXPGPATH];param=pg_malloc0(sizeof(logstreamer_param));param->timeline=timeline;param->sysidentifier=sysidentifier;/*Convertthestartingposition*///转换开始位置(高低位转换)if(sscanf(startpos,"%X/%X",&hi,&lo)!=2){fprintf(stderr,_("%s:couldnotparsewrite-aheadloglocation\"%s\"\n"),progname,startpos);exit(1);}//开始位置,转换为64bit的地址param->startptr=((uint64)hi)<<32|lo;/*Roundofftoevensegmentposition*///按segment取整param->startptr-=XLogSegmentOffset(param->startptr,WalSegSz);#ifndefWIN32//WIN32使用的代码/*Createourbackgroundpipe*/if(pipe(bgpipe)<0){fprintf(stderr,_("%s:couldnotcreatepipeforbackgroundprocess:%s\n"),progname,strerror(errno));exit(1);}#endif/*Getasecondconnection*///获取第二个连接param->bgconn=GetConnection();if(!param->bgconn)/*ErrormessagealreadywritteninGetConnection()*/exit(1);/*Inpost-10cluster,pg_xloghasbeenrenamedtopg_wal*///在PG10,pg_xlog已命名为pg_walsnprintf(param->xlog,sizeof(param->xlog),"%s/%s",basedir,PQserverVersion(conn)<MINIMUM_VERSION_FOR_PG_WAL?"pg_xlog":"pg_wal");/*Temporaryreplicationslotsareonlysupportedin10andnewer*///临时复制slots只在PG10+支持if(PQserverVersion(conn)<MINIMUM_VERSION_FOR_TEMP_SLOTS)temp_replication_slot=false;/**Createreplicationslotifrequested*如要求,则创建复制slot*///staticchar*replication_slot=NULL;//staticbooltemp_replication_slot=true;if(temp_replication_slot&&!replication_slot)//创建replicationslotreplication_slot=psprintf("pg_basebackup_%d",(int)PQbackendPID(param->bgconn));if(temp_replication_slot||create_slot){//创建replicationslotif(!CreateReplicationSlot(param->bgconn,replication_slot,NULL,temp_replication_slot,true,true,false))exit(1);if(verbose){//显示诊断信息if(temp_replication_slot)fprintf(stderr,_("%s:createdtempo
raryreplicationslot\"%s\"\n"),progname,replication_slot);elsefprintf(stderr,_("%s:createdreplicationslot\"%s\"\n"),progname,replication_slot);}}if(format=='p'){/**Createpg_wal/archive_statusorpg_xlog/archive_status(andthus*pg_walorpg_xlog)dependingonthetargetserversowecanwrite*tobasedir/pg_walorbasedir/pg_xlogasthedirectoryentryinthe*tarfilemayarrivelater.*基于目标服务器创建pg_wal/archive_status或pg_xlog/archive_status,*这样可以写入到basedir/pg_wal货basedir/pg_xlog,可作为后续访问的tar文件目录条目*/snprintf(statusdir,sizeof(statusdir),"%s/%s/archive_status",basedir,PQserverVersion(conn)<MINIMUM_VERSION_FOR_PG_WAL?"pg_xlog":"pg_wal");if(pg_mkdir_p(statusdir,pg_dir_create_mode)!=0&&errno!=EEXIST){fprintf(stderr,_("%s:couldnotcreatedirectory\"%s\":%s\n"),progname,statusdir,strerror(errno));exit(1);}}/**Startachildprocessandtellittostartstreaming.OnUnix,thisis*afork().OnWindows,wecreateathread.*启动子进程开始streaming.*在UNIX平台,是一个fork进程,在Windows平台,创建线程.*/#ifndefWIN32//UNIX:fork进程bgchild=fork();if(bgchild==0){//这是子进程,返回0/*inchildprocess*///启动新进程exit(LogStreamerMain(param));}elseif(bgchild<0){fprintf(stderr,_("%s:couldnotcreatebackgroundprocess:%s\n"),progname,strerror(errno));exit(1);}/**Elseweareintheparentprocessandalliswell.*在父进程中,返回的bgchild是子进程PID.*/atexit(kill_bgchild_atexit);#else/*WIN32*///WIN32:创建线程bgchild=_beginthreadex(NULL,0,(void*)LogStreamerMain,param,0,NULL);if(bgchild==0){fprintf(stderr,_("%s:couldnotcreatebackgroundthread:%s\n"),progname,strerror(errno));exit(1);}#endif}

LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用

staticintLogStreamerMain(logstreamer_param*param){StreamCtlstream;//接收xlog流数据时的全局参数in_log_streamer=true;//初始化StreamCtl结构体MemSet(&stream,0,sizeof(stream));stream.startpos=param->startptr;stream.timeline=param->timeline;stream.sysidentifier=param->sysidentifier;stream.stream_stop=reached_end_position;#ifndefWIN32stream.stop_socket=bgpipe[0];#elsestream.stop_socket=PGINVALID_SOCKET;#endifstream.standby_message_timeout=standby_message_timeout;stream.synchronous=false;stream.do_sync=do_sync;stream.mark_done=true;stream.partial_suffix=NULL;stream.replication_slot=replication_slot;if(format=='p')stream.walmethod=CreateWalDirectoryMethod(param->xlog,0,do_sync);elsestream.walmethod=CreateWalTarMethod(param->xlog,compresslevel,do_sync);//接收数据if(!ReceiveXlogStream(param->bgconn,&stream))/**Anyerrorswillalreadyhavebeenreportedinthefunctionprocess,*butweneedtotelltheparentthatwedidn'tshutdowninanice*way.*在函数执行过程中出现的错误已通过警告的方式发出,*但仍需要告知父进程不能优雅的关闭本进程.*/return1;if(!stream.walmethod->finish()){fprintf(stderr,_("%s:couldnotfinishwritingWALfiles:%s\n"),progname,strerror(errno));return1;}//结束连接PQfinish(param->bgconn);//普通文件格式if(format=='p')FreeWalDirectoryMethod();elseFreeWalTarMethod();//是否内存pg_free(stream.walmethod);return0;}三、跟踪分析

备份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

启动gdb跟踪

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b StartLogStreamer
Breakpoint 1 at 0x403e6b: file pg_basebackup.c, line 555.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password:
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/57000060 on timeline 16
pg_basebackup: starting background WAL receiver

Breakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0/57000060", timeline=16, sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555
555     param = pg_malloc0(sizeof(logstreamer_param));
(gdb)

输入参数
startpos=0x7fffffffdf60 "0/57000060",
timeline=16,
sysidentifier=0x61f1a0 "6666964067616600474"
构造参数

(gdb) n
556     param->timeline = timeline;
(gdb)
557     param->sysidentifier = sysidentifier;
(gdb)
560     if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
(gdb)
567     param->startptr = ((uint64) hi) << 32 | lo;
(gdb) p hi
$1 = 0
(gdb) p lo
$2 = 1459617888
(gdb) n
569     param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
(gdb) n
573     if (pipe(bgpipe) < 0)
(gdb) p *param
$3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' <repeats 1023 times>, sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

建立连接,创建replication slot

(gdb) n
583     param->bgconn = GetConnection();
(gdb)
584     if (!param->bgconn)
(gdb)
591     PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb)
589     snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
(gdb)
595     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
(gdb)
601     if (temp_replication_slot && !replication_slot)
(gdb)
602     replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
(gdb)
603     if (temp_replication_slot || create_slot)
(gdb)
605     if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
(gdb)
609     if (verbose)
(gdb)
611     if (temp_replication_slot)
(gdb)
612     fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
(gdb)
pg_basebackup: created temporary replication slot "pg_basebackup_59378"
620     if (format == 'p')
(gdb)
(gdb) n
630     PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb)
628     snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",

创建备份目录

(gdb)
633     if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
(gdb) p *param
$4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb) n
647     bgchild = fork();
(gdb)
#############
[xdb@localhost backup]$ ls
pg_wal

fork进程,父进程返回子进程的PID

(gdb) n
647     bgchild = fork();
(gdb) n
Detaching after fork from child process 43001.
648     if (bgchild == 0)
(gdb) p bgchild
$5 = 43001
(gdb)

子进程(PID=43001)

[xdb@localhost backup]$ ps -ef | grep 43001
xdb      43001 42820  1 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[xdb@localhost backup]$ ps -ef | grep 192.168.26.25
xdb      42820 42756  0 11:48 pts/1    00:00:00 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
xdb      43001 42820  0 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

完成调用

(gdb) n
653     else if (bgchild < 0)
(gdb)
672     }
(gdb)
BaseBackup () at pg_basebackup.c:1937
1937    for (i = 0; i < PQntuples(res); i++)
(gdb)

pg_wal目录中的数据

[xdb@localhost backup]$ ls -l ./pg_wal/
total 16388
-rw-------. 1 xdb xdb 16777216 Mar 18 11:54 000000100000000000000057
-rw-------. 1 xdb xdb      217 Mar 18 11:54 00000010.history
drwx------. 2 xdb xdb       35 Mar 18 11:54 archive_status
[xdb@localhost backup]$

到此,相信大家对“PostgreSQL中StartLogStreamer分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!