接前文：
http://blog.itpub.net/29254281/viewspace-1344706/
http://blog.itpub.net/29254281/viewspace-1347985/
http://blog.itpub.net/29254281/viewspace-2134876/
http://blog.itpub.net/29254281/viewspace-2135131/

还是那个程序，在之前的基础上改用 Netty 作为客户端。
也不知道用得到底对不对，先记录一下，以后慢慢学习。


importjava.io.IOException; importjava.nio.channels.Selector; importjava.sql.Connection; importjava.sql.DriverManager; importjava.sql.PreparedStatement; importjava.sql.SQLException; importjava.sql.Timestamp; importjava.util.ArrayList; importjava.util.HashSet; importjava.util.Iterator; importjava.util.List; importjava.util.Set; importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.LinkedBlockingQueue; importjava.util.concurrent.atomic.AtomicInteger; importjava.util.regex.Matcher; importjava.util.regex.Pattern; importio.netty.bootstrap.Bootstrap; importio.netty.buffer.ByteBuf; importio.netty.buffer.Unpooled; importio.netty.channel.Channel; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.ChannelInboundHandlerAdapter; importio.netty.channel.ChannelInitializer; importio.netty.channel.EventLoopGroup; importio.netty.channel.nio.NioEventLoopGroup; importio.netty.channel.socket.nio.NioSocketChannel; importio.netty.handler.codec.LineBasedFrameDecoder; importio.netty.handler.codec.string.StringDecoder; classReactorimplementsRunnable{ publicstaticintGETCOUNT(){ returnCOUNT.get(); } publicstaticintgetQueueSize(){ returnQUEUE.size(); } privatestaticfinalAtomicIntegerCOUNT=newAtomicInteger(); privatestaticfinalAtomicIntegerTASKCOUNT=newAtomicInteger(); publicintstartTask(){ returnTASKCOUNT.incrementAndGet(); } publicintfinishTask(){ returnTASKCOUNT.decrementAndGet(); } publicintincrementAndGet(){ returnCOUNT.incrementAndGet(); } publicfinalSelectorselector; privatestaticBlockingQueueQUEUE=newLinkedBlockingQueue(); publicvoidaddTask(Tasktask){ try{ QUEUE.put(task); }catch(InterruptedExceptione){ e.printStackTrace(); } } publicReactor()throwsIOException{ selector=Selector.open(); } @Override publicvoidrun(){ EventLoopGroupgroup=newNioEventLoopGroup(3); finalReactorreactor=this; while(!Thread.interrupted()){ intmaxClient=500; Tasktask=null; if(TASKCOUNT.get()<maxClient){ try{ while((task=(Task)QUEUE.take())!=null){ finalTaskt=task; 
reactor.startTask(); Bootstrapboot=newBootstrap(); boot.group(group).channel(NioSocketChannel.class).handler(newChannelInitializer(){ @Override protectedvoidinitChannel(Channelch)throwsException{ ch.pipeline().addLast(newLineBasedFrameDecoder(409600)); ch.pipeline().addLast(newStringDecoder()); ch.pipeline().addLast(newHttpClientInboundHandler(reactor,t)); } }); boot.connect(task.getHost(),task.getPort()); if(TASKCOUNT.get()>maxClient){ break; } } }catch(InterruptedExceptione){ e.printStackTrace(); } }else{ //如果已经连接了500个网页,则主线程休眠一段时间. try{ Thread.sleep(10); }catch(InterruptedExceptione){ e.printStackTrace(); } } } group.shutdownGracefully(); } } classHttpClientInboundHandlerextendsChannelInboundHandlerAdapter{ privateTasktask; privateReactorreactor; @Override publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{ ctx.channel().closeFuture(); ctx.close(); this.reactor.finishTask(); task.setEndtime(System.currentTimeMillis()); this.reactor.incrementAndGet(); newParseHandler(reactor,task).run(); } publicHttpClientInboundHandler(Reactorreactor,Tasktask){ this.task=task; this.reactor=reactor; } @Override publicvoidchannelActive(ChannelHandlerContextctx)throwsException{ task.setStarttime(System.currentTimeMillis()); StringBuildersb=newStringBuilder(); sb.append("GET"+task.getCurrentPath()+"HTTP/1.0\r\n"); sb.append("HOST:"+task.getHost()+"\r\n"); sb.append("Accept:*/*\r\n"); sb.append("\r\n"); ByteBufbb=Unpooled.copiedBuffer(sb.toString().getBytes("utf8")); ctx.writeAndFlush(bb); } @Override publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{ Stringcontent=(String)msg; task.getContent().append(content); task.getContent().append("\n"); } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ cause.printStackTrace(); ctx.close(); } } publicclassProbe{ publicstaticvoidmain(String[]args)throwsIOException,InterruptedException{ for(inti=0;i<1;i++){ Reactorreactor=newReactor(); 
reactor.addTask(newTask("news.163.com",80,"/index.html")); newThread(reactor,"ReactorThread_"+i).start(); } longstart=System.currentTimeMillis(); while(true){ Thread.sleep(1000); longend=System.currentTimeMillis(); floatinterval=((end-start)/1000); intconnectTotal=Reactor.GETCOUNT(); intpersistenceTotal=PersistenceHandler.GETCOUNT(); intconnectps=Math.round(connectTotal/interval); intpersistenceps=Math.round(persistenceTotal/interval); System.out.print("\r连接总数:"+connectTotal+"\t每秒连接:"+connectps+"\t连接队列剩余:"+Reactor.getQueueSize() +"\t持久化总数:"+persistenceTotal+"\t每秒持久化:"+persistenceps+"\t持久化队列剩余:" +PersistenceHandler.getInstance().getSize()); } } } classTask{ privateStringhost; privateintport; privateStringcurrentPath; privatelongstarttime; privatelongendtime; privateStringtype; privateStringBuildercontent=newStringBuilder(2400); privateintstate; privatebooleanisValid=true; publicTask(){ } publicTask(Stringhost,intport,Stringpath){ init(host,port,path); } publicvoidinit(Stringhost,intport,Stringpath){ this.setCurrentPath(path); this.host=host; this.port=port; } publiclonggetStarttime(){ returnstarttime; } publicvoidsetStarttime(longstarttime){ this.starttime=starttime; } publiclonggetEndtime(){ returnendtime; } publicvoidsetEndtime(longendtime){ this.endtime=endtime; } publicbooleanisValid(){ returnisValid; } publicvoidsetValid(booleanisValid){ this.isValid=isValid; } publicintgetState(){ returnstate; } publicvoidsetState(intstate){ this.state=state; } publicStringgetCurrentPath(){ returncurrentPath; } publicvoidsetCurrentPath(StringcurrentPath){ this.currentPath=currentPath; inti=0; if(currentPath.indexOf("?")!=-1){ i=currentPath.indexOf("?"); }else{ if(currentPath.indexOf("#")!=-1){ i=currentPath.indexOf("#"); }else{ i=currentPath.length(); } } this.type=currentPath.substring(currentPath.indexOf(".")+1,i); } publiclonggetTaskTime(){ returngetEndtime()-getStarttime(); } publicStringgetType(){ returntype; } publicvoidsetType(Stringtype){ this.type=type; } 
publicStringgetHost(){ returnhost; } publicintgetPort(){ returnport; } publicStringBuildergetContent(){ returncontent; } publicvoidsetContent(StringBuildercontent){ this.content=content; } } classParseHandlerimplementsRunnable{ privatestaticfinalSetSET=newHashSet(); PersistenceHandlerpersistencehandler=PersistenceHandler.getInstance(); Listdomainlist=newArrayList(); Tasktask; privateinterfaceFilter{ voiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain); } privateclassFilterChainimplementsFilter{ privateListlist=newArrayList(); { addFilter(newTwoLevel()); addFilter(newOneLevel()); addFilter(newFullPath()); addFilter(newRoot()); addFilter(newDefault()); } privatevoidaddFilter(Filterfilter){ list.add(filter); } privateIteratorit=list.iterator(); @Override publicvoiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain){ if(it.hasNext()){ ((Filter)it.next()).doFilter(fatherTask,newTask,path,chain); } } } privateclassTwoLevelimplementsFilter{ @Override publicvoiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain){ if(path.startsWith("../../")){ Stringprefix=getPrefix(fatherTask.getCurrentPath(),3); newTask.init(fatherTask.getHost(),fatherTask.getPort(),path.replace("../../",prefix)); }else{ chain.doFilter(fatherTask,newTask,path,chain); } } } privateclassOneLevelimplementsFilter{ @Override publicvoiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain){ if(path.startsWith("../")){ Stringprefix=getPrefix(fatherTask.getCurrentPath(),2); newTask.init(fatherTask.getHost(),fatherTask.getPort(),path.replace("../",prefix)); }else{ chain.doFilter(fatherTask,newTask,path,chain); } } } privateclassFullPathimplementsFilter{ @Override publicvoiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain){ if(path.startsWith("http://")){ Iteratorit=domainlist.iterator(); booleanflag=false; while(it.hasNext()){ Stringdomain=(String)it.next(); if(path.startsWith("http://"+domain+"/")){ 
newTask.init(domain,fatherTask.getPort(),path.replace("http://"+domain+"/","/")); flag=true; break; } } if(!flag){ newTask.setValid(false); } }else{ chain.doFilter(fatherTask,newTask,path,chain); } } } privateclassRootimplementsFilter{ @Override publicvoiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain){ if(path.startsWith("/")){ newTask.init(fatherTask.getHost(),fatherTask.getPort(),path); }else{ chain.doFilter(fatherTask,newTask,path,chain); } } } privateclassDefaultimplementsFilter{ @Override publicvoiddoFilter(TaskfatherTask,TasknewTask,Stringpath,Filterchain){ if(path.contains(":")){ newTask.setValid(false); return; } Stringprefix=getPrefix(fatherTask.getCurrentPath(),1); newTask.init(fatherTask.getHost(),fatherTask.getPort(),prefix+"/"+path); } } publicParseHandler(Reactorreactor,Tasktask){ this.reactor=reactor; this.task=task; //增加白名单 this.domainlist.add("news.163.com"); } privateReactorreactor; privatePatternpattern=Pattern.compile("\"[^\"]+\\.htm[^\"]*\""); privatevoidparseTaskState(Tasktask){ if(task.getContent().toString().startsWith("HTTP/1.1")){ task.setState(Integer.parseInt(task.getContent().substring(9,12))); }else{ try{ task.setState(Integer.parseInt(task.getContent().substring(9,12))); }catch(Exceptionex){ ex.printStackTrace(); System.out.println(task.getContent()); } } } /** *@paramfatherTask *@parampath *@throwsException */ privatevoidcreateNewTask(TaskfatherTask,Stringpath)throwsException{ TasknewTask=newTask(); FilterChainfilterchain=newFilterChain(); filterchain.doFilter(fatherTask,newTask,path,filterchain); if(newTask.isValid()){ synchronized(SET){ if(SET.contains(newTask.getHost()+newTask.getCurrentPath())){ return; } SET.add(newTask.getHost()+newTask.getCurrentPath()); } reactor.addTask(newTask); } } privateStringgetPrefix(Strings,intcount){ Stringprefix=s; while(count>0){ prefix=prefix.substring(0,prefix.lastIndexOf("/")); count--; } return"".equals(prefix)?"/":prefix; } @Override publicvoidrun(){ try{ parseTaskState(task); 
if(200==task.getState()){ Matchermatcher=pattern.matcher(task.getContent()); while(matcher.find()){ Stringpath=matcher.group(); if(!path.contains("")&&!path.contains("\t")&&!path.contains("(")&&!path.contains(")")){ path=path.substring(1,path.length()-1); createNewTask(task,path); } } } persistencehandler.addTask(task); }catch(Exceptione){ e.printStackTrace(); } } } classPersistenceHandlerimplementsRunnable{ privatestaticclassSingletonHandler{ privatestaticPersistenceHandlerobj=newPersistenceHandler(); } publicstaticPersistenceHandlergetInstance(){ returnSingletonHandler.obj; } static{ try{ Class.forName("com.mysql.jdbc.Driver"); }catch(ClassNotFoundExceptione){ //TODOAuto-generatedcatchblock e.printStackTrace(); } } publicstaticintGETCOUNT(){ returnCOUNT.get(); } privatestaticfinalAtomicIntegerCOUNT=newAtomicInteger(); privateBlockingQueuepersistencelist; publicPersistenceHandler(){ this.persistencelist=newLinkedBlockingQueue(); newThread(this,"PersistenceThread").start(); } publicvoidaddTask(Tasktask){ try{ this.persistencelist.put(task); }catch(InterruptedExceptione){ //TODOAuto-generatedcatchblock e.printStackTrace(); } } publicintgetSize(){ returnpersistencelist.size(); } privateConnectionconn; privatePreparedStatementps; @Override publicvoidrun(){ try{ conn=DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/mvbox","xx","xx"); conn.setAutoCommit(false); ps=conn.prepareStatement( "insertintoprobe(host,path,state,tasktime,type,length,createtime)values(?,?,?,?,?,?,?)"); }catch(SQLExceptione){ //TODOAuto-generatedcatchblock e.printStackTrace(); } while(true){ this.handler(); COUNT.addAndGet(1); } } privatevoidhandler(){ try{ Tasktask=(Task)persistencelist.take(); ps.setString(1,task.getHost()); ps.setString(2,task.getCurrentPath()); ps.setInt(3,task.getState()); ps.setLong(4,task.getTaskTime()); ps.setString(5,task.getType()); ps.setInt(6,task.getContent().toString().length()); ps.setTimestamp(7,newTimestamp(task.getEndtime())); ps.addBatch(); 
if(GETCOUNT()%500==0){ ps.executeBatch(); conn.commit(); } }catch(InterruptedExceptione){ e.printStackTrace(); }catch(SQLExceptione){ e.printStackTrace(); } } }
性能和原来的版本差不多。