zookeeper(9)源码分析-事件监听Watcher(2)
接着上一篇文章,继续分析和Watcher相关的类的源码。
ClientWatchManagerpublic Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
该接口,只有一个方法,需要实现。该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。
ZKWatchManager1、ZKWatchManager是ZooKeeper的内部类,实现了ClientWatchManager。
2、ZKWatchManager定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。
static class ZKWatchManager implements ClientWatchManager { //数据变化的watchers private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); //节点存在与否的watchers private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); //子节点变化的watchers private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
3、materialize方法
该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。
@Override public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { //返回结果集合 Set<Watcher> result = new HashSet<Watcher>(); switch (type) {//事件类型 case None://无类型 //添加默认watcher result.add(defaultWatcher); //根据disableAutoWatchReset和Zookeeper的状态是否为同步连接判断是否需要清空 boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected; //针对3个不同的watcherMap进行操作 synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 dataWatches.clear(); } } synchronized(existWatches) { for(Set<Watcher> ws: existWatches.values()) { result.addAll(ws); } if (clear) { existWatches.clear(); } } synchronized(childWatches) { for(Set<Watcher> ws: childWatches.values()) { result.addAll(ws); } if (clear) { childWatches.clear(); } } return result; case NodeDataChanged:// 节点数据变化 case NodeCreated:// 创建节点 synchronized (dataWatches) { //移除clientPath对应的Watcher后全部添加至结果集合 addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { //移除clientPath对应的Watcher后全部添加至结果集合 addTo(existWatches.remove(clientPath), result); } break; case NodeChildrenChanged: // 节点子节点变化 synchronized (childWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(childWatches.remove(clientPath), result); } break; case NodeDeleted:// 删除节点 synchronized (dataWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(dataWatches.remove(clientPath), result); } // XXX This shouldn't be needed, but just in case synchronized (existWatches) { Set<Watcher> list = existWatches.remove(clientPath); if (list != null) { addTo(list, result); LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); } } synchronized (childWatches) { //移除clientPath对应的Watcher后全部添加至结果集合 addTo(childWatches.remove(clientPath), result); } break; default: String msg = "Unhandled watch event type " + type + " with state " + state + " on path " + clientPath; LOG.error(msg); throw new RuntimeException(msg); } return result; } }
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。