这篇文章给大家分享的是raft的实际运用,相信大部分人都还没学会这个技能,为了让大家学会,给大家总结了以下内容,话不多说,一起往下看吧。

1、raft.go 的raft结构体 补充字段。 字段应该尽量与raft论文的Figure2接近。

typeRaftstruct{musync.Mutex//Locktoprotectsharedaccesstothispeer'sstatepeers[]*labrpc.ClientEnd//RPCendpointsofallpeerspersister*Persister//Objecttoholdthispeer'spersistedstatemeint//thispeer'sindexintopeers[]deadint32//setbyKill()//Yourdatahere(2A,2B,2C).//Lookatthepaper'sFigure2foradescriptionofwhat//stateaRaftservermustmaintain.stateint//follower,candidateorleaderresetTimerchanstruct{}//forresetelectiontimerelectionTimer*time.Timer//electiontimerelectionTimeouttime.Duration//400~800msheartbeatIntervaltime.Duration//100msCurrentTermint//PersistedbeforerespondingtoRPCsVotedForint//PersistedbeforerespondingtoRPCsLogs[]LogEntry//PersistedbeforerespondingtoRPCscommitCond*sync.Cond//forcommitIndexupdate//newEntryCond[]*sync.Cond//fornewlogentrycommitIndexint//VolatilestateonallserverslastAppliedint//VolatilestateonallserversnextIndex[]int//Leaderonly,reinitializedafterelectionmatchIndex[]int//Leaderonly,reinitializedafterelectionapplyChchanApplyMsg//outgoingchanneltoserviceshutdownChchanstruct{}//shutdownchannel,shutraftinstancegracefully}获取当前raft节点的term与状态

func(rf*Raft)GetState()(int,bool){vartermintvarisleaderbool//Yourcodehere(2A).rf.mu.Lock()deferrf.mu.Unlock()term=rf.CurrentTermisleader=rf.state==Leaderreturnterm,isleader}2、填充RequestVoteArgs和RequestVoteReply结构。

typeRequestVoteArgsstruct{//Yourdatahere(2A,2B).Termint//candidate'stermCandidateIDint//candidaterequestingvoteLastLogIndexint//indexofcandidate'slastlogentryLastLogTermint//termofcandidate'slastlogentry}typeRequestVoteReplystruct{//Yourdatahere(2A).CurrentTermint//currentTerm,forcandidatetoupdateitselfVoteGrantedbool//truemeanscandidatereceivedvote}实现RPC方法RequestVote

1、获取当前节点的log个数,以及最后一个log的term 确定当前节点的term。

2、如果调用节点的term小于当前节点,返回当前term,并且不为其投票。

3、如果调用节点的term大于当前节点,修改当前节点的term,当前节点转为follower.

4、如果调用节点的term大于当前节点,或者等于当前节点term并且调用节点的log个数大于等于当前节点的log,则为调用节点投票。

5、投票后重置当前节点的选举超时时间。

func(rf*Raft)RequestVote(args*RequestVoteArgs,reply*RequestVoteReply){//Yourcodehere(2A,2B).select{case<-rf.shutdownCh:DPrintf("[%d-%s]:peer%disshuttingdown,rejectRVrpcrequest.\n",rf.me,rf,rf.me)returndefault:}rf.mu.Lock()deferrf.mu.Unlock()lastLogIdx,lastLogTerm:=rf.lastLogIndexAndTerm()DPrintf("[%d-%s]:rpcRV,frompeer:%d,argterm:%d,myterm:%d(lastlogidx:%d->%d,term:%d->%d)\n",rf.me,rf,args.CandidateID,args.Term,rf.CurrentTerm,args.LastLogIndex,lastLogIdx,args.LastLogTerm,lastLogTerm)ifargs.Term<rf.CurrentTerm{reply.CurrentTerm=rf.CurrentTermreply.VoteGranted=false}else{ifargs.Term>rf.CurrentTerm{//converttofollowerrf.CurrentTerm=args.Termrf.state=Followerrf.VotedFor=-1}//ifisnull(follower)oritselfisacandidate(orstaleleader)withsametermifrf.VotedFor==-1{//||(rf.VotedFor==rf.me&&!sameTerm){//||rf.votedFor==args.CandidateID{//checkwhethercandidate'slogisat-least-asupdateif(args.LastLogTerm==lastLogTerm&&args.LastLogIndex>=lastLogIdx)||args.LastLogTerm>lastLogTerm{rf.resetTimer<-struct{}{}rf.state=Followerrf.VotedFor=args.CandidateIDreply.VoteGranted=trueDPrintf("[%d-%s]:peer%dvotetopeer%d(lastlogidx:%d->%d,term:%d->%d)\n",rf.me,rf,rf.me,args.CandidateID,args.LastLogIndex,lastLogIdx,args.LastLogTerm,lastLogTerm)}}}}修改make

除了一些基本的初始化过程,新开了一个goroutine。

funcMake(peers[]*labrpc.ClientEnd,meint,persister*Persister,applyChchanApplyMsg)*Raft{rf:=&Raft{}rf.peers=peersrf.persister=persisterrf.me=merf.applyCh=applyCh//Yourinitializationcodehere(2A,2B,2C).rf.state=Followerrf.VotedFor=-1rf.Logs=make([]LogEntry,1)//firstindexis1rf.Logs[0]=LogEntry{//placeholderTerm:0,Command:nil,}rf.nextIndex=make([]int,len(peers))rf.matchIndex=make([]int,len(peers))rf.electionTimeout=time.Millisecond*time.Duration(400+rand.Intn(100)*4)rf.electionTimer=time.NewTimer(rf.electionTimeout)rf.resetTimer=make(chanstruct{})rf.shutdownCh=make(chanstruct{})//shutdownraftgracefullyrf.commitCond=sync.NewCond(&rf.mu)//commitCh,adistinctgoroutinerf.heartbeatInterval=time.Millisecond*40//smallenough,nottoosmall//initializefromstatepersistedbeforeacrashrf.readPersist(persister.ReadRaftState())gorf.electionDaemon()//kickoffelectionreturnrf}选举核心electionDaemon

除了shutdown,还有两个通道,一个是electionTimer,用于选举超时。

一个是resetTimer,用于重置选举超时。

注意time.reset是很难正确使用的。

一旦选举超时,调用go rf.canvassVotes()

//electionDaemonfunc(rf*Raft)electionDaemon(){for{select{case<-rf.shutdownCh:DPrintf("[%d-%s]:peer%disshuttingdownelectionDaemon.\n",rf.me,rf,rf.me)returncase<-rf.resetTimer:if!rf.electionTimer.Stop(){<-rf.electionTimer.C}rf.electionTimer.Reset(rf.electionTimeout)case<-rf.electionTimer.C:rf.mu.Lock()DPrintf("[%d-%s]:peer%delectiontimeout,issueelection@term%d\n",rf.me,rf,rf.me,rf.CurrentTerm)rf.mu.Unlock()gorf.canvassVotes()rf.electionTimer.Reset(rf.electionTimeout)}}}拉票

replyHandler是进行请求返回后的处理。

当前节点为了成为leader,会调用每一个节点的RequestVote方法。

如果返回过来的term大于当前term,那么当前节点变为follower,重置选举超时时间。

否则,如果收到了超过一半节点的投票,那么其变为了leader,并立即给其他节点发送心跳检测。

//canvassVotesissuesRequestVoteRPCfunc(rf*Raft)canvassVotes(){varvoteArgsRequestVoteArgsrf.fillRequestVoteArgs(&voteArgs)peers:=len(rf.peers)varvotes=1replyHandler:=func(reply*RequestVoteReply){rf.mu.Lock()deferrf.mu.Unlock()ifrf.state==Candidate{ifreply.CurrentTerm>voteArgs.Term{rf.CurrentTerm=reply.CurrentTermrf.turnToFollow()//rf.persist()rf.resetTimer<-struct{}{}//resettimerreturn}ifreply.VoteGranted{ifvotes==peers/2{rf.state=Leaderrf.resetOnElection()//resetleaderstategorf.heartbeatDaemon()//newleader,startheartbeatdaemonDPrintf("[%d-%s]:peer%dbecomenewleader.\n",rf.me,rf,rf.me)return}votes++}}}fori:=0;i<peers;i++{ifi!=rf.me{gofunc(nint){varreplyRequestVoteReplyifrf.sendRequestVote(n,&voteArgs,&reply){replyHandler(&reply)}}(i)}}}心跳检测

1、leader调用每一个节点的AppendEntries方法。

2、如果当前节点大于调用节点,那么AppendEntries失败。否则,修改当前的term为最大。

3、如果当前节点是leader,始终将其变为follower(为了让leader稳定)

4、将当前节点投票给调用者(对于落后的节点)。

5、重置当前节点的超时时间。

func(rf*Raft)heartbeatDaemon(){for{if_,isLeader:=rf.GetState();!isLeader{return}//resetleader'selectiontimerrf.resetTimer<-struct{}{}select{case<-rf.shutdownCh:returndefault:fori:=0;i<len(rf.peers);i++{ifi!=rf.me{gorf.consistencyCheck(i)//routineheartbeat}}}time.Sleep(rf.heartbeatInterval)}}func(rf*Raft)consistencyCheck(nint){rf.mu.Lock()deferrf.mu.Unlock()pre:=rf.nextIndex[n]-1varargs=AppendEntriesArgs{Term:rf.CurrentTerm,LeaderID:rf.me,PrevLogIndex:pre,PrevLogTerm:rf.Logs[pre].Term,Entries:nil,LeaderCommit:rf.commitIndex,}gofunc(){DPrintf("[%d-%s]:consistencyChecktopeer%d.\n",rf.me,rf,n)varreplyAppendEntriesReplyifrf.sendAppendEntries(n,&args,&reply){rf.consistencyCheckReplyHandler(n,&reply)}}()}func(rf*Raft)AppendEntries(args*AppendEntriesArgs,reply*AppendEntriesReply){select{case<-rf.shutdownCh:DPrintf("[%d-%s]:peer%disshuttingdown,rejectAErpcrequest.\n",rf.me,rf,rf.me)returndefault:}DPrintf("[%d-%s]:rpcAE,frompeer:%d,term:%d\n",rf.me,rf,args.LeaderID,args.Term)rf.mu.Lock()deferrf.mu.Unlock()ifargs.Term<rf.CurrentTerm{//DPrintf("[%d-%s]:AEfailedfromleader%d.(heartbeat:leader'sterm<follower'sterm(%d<%d))\n",//rf.me,rf,args.LeaderID,args.Term,rf.currentTerm)reply.CurrentTerm=rf.CurrentTermreply.Success=falsereturn}ifrf.CurrentTerm<args.Term{rf.CurrentTerm=args.Term}//forstaleleaderifrf.state==Leader{rf.turnToFollow()}//forstraggler(follower)ifrf.VotedFor!=args.LeaderID{rf.VotedFor=args.LeaderID}//validAE,resetelectiontimer//ifthenoderecieveheartbeat.thenitwillresettheelectiontimeoutrf.resetTimer<-struct{}{}reply.Success=truereply.CurrentTerm=rf.CurrentTermreturn}处理心跳检测返回

如果心跳检测失败了,那么变为follower,重置选举超时。

//n:whichfollowerfunc(rf*Raft)consistencyCheckReplyHandler(nint,reply*AppendEntriesReply){rf.mu.Lock()deferrf.mu.Unlock()ifrf.state!=Leader{return}ifreply.Success{}else{//foundanewleader?turntofollowerifrf.state==Leader&&reply.CurrentTerm>rf.CurrentTerm{rf.turnToFollow()rf.resetTimer<-struct{}{}DPrintf("[%d-%s]:leader%dfoundnewterm(heartbeatrespfrompeer%d),turntofollower.",rf.me,rf,rf.me,n)return}}}

看完上述内容,你们掌握raft的运用方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!