Netty线程模型、Future、Channel总结和源码分析
可根据需要配置线程模型:单线程Reactor、多线程Reactor、多层线程Reactor
无论几个线程,都通过单一的Acceptor接收客户端请求,可以创建更多的NioEventLoop来处理IO操作。
EventLoop和EventLoopGroup实际继承了Java的ScheduledExecutorService,使其具备了线程池的特性,其线程数量可动态配置。例如配置单线程模型,设置线程数量为1即可。
Future和PromiseFutureFuture即异步操作
future操作可以被close,但结果是未知的;调用get可以获取操作结果,但是会被阻塞;isDone可判断是否完成操作。
ChannelFuture是为了获取异步返回结果而设计
可以通过ChannelFutureListener接口获得回调,无需等待get方法返回。
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> { ChannelFutureListener CLOSE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { future.channel().close(); } }; ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { future.channel().close(); } } }; ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { future.channel().pipeline().fireExceptionCaught(future.cause()); } } };}
连接超时和channel超时配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);
注意:
1、谨慎调用await,可能导致死锁。
2、ChannelFuture超时后如果调用了业务代码重连,而此时IO未超时,将可能导致多条连接并存,设置IO超时时间建议小于业务代码超时时间。
升级版的future,可写可操作(对回调过程)。future好比古代飞鸽传书,只能等鸽子回来或者不回来,不可控;promise就像现代快递员,送快递送一半可以打电话给他叫他不要送了或者中途请他帮忙买个饼。
例如:
DefaultPromise类
awaitUninterruptibly()可手动打断回调,使进程等待。
public Promise<V> awaitUninterruptibly() { if (this.isDone()) { return this; } else { boolean interrupted = false; synchronized(this) { while(!this.isDone()) { this.checkDeadLock(); this.incWaiters(); try { this.wait(); } catch (InterruptedException var9) { interrupted = true; } finally { this.decWaiters(); } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; } }
进行了死锁判断,避免已存在相同任务;并限制了最大等待数量32767
protected void checkDeadLock() { EventExecutor e = this.executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(this.toString()); } }private void incWaiters() { if (this.waiters == 32767) { throw new IllegalStateException("too many waiters: " + this); } else { ++this.waiters; } }
Channel和UnSafe
Channel负责对外提供操作IO的接口,而UnSafe是Channel的内部接口类,如其名一样是不安全的操作,所以封装在接口内部不让外部调用,而实际的操作IO最终都是在Unsafe中执行。
//Channel调用连接为例,跟踪实现连接请求的过程ChannelFuture connect(SocketAddress var1);//DefaultChannelPipeline中执行,实际是调用尾部的pipeline public ChannelFuture connect(SocketAddress remoteAddress) { return this.tail.connect(remoteAddress); }//AbstractChannelHandlerContext是Pipeline容器中的对象,//持续寻找所有handler执行对象,直到全部被调用 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { AbstractChannelHandlerContext next = this.findContextOutbound(); next.invoker().invokeConnect(next, remoteAddress, localAddress, promise); return promise; } private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while(!ctx.outbound); return ctx; }//而真实的执行是寻找到UnSafe的Invoker public ChannelHandlerInvoker invoker() { return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker; } public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) { if (this.executor.inEventLoop()) { ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise); } else { this.safeExecuteOutbound(new OneTimeTask() { public void run() { ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise); } }, promise); } } }
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。