Zookeeper的java实例
还是在之前的模块中写这个例子:
注意在pom.xml中加上Zookeeper的依赖,
现在开始写ZookeeperDemo.java
importorg.apache.log4j.Logger;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;importjava.util.concurrent.CountDownLatch;publicclassZookeeperDemoimplementsWatcher{Loggerlogger=Logger.getLogger(ZookeeperDemo.class);protectedCountDownLatchcountDownLatch=newCountDownLatch(1);//缓存时间privatestaticfinalintSESSION_TIME=2000;publicstaticZooKeeperzooKeeper=null;/***监控所有被触发的事件*@paramwatchedEvent*/publicvoidprocess(WatchedEventwatchedEvent){logger.info("收到事件通知:"+watchedEvent.getState());if(watchedEvent.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}}publicvoidconnect(Stringhosts){try{if(zooKeeper==null){//zk客户端允许我们将ZK服务的所有地址进行配置zooKeeper=newZooKeeper(hosts,SESSION_TIME,this);//使用countDownLatch的awaitcountDownLatch.await();}}catch(IOExceptione){logger.error("连接创建失败,发生IOException:"+e.getMessage());}catch(InterruptedExceptione){logger.error("连接创建失败,发生InterruptedException:"+e.getMessage());}}/***关闭连接*/publicvoidclose(){try{if(zooKeeper!=null){zooKeeper.close();}}catch(InterruptedExceptione){logger.error("释放连接错误:"+e.getMessage());}}}
我们详细解释一下为什么要有这个类:
这个类是实现了Watcher接口:Watcher机制:目的是为ZK客户端操作提供一种类似于异步获取数据的操作。采用Watcher方式来完成对节点状态的监视,通过对/hotsname节点的子节点变化事件的监听来完成这一目标。监听进程是作为一个独立的服务或者进程运行的,它覆盖了 process 方法来实现应急措施。
这里面涉及到的类:CountDownLatch:CountDownLatch是一个同步的工具类,允许一个或多个线程一直等待,直到其他线程的操作执行完成后再执行。在Java并发中,countdownLatch是一个常见的概念。CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
下面是本例里面用到的CountDownLatch的构造方法和其注释:
/***Constructsa{@codeCountDownLatch}initializedwiththegivencount.**@paramcountthenumberoftimes{@link#countDown}mustbeinvoked*beforethreadscanpassthrough{@link#await}*@throwsIllegalArgumentExceptionif{@codecount}isnegative*/publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");this.sync=newSync(count);}
这段释义应该是说:以给定的count值来初始化一个countDownLatch对象。其中count是指等待的线程在count个线程执行完成后再执行。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
那么count(计数值)实际上就是闭锁需要等待的线程数量。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
更多CountDownLatch类可以参考:http://www.importnew.com/15731.html
Zookeeper类【Zookeeper 中文API 】:http://www.cnblogs.com/ggjucheng/p/3370359.html
下面这是本例用到的Zookeeper的构造方法:第一个是主机地址,第二个是会话超时时间、第三个是监视者。
ZooKeeper(StringconnectStringsessionTimeoutWatcherwatcher)IOException{(connectStringsessionTimeoutwatcher)}
再来看第二个类:ZookeeperOperation.java。确切的说,这个名字改错了,应该叫ZNodeOperation。因为这就是对节点进行操作的一个类:
importorg.apache.log4j.Logger;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooDefs;importorg.apache.zookeeper.data.Stat;importjava.util.List;publicclassZookeeperOperation{Loggerlogger=Logger.getLogger(ZookeeperOperation.class);ZookeeperDemozookeeperDemo=newZookeeperDemo();/***创建节点*@parampath节点路径*@paramdata节点内容*@return*/publicbooleancreateZNode(Stringpath,Stringdata){try{StringzkPath=ZookeeperDemo.zooKeeper.create(path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);logger.info("Zookeeper创建节点成功,节点地址:"+zkPath);returntrue;}catch(KeeperExceptione){logger.error("创建节点失败:"+e.getMessage()+",path:"+path,e);}catch(InterruptedExceptione){logger.error("创建节点失败:"+e.getMessage()+",path:"+path,e);}returnfalse;}/***删除一个节点*@parampath节点路径*@return*/publicbooleandeteleZKNode(Stringpath){try{ZookeeperDemo.zooKeeper.delete(path,-1);logger.info("Zookeeper删除节点1成功,节点地址:"+path);returntrue;}catch(InterruptedExceptione){logger.error("删除节点失败:"+e.getMessage()+",path:"+path,e);}catch(KeeperExceptione){logger.error("删除节点失败:"+e.getMessage()+",path:"+path,e);}returnfalse;}/***更新节点内容*@parampath节点路径*@paramdata节点数据*@return*/publicbooleanupdateZKNodeData(Stringpath,Stringdata){try{Statstat=ZookeeperDemo.zooKeeper.setData(path,data.getBytes(),-1);logger.info("更新节点数据成功,path:"+path+",stat:"+stat);returntrue;}catch(KeeperExceptione){logger.error("更新节点数据失败:"+e.getMessage()+",path:"+path,e);}catch(InterruptedExceptione){logger.error("更新节点数据失败:"+e.getMessage()+",path:"+path,e);}returnfalse;}/***读取指定节点的内容*@parampath指定的路径*@return*/publicStringreadData(Stringpath){Stringdata=null;try{data=newString(ZookeeperDemo.zooKeeper.getData(path,false,null));logger.info("读取数据成功,其中path:"+path+",data-content:"+data);}catch(KeeperExceptione){logger.error("读取数据失败,发生KeeperException!path:"+path+",errMsg:"+e.getMessage(),e);}catch(InterruptedExceptione){logger.error("读取数据失败,InterruptedException!path:"+path+",errMsg:"+e.getMessage(),e);}returndata;}/***获取某个节点下的所有节点*@parampath节点路径*@return*/publicList<String>getChild(Stringpath){try{List<String>list=ZookeeperDemo.zooKeeper.getChildren(path,false);if(list.isEmpty()){logger.info(path+"的路径下没有节点");}returnlist;}catch(KeeperExceptione){logger.error("读取子节点数据失败,发生KeeperException!path:"+path+",errMsg:"+e.getMessage(),e);}catch(InterruptedExceptione){logger.error("读取子节点数据失败,InterruptedException!path:"+path+",errMsg:"+e.getMessage(),e);}returnnull;}publicbooleanisExists(Stringpath){try{Statstat=ZookeeperDemo.zooKeeper.exists(path,false);returnnull!=stat;}catch(KeeperExceptione){logger.error("读取数据失败,发生KeeperException!path:"+path+",errMsg:"+e.getMessage(),e);}catch(InterruptedExceptione){logger.error("读取数据失败,发生InterruptedException!path:"+path+",errMsg:"+e.getMessage(),e);}returnfalse;}}
其中,有个creatZNode()方法中,有用到:
StringzkPath=ZookeeperDemo.zooKeeper.create(path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
这个方法是这样的:Stringcreate(Stringpath, byte[]data,List<ACL>acl,CreateModecreateMode);
创建一个给定的目录节点 path, 并给它设置数据,CreateMode标识有四种形式的目录节点,分别是 PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;EPHEMERAL_SEQUENTIAL:临时自动编号节点。
更多关于Zookeeper的中文API,参考这个博客:
http://www.cnblogs.com/ggjucheng/p/3370359.html
第三个类,ZookeeperCliTest.java就是测试啦:
importjava.util.List;publicclassZookeeperCliTest{publicstaticvoidmain(String[]args){//定义父子类节点路径StringrootPath="/ZookeeperRoot01";StringchildPath2=rootPath+"/child101";StringchildPath3=rootPath+"/child201";//ZookeeperOperation操作APIZookeeperOperationzookeeperOperation=newZookeeperOperation();//连接Zookeeper服务器ZookeeperDemozookeeperDemo=newZookeeperDemo();zookeeperDemo.connect("127.0.0.1:2181");//创建节点if(zookeeperOperation.createZNode(rootPath,"<父>父节点数据")){System.out.println("节点["+rootPath+"],数据["+zookeeperOperation.readData(rootPath)+"]");}//创建子节点,读取+删除if(zookeeperOperation.createZNode(childPath2,"<父-子(1)>节点数据")){System.out.println("节点["+childPath2+"]数据内容["+zookeeperOperation.readData(childPath2)+"]");zookeeperOperation.deteleZKNode(childPath2);System.out.println("节点["+childPath2+"]删除值后["+zookeeperOperation.readData(childPath2)+"]");}//创建子节点,读取+修改if(zookeeperOperation.createZNode(childPath3,"<父-子(2)>节点数据")){System.out.println("节点["+childPath3+"]数据内容["+zookeeperOperation.readData(childPath3)+"]");zookeeperOperation.updateZKNodeData(childPath3,"<父-子(2)>节点数据,更新后的数据");System.out.println("节点["+childPath3+"]数据内容更新后["+zookeeperOperation.readData(childPath3)+"]");}//获取子节点List<String>childPaths=zookeeperOperation.getChild(rootPath);if(null!=childPaths){System.out.println("节点["+rootPath+"]下的子节点数["+childPaths.size()+"]");for(StringchildPath:childPaths){System.out.println("|--节点名["+childPath+"]");}}//判断节点是否存在System.out.println("检测节点["+rootPath+"]是否存在:"+zookeeperOperation.isExists(rootPath));System.out.println("检测节点["+childPath2+"]是否存在:"+zookeeperOperation.isExists(childPath2));System.out.println("检测节点["+childPath3+"]是否存在:"+zookeeperOperation.isExists(childPath3));zookeeperDemo.close();}}
由于开启了Logger的打印日志到控制台,所以运行结果如下:
以上是main启动的打印信息。
以上的汉字就是我们在代码中要求Logger打印到控制台的。
若不想看到这么多信息,可以注释掉log4j.properties.结果如下:除了System.out.println()的打印结果,Logger一个都没有出现。
log4j.properties的内容如下:
#logger
log4j.rootLogger=debug,appender1
log4j.appender.appender1=org.apache.log4j.ConsoleAppender
log4j.appender.appender1.layout=org.apache.log4j.TTCCLayout
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。