在Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。

整个IO处理操作环节大致包括从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端

整个的IO处理操作环节的前后两个环节(包括从通道读数据包和由通道发送到对端),由Netty的底层负责完成,不需要用户程序负责。用户程序主要涉及的Handler环节为数据包解码、业务处理、目标数据编码、把数据包写到通道中。

从应用程序开发人员的角度来看有入站和出站两种类型操作:

  • 入站处理触发的方向为自底向上,从Netty的内部(如通道)到ChannelInboundHandler入站处理器。
  • 出站处理触发的方向为自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。
  • IO处理操作环节前面的数据包解码、业务处理两个环节属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节属于出站处理器的工作。

    1 ChannelInboundHandler入站处理器

    当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler入站处理器所对应的入站API,进行入站操作处理。

    public interface ChannelInboundHandler extends ChannelHandler
    

    1.1 核心API

    public interface ChannelInboundHandler extends ChannelHandler {
         当通道注册完成后,Netty会调用fireChannelRegistered()方法,触发通道注册事件,
         而在通道流水线注册过的入站处理器的channelRegistered()回调方法会被调用。
        void channelRegistered(ChannelHandlerContext ctx) throws Exception;
         * 当通道激活完成后,Netty会调用fireChannelActive()方法,触发通道激活事件,
         而在通道流水线注册过的入站处理器的channelActive()回调方法会被调用。
        void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
         * 当通道激活完成后,Netty会调用fireChannelActive()方法,触发通道激活事件,
         而在通道流水线注册过的入站处理器的channelActive()回调方法会被调用。
        void channelActive(ChannelHandlerContext ctx) throws Exception;
         * 当连接被断开或者不可用时,Netty会调用fireChannelInactive()方法
         ,触发连接不可用事件,
         而在通道流水线注册过的入站处理器的channelInactive()回调方法会被调用。
        void channelInactive(ChannelHandlerContext ctx) throws Exception;
         当通道缓冲区可读时,Netty会调用fireChannelRead()方法,触发通道可读事件,
         而在通道流水线注册过的入站处理器的channelRead()回调方法会被调用,
         以便完成入站数据的读取和处理。
        void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
         * 当通道缓冲区读完时,Netty会调用fireChannelReadComplete()方法,
         触发通道缓冲区读完事件,
         而在通道流水线注册过的入站处理器的channelReadComplete()回调方法会被调用。
        void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
         * 如果触发了用户事件,则被调用。
        void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
         * 一旦Channel的可写状态改变,就会被调用。 
         可以使用Channel.isWritable()检查状态。
        void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
         * 异常回调
        @Override
        @SuppressWarnings("deprecation")
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    

    在Netty中,入站处理器的默认实现ChannelInboundHandlerAdapter,开发人员可以继承改类,按需实现方法即可

    2 ChannelOutboundHandler出站处理器

    Netty出站处理的方向是通过上层Netty通道去操作底层Java IO通道: 当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler出站处理器完成Netty通道到底层通道的操作,比如建立底层连接、断开底层连接、写入底层Java NIO通道等。

    public interface ChannelOutboundHandler extends ChannelHandler
    

    2.1 核心API

    public interface ChannelOutboundHandler extends ChannelHandler {
        监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。
        如果使用TCP传输协议,这个方法用于服务端。
        void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
          连接服务端:完成底层Java IO通道的服务端的连接操作。
          如果使用TCP传输协议,那么这个方法将用于客户端。
        void connect(
                ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception;
         * 断开服务器连接:断开底层Java IO通道的socket连接。
         如果使用TCP传输协议,此方法主要用于客户端。
        void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
         * 主动关闭通道:关闭底层的通道,例如服务端的新连接监听通道。
        void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
       从当前注册的EventLoop进行取消注册操作后调用。
        void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
         * 从底层读数据:完成Netty通道从Java IO通道的数据读取。
        void read(ChannelHandlerContext ctx) throws Exception;
        * 写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作,并不是完成实际的数据写入操作。
        void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
         * 将底层缓存区的数据腾空,立即写出到对端。
        void flush(ChannelHandlerContext ctx) throws Exception;
    

    在Netty中,它的默认实现为ChannelOutboundHandlerAdapter。在实际开发中,只需要继承ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。

    3 ChannelInitializer通道初始化处理器

    Channel和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作发生在通道开始工作之前。

    那么如何向流水线中装配业务处理器呢?就需要借助通道的初始化处理器——ChannelInitializer。

    之前helloWolrd程序中:

    // 装备子通道流水线
    this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        // 有连接到达时会创建一个通道
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 流水线的职责:负责管理通道中的处理器
            // 向“子通道”(传输通道)流水线添加一个处理器
            ch.pipeline().addLast(new DiscardHandler());
    

    ChannelInitializer属于入站处理器的类型,继承了ChannelInboundHandlerAdapter

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
    

    在上面代码中,使用了ChannelInitializer的initChannel()方法。initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。

    一旦Channel被注册,这个方法就会被调用。 方法返回后,此实例将从Channel的ChannelPipeline中删除。 protected abstract void initChannel(C ch) throws Exception;

    在通道初始化时,会调用提前注册的初始化处理器的initChannel()方法。比如,在父通道接收到新连接并且要初始化其子通道时,会调用初始化器的initChannel()方法,并且会将新接收的通道作为参数,传递给此方法。

    一般来说,initChannel()方法的大致业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

    4 【案例】ChannelInboundHandler的生命周期案例

    为了弄清Handler业务处理器的各个方法的执行顺序和生命周期,这里定义一个简单的入站Handler处理器——InHandlerDemo。这个类继承于ChannelInboundHandlerAdapter适配器,实现了基类的大部分入站处理方法

    package study.wyy.netty.handler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
     * @author wyaoyao
     * @date 2021/7/12 14:37
    public class InHandlerDemo extends ChannelInboundHandlerAdapter {
        public InHandlerDemo() {
            super();
         * 添加的完成的时候调用
         * @param ctx
         * @throws Exception
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded 被调用");
            super.handlerAdded(ctx);
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerRemoved 被调用");
            super.handlerRemoved(ctx);
         * channel注册的时候调用
         * @param ctx
         * @throws Exception
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered 被调用");
            super.channelRegistered(ctx);
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelUnregistered 被调用");
            super.channelUnregistered(ctx);
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive 被调用");
            super.channelActive(ctx);
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive 被调用");
            super.channelInactive(ctx);
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelRead 被调用");
            super.channelRead(ctx, msg);
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelReadComplete 被调用");
            super.channelReadComplete(ctx);
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelWritabilityChanged 被调用");
            super.channelWritabilityChanged(ctx);
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("channelInactive 被调用");
            super.exceptionCaught(ctx, cause);
    
    public static void main(String[] args) {
        System.out.println("dadasdas");
        final InHandlerDemo inHandler = new InHandlerDemo();
        // 初始化处理器, 使用EmbeddedChannel嵌入式通道
        ChannelInitializer<EmbeddedChannel> channelInitializer = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel channel) throws Exception {
                // 流水线的职责:负责管理通道中的处理器
                // 向“子通道”(传输通道)流水线添加一个处理器
                channel.pipeline().addLast(inHandler);
        // 创建通道
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(channelInitializer);
        ByteBuf buf = Unpooled.buffer();
        //模拟入站,写一个入站包
        buf.writeInt(1);
        embeddedChannel.writeInbound(buf);
        embeddedChannel.flush();
        //模拟入站,再写一个入站包
        embeddedChannel.writeInbound(buf);
        embeddedChannel.flush();
        // 关闭通道
        embeddedChannel.close();
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
    
    handlerAdded 被调用
    channelRegistered 被调用
    channelActive 被调用
    channelRead 被调用
    channelReadComplete 被调用
    channelRead 被调用
    channelReadComplete 被调用
    channelInactive 被调用
    channelUnregistered 被调用
    handlerRemoved 被调用
    

    上面的几个方法中,channelRead、channelReadComplete是入站处理方法;而其他的6个方法是入站处理器的周期方法。从输出的结果可以看到,ChannelHandler中回调方法的执行顺序为:

    handlerAdded()→channelRegistered()→channelActive()→数据传输的入站回调→channelInactive()→channelUnregistered()→handlerRemoved()

    其中,数据传输的入站回调过程为:

    channelRead()→channelReadComplete()

    读数据的入站回调过程会根据入站数据的数量被重复调用,每一次有ByteBuf数据包入站都会调用到。

    除了两个入站回调方法外,其余的6个方法都和ChannelHandler的生命周期有关:

  • handlerAdded():当业务处理器被加入到流水线后,此方法将被回调。也就是在完成ch.pipeline().addLast(handler)语句之后会回调handlerAdded()。
  • channelRegistered():当通道成功绑定一个NioEventLoop反应器后,此方法将被回调。
  • channelActive():当通道激活成功后,此方法将被回调。通道激活成功指的是所有的业务处理器添加、注册的异步任务完成,并且与NioEventLoop反应器绑定的异步任务完成。
  • channelInactive():当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。
  • channelUnregistered():通道和NioEventLoop反应器解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
  • handlerRemoved():Netty会移除掉通道上所有的业务处理器,并且回调所有业务处理器的handlerRemoved()方法。 在上面的6个生命周期方法中,前面3个在通道创建和绑定时被先后回调,后面3个在通道关闭时会先后被回调。
  • 除了生命周期的回调,还有数据传输的入站回调方法。对于Inhandler入站处理器,有两个很重要的回调方法:

  • channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
  • channelReadComplete():流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。
  •