本文主要给大家简单讲讲etcd raft library设计原理和使用,相关专业术语大家可以上网查查或者找一些相关书籍补充一下,这里就不涉猎了,我们就直奔主题吧,希望etcd raft library设计原理和使用这篇文章可以给大家带来一些实际帮助。

这个library使用起来相对来说还是有点麻烦。官方有一个使用示例在https://github.com/coreos/etcd/tree/master/contrib/raftexample。整体来说,这个库实现了raft协议核心的内容,比如appendlog的逻辑,选主逻辑,snapshot,成员变更等逻辑。需要明确的是:library没有实现消息的网络传输和接收,库只会把一些待发送的消息保存在内存中,用户自定义的网络传输层取出消息并发送出去,并且在网络接收端,需要调一个library的函数,用于将收到的消息传入library,后面会详细说明。同时,library定义了一个Storage接口,需要library的使用者自行实现。

Storage接口如下:

//Storageisaninterfacethatmaybeimplementedbytheapplication//toretrievelogentriesfromstorage.////IfanyStoragemethodreturnsanerror,theraftinstancewill//becomeinoperableandrefusetoparticipateinelections;the//applicationisresponsibleforcleanupandrecoveryinthiscase.typeStorageinterface{//InitialStatereturnsthesavedHardStateandConfStateinformation.InitialState()(pb.HardState,pb.ConfState,error)//Entriesreturnsasliceoflogentriesintherange[lo,hi).//MaxSizelimitsthetotalsizeofthelogentriesreturned,but//Entriesreturnsatleastoneentryifany.Entries(lo,hi,maxSizeuint64)([]pb.Entry,error)//Termreturnsthetermofentryi,whichmustbeintherange//[FirstIndex()-1,LastIndex()].Thetermoftheentrybefore//FirstIndexisretainedformatchingpurposeseventhoughthe//restofthatentrymaynotbeavailable.Term(iuint64)(uint64,error)//LastIndexreturnstheindexofthelastentryinthelog.LastIndex()(uint64,error)//FirstIndexreturnstheindexofthefirstlogentrythatis//possiblyavailableviaEntries(olderentrieshavebeenincorporated//intothelatestSnapshot;ifstorageonlycontainsthedummyentrythe//firstlogentryisnotavailable).FirstIndex()(uint64,error)//Snapshotreturnsthemostrecentsnapshot.//Ifsnapshotistemporarilyunavailable,itshouldreturnErrSnapshotTemporarilyUnavailable,//soraftstatemachinecouldknowthatStorageneedssometimetoprepare//snapshotandcallSnapshotlater.Snapshot()(pb.Snapshot,error)}

这些接口在library中会被用到。熟悉raft协议的人不难理解。上面提到的官方示例https://github.com/coreos/etcd/tree/master/contrib/raftexample中使用了library自带的MemoryStorage,和etcd的wal和snap包做持久化,重启的时候从wal和snap中获取日志恢复MemoryStorage。

要提供这种IO/网络密集型的东西,提高吞吐最好的手段就是batch加批处理了。etcd raft library正是这么做的。

下面看一下为了做这事,etcd提供的核心抽象Ready结构体:

//Readyencapsulatestheentriesandmessagesthatarereadytoread,//besavedtostablestorage,committedorsenttootherpeers.//AllfieldsinReadyareread-only.typeReadystruct{//ThecurrentvolatilestateofaNode.//SoftStatewillbenilifthereisnoupdate.//ItisnotrequiredtoconsumeorstoreSoftState.*SoftState//ThecurrentstateofaNodetobesavedtostablestorageBEFORE//Messagesaresent.//HardStatewillbeequaltoemptystateifthereisnoupdate.pb.HardState//ReadStatescanbeusedfornodetoservelinearizablereadrequestslocally//whenitsappliedindexisgreaterthantheindexinReadState.//NotethatthereadStatewillbereturnedwhenraftreceivesmsgReadIndex.//Thereturnedisonlyvalidfortherequestthatrequestedtoread.ReadStates[]ReadState//EntriesspecifiesentriestobesavedtostablestorageBEFORE//Messagesaresent.Entries[]pb.Entry//Snapshotspecifiesthesnapshottobesavedtostablestorage.Snapshotpb.Snapshot//CommittedEntriesspecifiesentriestobecommittedtoa//store/state-machine.Thesehavepreviouslybeencommittedtostable//store.CommittedEntries[]pb.Entry//MessagesspecifiesoutboundmessagestobesentAFTEREntriesare//committedtostablestorage.//IfitcontainsaMsgSnapmessage,theapplicationMUSTreportbacktoraft//whenthesnapshothasbeenreceivedorhasfailedbycallingReportSnapshot.Messages[]pb.Message//MustSyncindicateswhethertheHardStateandEntriesmustbesynchronously//writtentodiskorifanasynchronouswriteispermissible.MustSyncbool}

