zookeeper(12)源码分析-请求处理链(2)
SyncRequestProcessor,该处理器将请求存入磁盘,其将请求批量的存入磁盘以提高效率,请求在写入磁盘之前是不会被转发到下个处理器的。
类的核心属性SyncRequestProcessor维护了ZooKeeperServer实例,其用于获取ZooKeeper的数据库和其他信息;维护了一个处理请求的队列,其用于存放请求;维护了一个处理快照的线程,用于处理快照;维护了一个running标识,标识SyncRequestProcessor是否在运行;同时还维护了一个等待被刷新到磁盘的请求队列。
// Zookeeper服务器 private final ZooKeeperServer zks; // 请求队列 private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); // 下个处理器 private final RequestProcessor nextProcessor; // 快照处理线程 private Thread snapInProcess = null; // 是否在运行中 volatile private boolean running; /** * Transactions that have been written and are waiting to be flushed to * disk. Basically this is the list of SyncItems whose callbacks will be * invoked after flush returns successfully. */ // 等待被刷新到磁盘的请求队列 private final LinkedList<Request> toFlush = new LinkedList<Request>(); // 随机数生成器 private final Random r = new Random(); /** * The number of log entries to log before starting a snapshot */ // 快照个数 private static int snapCount = ZooKeeperServer.getSnapCount(); // 结束请求标识 private final Request requestOfDeath = Request.requestOfDeath;
构造函数
构造函数首先会调用父类的构造函数,然后根据构造函数参数给类的属性赋值,其中会确定下个处理器,并会设置该处理器正在运行的标识。
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks .getZooKeeperServerListener()); this.zks = zks; this.nextProcessor = nextProcessor; running = true; }
核心方法1、run
@Override public void run() { try { // 写日志数量初始化为0 int logCount = 0; // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time // 防止集群中所有机器在同一时刻进行数据快照,对是否进行数据快照增加随机因素 int randRoll = r.nextInt(snapCount/2); while (true) { Request si = null; // 没有需要刷新到磁盘的请求 if (toFlush.isEmpty()) { // 从请求队列中取出一个请求,若queuedRequests队列为空会阻塞 si = queuedRequests.take(); } else { // 从请求队列中取出一个请求,若queuedRequests队列为空,则返回空,不会阻塞 si = queuedRequests.poll(); // 取出的请求为空 if (si == null) { // 刷新数据磁盘 flush(toFlush); continue; } } // 在关闭处理器之后,会添加requestOfDeath请求到queuedRequests队列,表示关闭后不再处理请求 if (si == requestOfDeath) { break; } // 请求不为空,处理请求 if (si != null) { // track the number of records written to the log // 将写请求添加至事务日志文件 FileTxnSnapLog.append(si) if (zks.getZKDatabase().append(si)) { // 日志写入,logCount加1 logCount++; //确定是否需要进行数据快照 if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log // 滚动日志,从当前日志文件滚到下一个日志文件,不是回滚 zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { // 正在进行快照 LOG.warn("Too busy to snap, skipping"); } else { // 创建线程来处理快照 snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { // 进行快照 zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; // 开始快照线程处理 snapInProcess.start(); } // 重置为0 logCount = 0; } } else if (toFlush.isEmpty()) {// 读请求会走到这里,查看此时toFlush是否为空,如果为空,说明近段时间读多写少,直接响应 // optimization for read heavy workloads // iff this is a read, and there are no pending // flushes (writes), then just pass this to the next // processor if (nextProcessor != null) { // 下个处理器开始处理请求 nextProcessor.proce***equest(si); // 处理器是Flushable的,刷新数据到磁盘 if (nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } } continue; } // 将请求添加至被刷新至磁盘队列 toFlush.add(si); if (toFlush.size() > 1000) {// 队列大小大于1000,直接刷新到磁盘 flush(toFlush); } } } } catch (Throwable t) { handleException(this.getName(), t); } finally{ running = false; } LOG.info("SyncRequestProcessor exited!"); }
2、flush
flush将toFlush队列中的请求刷新到磁盘中。
private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException { if (toFlush.isEmpty()) return; // 提交事务至ZK数据库 zks.getZKDatabase().commit(); while (!toFlush.isEmpty()) { // 从队列移除请求 Request i = toFlush.remove(); // 下个处理器开始处理请求 if (nextProcessor != null) { nextProcessor.proce***equest(i); } } if (nextProcessor != null && nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } }
3、shutdown
函数用于关闭SyncRequestProcessor处理器,其首先会在queuedRequests队列中添加一个结束请求requestOfDeath,然后再判断SyncRequestProcessor是否还在运行,若是,则会等待其结束;之后判断toFlush队列是否为空,若不为空,则刷新到磁盘中
public void shutdown() { LOG.info("Shutting down"); // 添加结束请求请求至队列 queuedRequests.add(requestOfDeath); try { // 还在运行 if(running){ this.join();// 等待该线程终止 } if (!toFlush.isEmpty()) {// 队列不为空,刷新到磁盘 flush(toFlush); } } catch(InterruptedException e) { LOG.warn("Interrupted while wating for " + this + " to finish"); } catch (IOException e) { LOG.warn("Got IO exception during shutdown"); } catch (RequestProcessorException e) { LOG.warn("Got request processor exception during shutdown"); } if (nextProcessor != null) { nextProcessor.shutdown(); } }
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。