对于请求处理链而言,所有请求处理器的父接口为RequestProcessor。

RequestProcessor内部类RequestProcessorException,用来表示处理过程中的出现的异常,而proceequest和shutdown方法则是核心方法,是子类必须要实现的方法,处理的主要逻辑在proceequest中,通过proce***equest方法可以将请求传递到下个处理器。而shutdown表示关闭处理器,其意味着该处理器要关闭和其他处理器的连接。

public interface RequestProcessor { @SuppressWarnings("serial") public static class RequestProcessorException extends Exception { public RequestProcessorException(String msg, Throwable t) { super(msg, t); } } void proce***equest(Request request) throws RequestProcessorException; void shutdown();}

实现RequestProcessor的processor有很多,PrepRequestProcessor,通常是请求处理链的第一个处理器。

PrepRequestProcessor1、类的定义

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}

PrepRequestProcessor继承了ZooKeeperCriticalThread类并实现了RequestProcessor接口,表示其可以作为线程使用。

2、类核心成员

//已提交的请求队列 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); //下一个处理器 private final RequestProcessor nextProcessor; // zk服务器 ZooKeeperServer zks;3、核心函数3.1、run

while (true) { //从队列获取请求 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } //requestOfDeath类型的请求,代表当前处理器已经关闭,不再处理请求。 if (Request.requestOfDeath == request) { break; } //调用关键函数 pRequest(request); }3.2、pRequest

pRequest会确定请求类型,并根据请求类型不同生成不同的请求对象,我们以创建节点为例子分析

//设置消息头和事务为空 request.setHdr(null); request.setTxn(null); try { switch (request.type) { case OpCode.createContainer: case OpCode.create: case OpCode.create2: //创建节点请求 CreateRequest create2Request = new CreateRequest(); //处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); break; //省略其他代码 //给请求的zxid赋值 request.zxid = zks.getZxid(); //交给下一个处理器继续处理 nextProcessor.proce***equest(request);

pRequest2Txn函数是实际的处理请求的函数,对于创建方法会调用pRequest2TxnCreate函数

//设置请求头request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); switch (type) { case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { pRequest2TxnCreate(type, request, record, deserialize); break; }

pRequest2TxnCreate方法如下:

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { if (deserialize) { //反序列化,将ByteBuffer转化为Record ByteBufferInputStream.byteBuffer2Record(request.request, record); } int flags; String path; List<ACL> acl; byte[] data; long ttl; if (type == OpCode.createTTL) { CreateTTLRequest createTtlRequest = (CreateTTLRequest)record; flags = createTtlRequest.getFlags(); path = createTtlRequest.getPath(); acl = createTtlRequest.getAcl(); data = createTtlRequest.getData(); ttl = createTtlRequest.getTtl(); } else { //转换createRequest对象 CreateRequest createRequest = (CreateRequest)record; flags = createRequest.getFlags(); path = createRequest.getPath(); acl = createRequest.getAcl(); data = createRequest.getData(); ttl = -1; } CreateMode createMode = CreateMode.fromFlag(flags); validateCreateRequest(path, createMode, request, ttl); //获取父节点路径 String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, acl); //获取父节点的record ChangeRecord parentRecord = getRecordForPath(parentPath); //检查ACL列表 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); //是否创建顺序节点 if (createMode.isSequential()) { //子路径后追加一串数字,顺序的 path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL; //父节点不能是临时节点 if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } //新的子节点版本号 int newCversion = parentRecord.stat.getCversion()+1; //新生事务 if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl)); } else { request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion)); } StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { //是否临时节点 s.setEphemeralOwner(request.sessionId); } //拷贝 parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); //子节点数量+1 parentRecord.childCount++; //设置新版本号 parentRecord.stat.setCversion(newCversion); //将parentRecord添加至outstandingChanges和outstandingChangesForPath中 addChangeRecord(parentRecord); // 将新生成的ChangeRecord(包含了StatPersisted信息)添加至outstandingChanges和outstandingChangesForPath中 addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL)); }

addChangeRecord函数将ChangeRecord添加至ZooKeeperServer的outstandingChanges和outstandingChangesForPath中。

private void addChangeRecord(ChangeRecord c) { synchronized (zks.outstandingChanges) { zks.outstandingChanges.add(c); zks.outstandingChangesForPath.put(c.path, c); } }

outstandingChanges 位于ZooKeeperServer 中,用于存放刚进行更改还没有同步到ZKDatabase中的节点信息。

znode节点会由于用户的读写操作频繁发生变化,为了提升数据的访问效率,ZooKeeper中有一个三层的数据缓冲层用于存放节点数据。

outstandingChanges->ZKDatabase->FileSnap+FileTxnLog