NioServerSocketChannel的注册源码解析
我们上一章分析了Netty中NioServerSocketChaennl的创建于初始化,本章节将继续分析NioServerSocketChannel的分析,NioServerSocketChannel是Netty官方封装的一个通道对象,旨用来代替或者包装JDK原生的SocketChannel对象,那么他是如何讲NioServerSocketChannel于JDK的NIO相关代码关联起来的呢?
一、源码入口寻找
我们上一节课主要分析的源码方法是
initAndRegister
方法,其实从名字可以看出来,这里是做通道的初始化于注册的,我们继续回到这个方法,该方法的寻找,参照上一章节:
AbstractBootstrap#initAndRegister
我们跳过上节课已经分析的代码,直接来到注册相关的逻辑:
ChannelFuture regFuture = config().group().register(channel);
我们逐个方法进行分析:
config()
现在我们创建的ServerBootstrap,所以为什么选这个我就不多说了:
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
我们可以看到,他返回的是这个对象,该对象是再创建ServerBootstrap的时候自动创建的,我们看,他构造方法里面穿了一个this,证明 他持有一个ServerBootstrap的引用 ,这代表着他可以通过这个对象,获取ServerBootstrap内所有的属性和方法!获取到这个类之后干嘛了呢?
config().group()
估计大家很多都已经猜出来了,我们直接点进group里面去验证一下:
@SuppressWarnings("deprecation")
public final EventLoopGroup group() {
return bootstrap.group();
该代码是获取到了我们再构建ServerBootstrap的时候设置的bossGroup对象,有兴趣的可以追一下,这里比较简单就不做太多的阐述了,我们继续回到主线,
config().group().register(channel);
我们通过上述代码的分析,知道了group方法返回的是NioEventLoopGroup,我们进入到register方法:
我们发现这里并没有
NioEventLoopGroup
,但是通过前几章我们的学习,我们知道NioEventLoopGroup是
MultithreadEventLoopGroup
的子类,所以我们子类没有往父类找,我们进入到MultithreadEventLoopGroup源码里面:
@Override
public ChannelFuture register(Channel channel) {
//一般来说这里获取的NioEventLoop 他有继承与 SingleThreadEventLoop
return next().register(channel);
在这里,我们看到了一个我们前面分析过得代码,next(),他调用的是
chooser.next();
, chooser是我们在构建
NioEventLoopGroup
的时候创建的一个执行器的选择器,
next
方法的功能是轮训的返回一个线程执行器:
NioEventLoop
!记不太清的同学可以回头看NioEventLoopGroup初始化源码解析的那一章代码!
现在我们根据前几章的基础,我们知道了next()方法返回的是一个NioEventLoop类,我们进入到register()方法查看:
但是,我们发现NioEventLoop相关的实现,但是我们根据前面所学,我们可以知道,NioEventLoop的父类是SingleThreadEventLoop,所以我们进入到 SingleThreadEventLoop#register(io.netty.channel.Channel):
@Override
public ChannelFuture register(Channel channel) {
//调用本身的注册方法
return register(new DefaultChannelPromise(channel, this));
//没什么可说的继续往下追
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
我们一定能够猜到,这里的主要代码是:
promise.channel().unsafe().register(this, promise);
我们上一章分析过 unsafe是
NioMessageUnsafe
, 但是register却没有他的实现:
我们还是需要往父类追,进入到io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):
我们这里先关注一下参数 :
this : 传入的是他本身,他本身是个什么 NioEventLoop,也就是说,他传入了一个执行器
promise :NioServerSocketChannel的包装对象
我们进入到 register方法中,分析主要代码:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
......................暂时忽略不必要代码.............................
AbstractChannel.this.eventLoop = eventLoop;
//注意此时的thread = null 所以返回false
if (eventLoop.inEventLoop()) {
//实际的注册 注册selector 触发 handlerAdded事件和 channelRegistered事件
register0(promise);
} else {
.......................暂时忽略不必要代码......................
AbstractChannel.this.eventLoop = eventLoop;
首先我们将上一步获取的执行器保存在NioServerSocketChannel中! 这行代码有力的证明了,每一个Channel绑定一个NioEventLoop对象!
if (eventLoop.inEventLoop()) {
//实际的注册 注册selector 触发 handlerAdded事件和 channelRegistered事件
register0(promise);
注意:这里我需要澄清一点,真实的调试过程中,并不会走这个分支,而是会走else分支异步进行注册 ,这里为了更方便大家理解,我就依照if分支进行源码分析,其实没有太大变化,都是调用register0方法进行注册,只不过一个同步一个异步,关于异步,是Netty中及其重要的一个知识点,我将放到后面单独开一章进行讲解!
我们进入到register0源码里面:
private void register0(ChannelPromise promise) {
try {
..............忽略代码..................
//实际的注册 调用jdk底层的数据注册selector
// 调用 JDK 底层的 register() 进行注册
//io.netty.channel.nio.AbstractNioChannel.doRegister
doRegister();
neverRegistered = false;
registered = true;
//通知管道 传播handlerAdded事件
//触发 handlerAdded 事件 触发任务 add事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//通知管道 传播channelRegistered事件
// 触发 channelRegistered 事件
pipeline.fireChannelRegistered();
// 如果从未注册过频道,则仅触发channelActive。
// 如果取消注册并重新注册通道,则多个通道处于活动状态。
//isActive() 返回false
// 此时 Channel 还未注册绑定地址,所以处于非活跃状态
if (isActive()) {
....................忽略不必要代码..................
} catch (Throwable t) {
// 直接关闭通道以避免FD泄漏。
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
二、源码解析
doRegister();
doRegister();
真正的注册方法,该方法是将Netty本身的NioServerSocket与JDK连接起来的最重要的一个类!
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
javaChannel()方法是返回JDK原生的SocketChannel,他是再NioServerSocketChannel初始化的时候被保存的,还记得我们再讲述NIO开发Socket的时候的流程吗
我们重点关注一下javaChannel().register的参数:
eventLoop().unwrappedSelector() :NioEventLoop再创建的时候,会保存两个选择器,一个是JDK的原始的选择器,一个是经过Netty包装的选择器,这里返回的是原生的选择器!
0 :不关注任何事件
this :this代表着当前类,他是NioServerSocketChannel类型的,他将一个NioServerSocketChannel的对象,绑定到了JDK原生的选择器,后续只需要通过SelectionKey.attachment(),就能获取到NioServerSocketChannel,而一个NioServerSocketChannel里面又包含一个JDK原生的Channel对象,就可以基于该jdk原生的Channel来进行各种读写操作!
到现在为止,我们就完成JDK中的NIO的将通道绑定到选择器上,我们回到上一步:
pipeline.invokeHandlerAddedIfNeeded
pipeline.invokeHandlerAddedIfNeeded();
开始回调pipeline通道里面添加自定义事件:
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// 现在,我们已注册到EventLoop。现在该调用ChannelHandler的回调了,
// 在完成注册之前添加的内容。
callHandlerAddedForAllHandlers();
//callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
//task = PendingHandlerAddedTask
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
需要注意的是 PendingHandlerCallback task 是PendingHandlerAddedTask类型的,他是什么时候加载的呢?实在我们初始化NioServerSocketChannel的时候调用addLast方法的时候被赋的值,有兴趣的小伙伴可以自己去跟一下源码,这里直接进入到:
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
//进入到 callHandlerAdded0源码逻辑
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.callHandlerAdded();
.......................
//进入到ctx.callHandlerAdded();
final void callHandlerAdded() throws Exception {
if (setAddComplete()) {
handler().handlerAdded(this);
还接记得handler()吗,我再NioServerSocketChannel初始化的时候说过,当时程序向pipeline中添加了一个ChannelInitializer,这里返回的就是那个ChannelInitializer! 我们进入到 ChannelInitializer#handlerAdded 方法里面:
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
removeState(ctx);
首先我们重点关注一个 initChannel(ctx) ,
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 防止再次进入。
if (initMap.add(ctx)) {
try {
// 调用 ChannelInitializer 实现的 initChannel() 方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
................................
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
// 将 ChannelInitializer 自身从 Pipeline 中移出
pipeline.remove(this);
return true;
return false;
initChannel((C) ctx.channel());
该方法会回调ChannelInitializer的抽象方法initChannel,该抽象方法在我们初始化的时候完成,我们就要找到实现这个抽象方法的地方,我们回到上一节课的代码: io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) {
..........................;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//将用户自定义的handler添加进管道 handler 是在构建ServerBootStr的时候传入的 handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
ch.eventLoop().execute(() -> {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
上一节课讲的时候,我们将这一段逻辑略过了,只说是会向通道中添加一个ChannelInitializer实现,现在开始回调他的initChannel方法了:
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
这段代码会将客户再构建ServerBootstrap的时候传入的handler添加进通道,我们为了方便理解,假设用户没有设置handler,所以这个handler判断不通过,跳过,我们继续往下:
ch.eventLoop().execute(() -> {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
这里异步的向管道流注册一个默认的Handler, 为什么说是异步的,我们暂且不说,我们暂且认为是同步的进行add,此时我们的通道如下:
ServerBootstrapAcceptor的作用是专门用于新连接接入的,
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
我们可以看到,他会保存一系列的参数,包括WorkGroup、childHandler、childOptions、childAttrs这些参数都是我们再创建serverBootstrap的时候传入的参数,这也证明了,这些参数是作用于客户端Socket连接的!
有关ServerBootstrapAcceptor,后续会进行一个详细的分析,我们接着说,这里需要重点讲一下addLast方法,
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
........................忽略.........................
//通知添加方法回调
callHandlerAdded0(newCtx);
return this;
在进行添加的时候,他会回调内部产生的
handlerAdded
方法,还记得,我们在介绍Netty的基本架构的业务通道章节吗?
再调用addLast之后,该方法会被回调!
这样就讲所有的方法注册完毕了,我们继续回到 ChannelInitializer#handlerAdded 方法,当**initChannel(ctx)**调用完了之后:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 防止再次进入。
if (initMap.add(ctx)) {
try {
// 调用 ChannelInitializer 实现的 initChannel() 方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
................................
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
// 将 ChannelInitializer 自身从 Pipeline 中移出
pipeline.remove(this);
return true;
return false;
我们会进入到finally里面,我们会看到,此时会做一个操作,删除当前的类,当前的类是谁,是ChannelInitializer,所以删除完毕后,此时管道对象的结构如图所示:
至此 invokeHandlerAddedIfNeeded 分析完毕
pipeline.fireChannelRegistered();
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
这行代码其实没什么可说的,大家可以自己跟一下调试一下代码,这个代码的意义是从HeadContext节点开始传播channelRegistered方法:
至此,NioServerSocketChannel的注册基本就分析完了,有的同学可能觉得少分析了一段:
if (isActive()) {
if (firstRegistration) {
//Channel 当前状态为活跃时,触发 channelActive 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 该通道已注册,并已设置autoRead()。这意味着我们需要开始阅读
// 再次,以便我们处理入站数据。