在Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。
整个IO处理操作环节大致包括从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端
整个的IO处理操作环节的前后两个环节(包括从通道读数据包和由通道发送到对端),由Netty的底层负责完成,不需要用户程序负责。用户程序主要涉及的Handler环节为数据包解码、业务处理、目标数据编码、把数据包写到通道中。
从应用程序开发人员的角度来看有入站和出站两种类型操作:
IO处理操作环节前面的数据包解码、业务处理两个环节属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节属于出站处理器的工作。
1 ChannelInboundHandler入站处理器
当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler入站处理器所对应的入站API,进行入站操作处理。
public interface ChannelInboundHandler extends ChannelHandler
1.1 核心API
public interface ChannelInboundHandler extends ChannelHandler {
当通道注册完成后,Netty会调用fireChannelRegistered()方法,触发通道注册事件,
而在通道流水线注册过的入站处理器的channelRegistered()回调方法会被调用。
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
* 当通道激活完成后,Netty会调用fireChannelActive()方法,触发通道激活事件,
而在通道流水线注册过的入站处理器的channelActive()回调方法会被调用。
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
* 当通道激活完成后,Netty会调用fireChannelActive()方法,触发通道激活事件,
而在通道流水线注册过的入站处理器的channelActive()回调方法会被调用。
void channelActive(ChannelHandlerContext ctx) throws Exception;
* 当连接被断开或者不可用时,Netty会调用fireChannelInactive()方法
,触发连接不可用事件,
而在通道流水线注册过的入站处理器的channelInactive()回调方法会被调用。
void channelInactive(ChannelHandlerContext ctx) throws Exception;
当通道缓冲区可读时,Netty会调用fireChannelRead()方法,触发通道可读事件,
而在通道流水线注册过的入站处理器的channelRead()回调方法会被调用,
以便完成入站数据的读取和处理。
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
* 当通道缓冲区读完时,Netty会调用fireChannelReadComplete()方法,
触发通道缓冲区读完事件,
而在通道流水线注册过的入站处理器的channelReadComplete()回调方法会被调用。
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
* 如果触发了用户事件,则被调用。
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
* 一旦Channel的可写状态改变,就会被调用。
可以使用Channel.isWritable()检查状态。
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
* 异常回调
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
在Netty中,入站处理器的默认实现ChannelInboundHandlerAdapter,开发人员可以继承改类,按需实现方法即可
2 ChannelOutboundHandler出站处理器
Netty出站处理的方向是通过上层Netty通道去操作底层Java IO通道: 当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler出站处理器完成Netty通道到底层通道的操作,比如建立底层连接、断开底层连接、写入底层Java NIO通道等。
public interface ChannelOutboundHandler extends ChannelHandler
2.1 核心API
public interface ChannelOutboundHandler extends ChannelHandler {
监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。
如果使用TCP传输协议,这个方法用于服务端。
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
连接服务端:完成底层Java IO通道的服务端的连接操作。
如果使用TCP传输协议,那么这个方法将用于客户端。
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
* 断开服务器连接:断开底层Java IO通道的socket连接。
如果使用TCP传输协议,此方法主要用于客户端。
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
* 主动关闭通道:关闭底层的通道,例如服务端的新连接监听通道。
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
从当前注册的EventLoop进行取消注册操作后调用。
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
* 从底层读数据:完成Netty通道从Java IO通道的数据读取。
void read(ChannelHandlerContext ctx) throws Exception;
* 写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作,并不是完成实际的数据写入操作。
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
* 将底层缓存区的数据腾空,立即写出到对端。
void flush(ChannelHandlerContext ctx) throws Exception;
在Netty中,它的默认实现为ChannelOutboundHandlerAdapter。在实际开发中,只需要继承ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。
3 ChannelInitializer通道初始化处理器
Channel和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作发生在通道开始工作之前。
那么如何向流水线中装配业务处理器呢?就需要借助通道的初始化处理器——ChannelInitializer。
之前helloWolrd程序中:
// 装备子通道流水线
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
// 有连接到达时会创建一个通道
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 流水线的职责:负责管理通道中的处理器
// 向“子通道”(传输通道)流水线添加一个处理器
ch.pipeline().addLast(new DiscardHandler());
ChannelInitializer属于入站处理器的类型,继承了ChannelInboundHandlerAdapter
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
在上面代码中,使用了ChannelInitializer的initChannel()方法。initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。
一旦Channel被注册,这个方法就会被调用。
方法返回后,此实例将从Channel的ChannelPipeline中删除。
protected abstract void initChannel(C ch) throws Exception;
在通道初始化时,会调用提前注册的初始化处理器的initChannel()方法。比如,在父通道接收到新连接并且要初始化其子通道时,会调用初始化器的initChannel()方法,并且会将新接收的通道作为参数,传递给此方法。
一般来说,initChannel()方法的大致业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
4 【案例】ChannelInboundHandler的生命周期案例
为了弄清Handler业务处理器的各个方法的执行顺序和生命周期,这里定义一个简单的入站Handler处理器——InHandlerDemo。这个类继承于ChannelInboundHandlerAdapter适配器,实现了基类的大部分入站处理方法
package study.wyy.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
* @author wyaoyao
* @date 2021/7/12 14:37
public class InHandlerDemo extends ChannelInboundHandlerAdapter {
public InHandlerDemo() {
super();
* 添加的完成的时候调用
* @param ctx
* @throws Exception
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded 被调用");
super.handlerAdded(ctx);
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用");
super.handlerRemoved(ctx);
* channel注册的时候调用
* @param ctx
* @throws Exception
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered 被调用");
super.channelRegistered(ctx);
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered 被调用");
super.channelUnregistered(ctx);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive 被调用");
super.channelActive(ctx);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive 被调用");
super.channelInactive(ctx);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead 被调用");
super.channelRead(ctx, msg);
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete 被调用");
super.channelReadComplete(ctx);
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged 被调用");
super.channelWritabilityChanged(ctx);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("channelInactive 被调用");
super.exceptionCaught(ctx, cause);
public static void main(String[] args) {
System.out.println("dadasdas");
final InHandlerDemo inHandler = new InHandlerDemo();
// 初始化处理器, 使用EmbeddedChannel嵌入式通道
ChannelInitializer<EmbeddedChannel> channelInitializer = new ChannelInitializer<EmbeddedChannel>() {
@Override
protected void initChannel(EmbeddedChannel channel) throws Exception {
// 流水线的职责:负责管理通道中的处理器
// 向“子通道”(传输通道)流水线添加一个处理器
channel.pipeline().addLast(inHandler);
// 创建通道
EmbeddedChannel embeddedChannel = new EmbeddedChannel(channelInitializer);
ByteBuf buf = Unpooled.buffer();
//模拟入站,写一个入站包
buf.writeInt(1);
embeddedChannel.writeInbound(buf);
embeddedChannel.flush();
//模拟入站,再写一个入站包
embeddedChannel.writeInbound(buf);
embeddedChannel.flush();
// 关闭通道
embeddedChannel.close();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
handlerAdded 被调用
channelRegistered 被调用
channelActive 被调用
channelRead 被调用
channelReadComplete 被调用
channelRead 被调用
channelReadComplete 被调用
channelInactive 被调用
channelUnregistered 被调用
handlerRemoved 被调用
上面的几个方法中,channelRead、channelReadComplete是入站处理方法;而其他的6个方法是入站处理器的周期方法。从输出的结果可以看到,ChannelHandler中回调方法的执行顺序为:
handlerAdded()→channelRegistered()→channelActive()→数据传输的入站回调→channelInactive()→channelUnregistered()→handlerRemoved()
其中,数据传输的入站回调过程为:
channelRead()→channelReadComplete()
读数据的入站回调过程会根据入站数据的数量被重复调用,每一次有ByteBuf数据包入站都会调用到。
除了两个入站回调方法外,其余的6个方法都和ChannelHandler的生命周期有关:
handlerAdded():当业务处理器被加入到流水线后,此方法将被回调。也就是在完成ch.pipeline().addLast(handler)语句之后会回调handlerAdded()。
channelRegistered():当通道成功绑定一个NioEventLoop反应器后,此方法将被回调。
channelActive():当通道激活成功后,此方法将被回调。通道激活成功指的是所有的业务处理器添加、注册的异步任务完成,并且与NioEventLoop反应器绑定的异步任务完成。
channelInactive():当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。
channelUnregistered():通道和NioEventLoop反应器解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
handlerRemoved():Netty会移除掉通道上所有的业务处理器,并且回调所有业务处理器的handlerRemoved()方法。
在上面的6个生命周期方法中,前面3个在通道创建和绑定时被先后回调,后面3个在通道关闭时会先后被回调。
除了生命周期的回调,还有数据传输的入站回调方法。对于Inhandler入站处理器,有两个很重要的回调方法:
channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
channelReadComplete():流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。