


2、FastLeaderElection:标准 Fast Paxos 算法的实现,基于 TCP 协议进行选举。



/**
 * A vote notification exchanged between servers during leader election.
 * It carries the sender's proposed leader together with the data used to
 * compare proposals.
 */
public static class Notification {

    /** Current notification format version (the format version was introduced in 3.4.6). */
    public static final int CURRENTVERSION = 0x2;

    /** Format version carried by this notification. */
    int version;

    /** Server id of the proposed leader. */
    long leader;

    /** Transaction id (zxid) of the proposed leader. */
    long zxid;

    /** Election epoch (election round) of the sender. */
    long electionEpoch;

    /**
     * Current state of the sender. One of four values:
     * LOOKING (searching for a leader), FOLLOWING (follower),
     * LEADING (leader), OBSERVING (takes part in neither operations nor elections).
     */
    QuorumPeer.ServerState state;

    /** Server id of the sender. */
    long sid;

    /** Quorum verifier associated with this notification. */
    QuorumVerifier qv;

    /** Epoch of the proposed leader. */
    long peerEpoch;
}
2、ToSend类


/**
 * An outgoing election message: the sender's proposal plus the sid of the
 * server it should be delivered to.
 */
public static class ToSend {

    /** Message kinds. */
    enum mType { crequest, challenge, notification, ack }

    /** Proposed leader (in the case of a notification): the sid of the proposed leader. */
    long leader;

    /**
     * Contains the tag for acks and the zxid for notifications,
     * i.e. the highest transaction id of the proposed leader.
     */
    long zxid;

    /** Election epoch (election round) of the sender. */
    long electionEpoch;

    /** Current state of the sender. */
    QuorumPeer.ServerState state;

    /** Sid of the server this message is addressed to. */
    long sid;

    /** Used to send a QuorumVerifier (configuration info); defaults to dummyData. */
    byte[] configData = dummyData;

    /** Epoch of the proposed leader. */
    long peerEpoch;

    /**
     * Creates a message to send.
     *
     * @param type          message kind; accepted for call-site compatibility but not stored
     * @param leader        sid of the proposed leader
     * @param zxid          zxid of the proposed leader
     * @param electionEpoch election epoch of the sender
     * @param state         current state of the sender
     * @param sid           sid of the recipient
     * @param peerEpoch     epoch of the proposed leader
     * @param configData    serialized configuration info
     */
    ToSend(mType type,
           long leader,
           long zxid,
           long electionEpoch,
           ServerState state,
           long sid,
           long peerEpoch,
           byte[] configData) {
        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
        this.configData = configData;
    }
}
3、Messenger类
3.1、内部类


/**
 * Creates the receiver worker.
 *
 * @param manager connection manager that incoming messages are read from
 */
WorkerReceiver(QuorumCnxManager manager) {
    super("WorkerReceiver");
    this.stop = false;
    this.manager = manager;
}

// Excerpt from the receive loop (the enclosing loop is not shown here):
// fetch the next vote message from QuorumCnxManager's recvQueue, waiting
// up to 3000 ms, and go round the loop again if nothing was received.
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if (response == null) {
    continue;
}


// Build a notification addressed to the server that sent `response`
// (response.sid). It carries the id, zxid and peer epoch of vote `v`,
// the current logical clock and this peer's current state.
// NOTE(review): `v` and `qv` are defined outside this excerpt; `v` is
// presumably this server's current vote - confirm against the full method.
ToSend notmsg = new ToSend(ToSend.mType.notification,
        v.getId(),
        v.getZxid(),
        logicalclock.get(),
        self.getPeerState(),
        response.sid,
        v.getPeerEpoch(),
        qv.toString().getBytes());
// Queue the message for sending.
sendqueue.offer(notmsg);


// Take the next message to send, waiting up to 3000 ms; poll returns null
// if no message becomes available within that time.
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);


/**
 * Serializes a queued message and hands it to the connection manager for
 * delivery to the server identified by {@code m.sid}.
 *
 * @param m message to send
 */
void process(ToSend m) {
    final ByteBuffer payload = buildMsg(
            m.state.ordinal(),
            m.leader,
            m.zxid,
            m.electionEpoch,
            m.peerEpoch,
            m.configData);

    manager.toSend(m.sid, payload);
}
3.2、Messenger构造函数

Messenger(QuorumCnxManager manager) { //创建WorkerSender this.ws = new WorkerSender(manager); // 新创建线程 this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); // 设置为守护线程 this.wsThread.setDaemon(true); // 创建WorkerReceiver this.wr = new WorkerReceiver(manager); // 新创建线程 this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); // 设置为守护线程 this.wrThread.setDaemon(true); }2、FastLeaderElection类属性

// 完成Leader选举之后需要等待时长 final static int finalizeWait = 200; // 两个连续通知检查之间的最大时长 final static int maxNotificationInterval = 60000; // 管理服务器之间的连接 QuorumCnxManager manager; // 选票发送队列,用于保存待发送的选票 LinkedBlockingQueue<ToSend> sendqueue; // 选票接收队列,用于保存接收到的外部投票 LinkedBlockingQueue<Notification> recvqueue; //投票者 QuorumPeer self; Messenger messenger; //逻辑始终,当前选举周期 AtomicLong logicalclock = new AtomicLong(); /* Election instance */ //被选举者服务器sid long proposedLeader; //被选举者服务器zxid long proposedZxid; //被选举者服务器选举周期 long proposedEpoch;FastLeaderElection核心方法1、发送选票


private void sendNotifications() { for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); // 构造发送消息 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes()); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } // 将发送消息放置于队列 sendqueue.offer(notmsg); } }2、totalOrderPredicate函数


protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ // 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader // 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader // 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }3、termPredicate方法


/**
 * Checks whether the servers that voted for {@code vote} are enough to
 * end the election.
 *
 * @param votes votes recorded so far, keyed by the sid of the server that cast them
 * @param vote  the candidate vote to check
 * @return the result of {@code hasAllQuorums()} on a tracker that has
 *         received an ack from every server whose recorded vote equals
 *         {@code vote}
 */
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
    final SyncedLearnerTracker tracker = new SyncedLearnerTracker();
    tracker.addQuorumVerifier(self.getQuorumVerifier());

    // Also register the last-seen verifier when it is newer than the current one.
    final boolean lastSeenIsNewer = self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion();
    if (lastSeenIsNewer) {
        tracker.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // Record an ack for every server whose vote matches the candidate vote.
    for (Map.Entry<Long, Vote> cast : votes.entrySet()) {
        if (!vote.equals(cast.getValue())) {
            continue;
        }
        tracker.addAck(cast.getKey());
    }

    return tracker.hasAllQuorums();
}
4、lookForLeader函数



// Take the next received notification, waiting up to notTimeout ms;
// n is null if nothing arrives within that time.
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);


// Compare the received proposal with the one built from getInitId(),
// getInitLastLoggedZxid() and getPeerEpoch(): adopt the received proposal
// if it wins, otherwise fall back to the latter, then broadcast the result.
// NOTE(review): those three helpers and updateProposal are not shown here;
// presumably they describe this server's own initial vote - confirm.
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
    updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();


/* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { //设置leading状态,否则设置为flowing self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); //最终选票 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); // 清空recvqueue队列的选票 leaveInstance(endVote); return endVote; }总结
