zookeeper(13)源码分析-请求处理链(3)
FinalRequestProcessor是请求处理链中最后的一个处理器。
public class FinalRequestProcessor implements RequestProcessor { ZooKeeperServer zks;}
FinalRequestProcessor只实现了RequestProcessor接口,需要实现process Request方法和shutdown方法。
核心属性为zks,表示Zookeeper服务器,可以通过zks访问到Zookeeper内存数据库。
我们看一下核心方法process Request代码:
同步代码块synchronized (zks.outstandingChanges) { // Need to process local session requests // 当前节点,处理请求,若为事务性请求,则提交到ZooKeeper内存数据库中。 // 对于processTxn函数而言,其最终会调用DataTree的processTxn rc = zks.processTxn(request); // request.hdr is set for write requests, which are the only ones // that add to outstandingChanges. //只有写请求才会有消息头 if (request.getHdr() != null) { TxnHeader hdr = request.getHdr(); Record txn = request.getTxn(); long zxid = hdr.getZxid(); //当outstandingChanges不为空且其首元素的zxid小于等于请求的zxid时, // 就会一直从outstandingChanges中取出首元素,并且对outstandingChangesForPath做相应的操作 while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) { ChangeRecord cr = zks.outstandingChanges.remove(); if (cr.zxid < zxid) { LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid); } if (zks.outstandingChangesForPath.get(cr.path) == cr) { zks.outstandingChangesForPath.remove(cr.path); } } } // do not add non quorum packets to the queue. //判断是否为事务性请求则是通过调用isQuorum函数 //只将quorum包(事务性请求)添加进队列 //addCommittedProposal函数将请求添加至ZKDatabase的committedLog结构中 if (request.isQuorum()) { zks.getZKDatabase().addCommittedProposal(request); } }
如果请求是ping
根据请求的创建时间来更新Zookeeper服务器的延迟,updateLatency函数中会记录最大延迟、最小延迟、总的延迟和延迟次数。
然后更新响应中的状态,如请求创建到响应该请求总共花费的时间、最后的操作类型等。然后设置响应后返回
case OpCode.ping: { //更新延迟 zks.serverStats().updateLatency(request.createTime); lastOp = "PING"; // 更新响应的状态 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, Time.currentElapsedTime()); // 设置响应 cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; }
其他请求与此类似,
最后会根据其他请求再次更新服务器的延迟,设置响应的状态等
// 获取最后处理的zxidlong lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();// 响应头 ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());// 更新服务器延迟 zks.serverStats().updateLatency(request.createTime); // 更新状态 cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
最后使用sendResponse函数将响应发送给请求方。
try { //返回相应 cnxn.sendResponse(hdr, rsp, "response"); if (request.type == OpCode.closeSession) { //关闭会话 cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG",e); }
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。