可以说,这个Ready结构体封装了一批更新,这些更新包括:

pb.HardState: 包含当前节点见过的最大的term,以及在这个term给谁投过票,已经当前节点知道的commit index

Messages: 需要广播给所有peers的消息

CommittedEntries:已经commit了,还没有apply到状态机的日志

Snapshot:需要持久化的快照

库的使用者从node结构体提供的一个ready channel中不断的pop出一个个的Ready进行处理,库使用者通过如下方法拿到Ready channel:

func(n*node)Ready()<-chanReady{returnn.readyc}

应用需要对Ready的处理包括:

将HardState, Entries, Snapshot持久化到storage。

将Messages(上文提到的msgs)非阻塞的广播给其他peers

将CommittedEntries(已经commit还没有apply)应用到状态机。

如果发现CommittedEntries中有成员变更类型的entry,调用node的ApplyConfChange()方法让node知道(这里和raft论文不一样,论文中只要节点收到了成员变更日志就应用)

调用Node.Advance()告诉raft node,这批状态更新处理完了,状态已经演进了,可以给我下一批Ready让我处理。

应用通过raft.StartNode()来启动raft中的一个副本,函数内部通过启动一个goroutine运行

func(n*node)run(r*raft)

来启动服务。

应用通过调用

func(n*node)Propose(ctxcontext.Context,data[]byte)error

来Propose一个请求给raft,被raft开始处理后返回。

增删节点通过调用

func(n*node)ProposeConfChange(ctxcontext.Context,ccpb.ConfChange)error

node结构体包含几个重要的channel:

//nodeisthecanonicalimplementationoftheNodeinterfacetypenodestruct{propcchanpb.Messagerecvcchanpb.Messageconfcchanpb.ConfChangeconfstatecchanpb.ConfStatereadycchanReadyadvancecchanstruct{}tickcchanstruct{}donechanstruct{}stopchanstruct{}statuschanchanStatusloggerLogger}

propc: propc是一个没有buffer的channel,应用通过Propose接口写入的请求被封装成Message被push到propc中,node的run方法从propc中pop出Message,append自己的raft log中,并且将Message放入mailbox中(raft结构体中的msgs []pb.Message),这个msgs会被封装在Ready中,被应用从readyc中取出来,然后通过应用自定义的transport发送出去。

recvc: 应用自定义的transport在收到Message后需要调用

func(n*node)Step(ctxcontext.Context,mpb.Message)error

来把Message放入recvc中,经过一些处理后,同样,会把需要发送的Message放入到对应peers的mailbox中。后续通过自定义transport发送出去。

readyc/advancec: readyc和advancec都是没有buffer的channel,node.run()内部把相关的一些状态更新打包成Ready结构体(其中一种状态就是上面提到的msgs)放入readyc中。应用从readyc中pop出Ready中,对相应的状态进行处理,处理完成后,调用

rc.node.Advance()

往advancec中push一个空结构体告诉raft,已经对这批Ready包含的状态进行了相应的处理,node.run()内部从advancec中得到通知后,对内部一些状态进行处理,比如把已经持久化到storage中的entries从内存(对应type unstable struct)中删除等。

tickc:应用定期往tickc中push空结构体,node.run()会调用tick()函数,对于leader来说,tick()会给其他peers发心跳,对于follower来说,会检查是否需要发起选主操作。

confc/confstatec:应用从Ready中拿出CommittedEntries,检查其如果含有成员变更类型的日志,则需要调用

func(n*node)ApplyConfChange(ccpb.ConfChange)*pb.ConfState

这个函数会push ConfChange到confc中,confc同样是个无buffer的channel,node.run()内部会从confc中拿出ConfChange,然后进行真正的增减peers操作,之后将最新的成员组push到confstatec中,而ApplyConfChange函数从confstatec pop出最新的成员组返回给应用。

etcd raft library设计原理和使用就先给大家讲到这里,对于其它相关问题大家想要了解的可以持续关注我们的行业资讯。我们的板块内容每天都会捕捉一些行业新闻及专业知识分享给大家的。