netty 5 alph1源码分析(服务端创建过程)
参照《Netty系列之Netty 服务端创建》,研究了netty的服务端创建过程。至于netty的优势,可以参照网络其他文章。《Netty系列之Netty 服务端创建》是李林锋撰写的netty源码分析的一篇好文,绝对是技术干货。但抛开技术来说,也存在一些瑕疵。
缺点如下
代码衔接不连贯,上下不连贯。
代码片段是截图,对阅读代理不便(可能和阅读习惯有关)
本篇主要内容,参照《Netty系列之Netty 服务端创建》,梳理出自己喜欢的阅读风格。
1.整体逻辑图
整体将服务端创建分为2部分:(1)绑定端口,提供服务过程;(2)轮询网络请求
1.1 绑定端口序列图
1.2 类图
类图仅仅涵盖了绑定过程中比较重要的几个组件
1.3 代码分析
step 2 doBind 绑定本地端口,启动服务
privateChannelFuturedoBind(finalSocketAddresslocalAddress){finalChannelFutureregFuture=initAndRegister();//1finalChannelchannel=regFuture.channel();if(regFuture.cause()!=null){returnregFuture;}finalChannelPromisepromise;if(regFuture.isDone()){promise=channel.newPromise();doBind0(regFuture,channel,localAddress,promise);//2}else{//Registrationfutureisalmostalwaysfulfilledalready,butjustincaseit'snot.promise=newDefaultChannelPromise(channel,GlobalEventExecutor.INSTANCE);regFuture.addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{doBind0(regFuture,channel,localAddress,promise);//2}});}returnpromise;}
主要分为2个处理单元
step3 initAndRegister
finalChannelFutureinitAndRegister(){Channelchannel;try{channel=createChannel();}catch(Throwablet){returnVoidChannel.INSTANCE.newFailedFuture(t);}try{init(channel);}catch(Throwablet){channel.unsafe().closeForcibly();returnchannel.newFailedFuture(t);}//注册NioServerSocketChannel到Reactor线程的多路复用器上ChannelPromiseregFuture=channel.newPromise();channel.unsafe().register(regFuture);if(regFuture.cause()!=null){if(channel.isRegistered()){channel.close();}else{channel.unsafe().closeForcibly();}}returnregFuture;}
createChannel由子类ServerBootstrap实现,创建新的NioServerSocketChannel,并完成Channel初始化,以及注册。
4.ServerBootstrap.createChannel
ChannelcreateChannel(){EventLoopeventLoop=group().next();returnchannelFactory().newChannel(eventLoop,childGroup);}
它有两个参数,参数1是从父类的NIO线程池中顺序获取一个NioEventLoop,它就是服务端用于监听和接收客户端连接的Reactor线程。第二个参数就是所谓的workerGroup线程池,它就是处理IO读写的Reactor线程组。
5.ServerBootstrap.init
voidinit(Channelchannel)throwsException{//设置Socket参数和NioServerSocketChannel的附加属性finalMap<ChannelOption<?>,Object>options=options();synchronized(options){channel.config().setOptions(options);}finalMap<AttributeKey<?>,Object>attrs=attrs();synchronized(attrs){for(Entry<AttributeKey<?>,Object>e:attrs.entrySet()){@SuppressWarnings("unchecked")AttributeKey<Object>key=(AttributeKey<Object>)e.getKey();channel.attr(key).set(e.getValue());}}//将AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中ChannelPipelinep=channel.pipeline();if(handler()!=null){p.addLast(handler());}finalChannelHandlercurrentChildHandler=childHandler;finalEntry<ChannelOption<?>,Object>[]currentChildOptions;finalEntry<AttributeKey<?>,Object>[]currentChildAttrs;synchronized(childOptions){currentChildOptions=childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized(childAttrs){currentChildAttrs=childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}//将用于服务端注册的HandlerServerBootstrapAcceptor添加到ChannelPipeline中p.addLast(newChannelInitializer<Channel>(){@OverridepublicvoidinitChannel(Channelch)throwsException{ch.pipeline().addLast(newServerBootstrapAcceptor(currentChildHandler,currentChildOptions,currentChildAttrs));}});}
到此处,Netty服务端监听的相关资源已经初始化完毕。
6.AbstractChannel.AbstractUnsafe.register
publicfinalvoidregister(finalChannelPromisepromise){//首先判断是否是NioEventLoop自身发起的操作,如果是,则不存在并发操作,直接执行Channel注册;if(eventLoop.inEventLoop()){register0(promise);}else{//如果由其它线程发起,则封装成一个Task放入消息队列中异步执行。try{eventLoop.execute(newRunnable(){@Overridepublicvoidrun(){register0(promise);}});}catch(Throwablet){logger.warn("Force-closingachannelwhoseregistrationtaskwasnotacceptedbyaneventloop:{}",AbstractChannel.this,t);closeForcibly();closeFuture.setClosed();promise.setFailure(t);}}}
7.register0
privatevoidregister0(ChannelPromisepromise){try{//checkifthechannelisstillopenasitcouldbeclosedinthemeantimewhentheregister//callwasoutsideoftheeventLoopif(!ensureOpen(promise)){return;}doRegister();registered=true;promise.setSuccess();pipeline.fireChannelRegistered();if(isActive()){//完成绑定时,不会调用该代码段pipeline.fireChannelActive();}}catch(Throwablet){//ClosethechanneldirectlytoavoidFDleak.closeForcibly();closeFuture.setClosed();if(!promise.tryFailure(t)){logger.warn("Triedtofailtheregistrationpromise,butitiscompletealready."+"Swallowingthecauseoftheregistrationfailure:",t);}}}
触发事件
8.doRegister
protectedvoiddoRegister()throwsException{booleanselected=false;for(;;){try{//将NioServerSocketChannel注册到NioEventLoop的Selector上selectionKey=javaChannel().register(eventLoop().selector,0,this);return;}catch(CancelledKeyExceptione){if(!selected){//ForcetheSelectortoselectnowasthe"canceled"SelectionKeymaystillbe//cachedandnotremovedbecausenoSelect.select(..)operationwascalledyet.eventLoop().selectNow();selected=true;}else{//WeforcedaselectoperationontheselectorbeforebuttheSelectionKeyisstillcached//forwhateverreason.JDKbug?throwe;}}}}
大伙儿可能会很诧异,应该注册OP_ACCEPT(16)到多路复用器上,怎么注册0呢?0表示只注册,不监听任何网络操作。这样做的原因如下:
注册方法是多态的,它既可以被NioServerSocketChannel用来监听客户端的连接接入,也可以用来注册SocketChannel,用来监听网络读或者写操作;
通过SelectionKey的interestOps(int ops)方法可以方便的修改监听操作位。所以,此处注册需要获取SelectionKey并给AbstractNioChannel的成员变量selectionKey赋值。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。