ZooKeeperServer,为所有服务器的父类。
QuorumZooKeeperServer,其是所有参与选举的服务器的父类,是抽象类,其继承了ZooKeeperServer类。
LeaderZooKeeperServer,Leader服务器,继承了QuorumZooKeeperServer类,也会继承ZooKeeperServer中的很多方法。
LearnerZooKeeper,其是Learner服务器的父类,为抽象类,也继承了QuorumZooKeeperServer类。
FollowerZooKeeperServer,Follower服务器,继承了LearnerZooKeeper。
ObserverZooKeeperServer,Observer服务器,继承了LearnerZooKeeper。
ReadOnlyZooKeeperServer,只读服务器,不提供写服务,继承QuorumZooKeeperServer。

ZooKeeperServer1、类的继承关系

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}

ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定义了expire方法(表示会话过期)和getServerId方法(表示获取服务器ID),而Provider则主要定义了获取服务器某些数据的方法。

2、类属性

protected static final Logger LOG; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); Environment.logEnv("Server environment:", LOG); } //jmx服务 protected ZooKeeperServerBean jmxServerBean; protected DataTreeBean jmxDataTreeBean; // 默认心跳频率 public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; // 最小会话过期时间 /** value of -1 indicates unset, use default */ protected int minSessionTimeout = -1; // 最大会话过期时间 /** value of -1 indicates unset, use default */ protected int maxSessionTimeout = -1; protected SessionTracker sessionTracker; // 事务日志快照 private FileTxnSnapLog txnLogFactory = null; // Zookeeper内存数据库 private ZKDatabase zkDb; private final AtomicLong hzxid = new AtomicLong(0); public final static Exception ok = new Exception("No prob"); // 请求处理器 protected RequestProcessor firstProcessor; protected volatile State state = State.INITIAL; protected enum State { INITIAL, RUNNING, SHUTDOWN, ERROR } /** * This is the secret that we use to generate passwords. For the moment, * it's more of a checksum that's used in reconnection, which carries no * security weight, and is treated internally as if it carries no * security weight. */ static final private long superSecret = 0XB3415C00L; private final AtomicInteger requestsInProcess = new AtomicInteger(0); // 未处理的ChangeRecord final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); // this data structure must be accessed under the outstandingChanges lock // 记录path对应的ChangeRecord final HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>(); protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; // 服务器统计数据 private final ServerStats serverStats; private final ZooKeeperServerListener listener; private ZooKeeperServerShutdownHandler zkShutdownHandler; private volatile int createSessionTrackerServerId = 1;3、核心函数3.1 loadData

该函数用于加载数据,其首先会判断内存库是否已经加载设置zxid,之后会调用killSession函数删除过期的会话

if(zkDb.isInitialized()){ // 内存数据库已被初始化 // 设置为最后处理的Zxid setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { // 未被初始化,则加载数据库 setZxid(zkDb.loadDataBase()); } // Clean up dead sessions LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) {// 遍历所有的会话 if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } for (long session : deadSessions) { // 删除过期的会话 // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } // Make a clean snapshot //初始化一个快照 takeSnapshot();3.2、submitRequest

提交请求,处理器进行处理

public void submitRequest(Request si) { if (firstProcessor == null) {// 第一个处理器为空 synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. //服务器调用链还未初始化完成 while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); // 是否为合法的请求 boolean validpacket = Request.isValid(si.type); if (validpacket) { //调用链第一处理器开始处理 firstProcessor.proce***equest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().proce***equest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } }LeaderZooKeeperServer介绍1、类属性

// 提交请求处理器CommitProcessor commitProcessor;//处理链请求第一个处理处理器PrepRequestProcessor prepRequestProcessor;2、构造方法

LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self); }

直接调用父类QuorumZooKeeperServer的构造函数,然后再调用ZooKeeperServer的构造函数,逐级构造。

3、核心函数3.1、setupRequestProcessors

@Override protected void setupRequestProcessors() { //创建FinalRequestProcessor,处理链最后一个处理器 RequestProcessor finalProcessor = new FinalRequestProcessor(this); //创建ToBeAppliedRequestProcessor RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); // 创建CommitProcessor,提交处理器 commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); // 启动CommitProcessor commitProcessor.start(); // 创建ProposalRequestProcessor ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); // 初始化ProposalProcessor proposalProcessor.initialize(); //创建PrepRequestProcessor,作为以第一个处理链处理器 prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); // firstProcessor为PrepRequestProcessor firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); setupContainerManager(); }

setupRequestProcessors函数表示创建处理链,可以看到其处理链的顺序为PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。