Netty 是一个高性能、异步事件驱动的 NIO 框架,它提供了对 TCP、UDP 和文件传输的支持。作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以方便地主动获取或者通过通知机制获得 IO 操作结果。

作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于 Netty 的 NIO 框架构建。

协议解析代码:

package nettyserver;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MyInHandler extends ChannelInboundHandlerAdapter {

final byte HEAD1 = 0x48;// H

final byte HEAD2 = 0x54;// T

final byte HEAD3 = 0x45;// E

final byte HEAD4 = 0x4D;// M

final byte HEAD5 = 0x50;// P

final byte HEAD6 = 0x3D;// =

public static final String SPLIT1 = "#";

public static Map<String, io.netty.buffer.ByteBuf> dicSn2Buffer = new HashMap<>();

// HTEMP=0026#1533022506#Meter - 001#01##

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

// 当出现异常就关闭连接

//cause.printStackTrace();

RemoveClient(ctx, "exceptionCaught");

@Override

public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception

CTxtHelp.AppendLog(" 客户端已经连接 ,sn:" + GetSN(ctx));

@Override

public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception

RemoveClient(ctx, "channelInactive");

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception

if (evt instanceof IdleStateEvent)

String type = " 未知 ";

IdleStateEvent event = (IdleStateEvent) evt;

if (null != event.state())

switch (event.state())

case READER_IDLE: type = "read idle"; break;

case WRITER_IDLE: type = "write idle"; break;

case ALL_IDLE: type = "all idle"; break;

default: break;

RemoveClient(ctx, " 超时退出 (" + type + ")");

super.userEventTriggered(ctx, evt);

@Override

public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg)

if (!ctx.channel().isActive()) return;

String sn = GetSN(ctx);

if(!dicSn2Buffer.containsKey(sn)) dicSn2Buffer.put(sn, Unpooled.buffer(1024));

io.netty.buffer.ByteBuf oldBuffer = dicSn2Buffer.get(sn);

if (null != msg)

io.netty.buffer.ByteBuf recvBuffer = (ByteBuf) msg;

int size = recvBuffer.readableBytes();

if (size > 0)

//recvBuffer.markReaderIndex();

oldBuffer.writeBytes(recvBuffer);

ReferenceCountUtil.release(recvBuffer);

//recvBuffer.resetReaderIndex();

//byte[] recv = new byte[size];

//recvBuffer.readBytes(recv);

//CTxtHelp.AppendLog(" 接收数据: " + CDataHelper.ArrayByteToString(recv));

byte head1 = 0; byte head2 = 0; byte head3 = 0; byte head4 = 0; byte head5 = 0; byte head6 = 0;

while (oldBuffer.isReadable())

oldBuffer.markReaderIndex();

head1 = oldBuffer.readByte(); if (!oldBuffer.isReadable()) { oldBuffer.resetReaderIndex(); return ; } // 掉包处理

if (HEAD1 == head1)// 垃圾包处理

head2 = oldBuffer.readByte(); if (!oldBuffer.isReadable()) { oldBuffer.resetReaderIndex(); return ; }

head3 = oldBuffer.readByte(); if (!oldBuffer.isReadable()) { oldBuffer.resetReaderIndex(); return ; }

head4 = oldBuffer.readByte(); if (!oldBuffer.isReadable()) { oldBuffer.resetReaderIndex(); return ; }

head5 = oldBuffer.readByte(); if (!oldBuffer.isReadable()) { oldBuffer.resetReaderIndex(); return ; }

head6 = oldBuffer.readByte(); if (!oldBuffer.isReadable()) { oldBuffer.resetReaderIndex(); return ; }

if (HEAD2 == head2 && HEAD3 == head3 && HEAD4 == head4 && HEAD5 == head5 && HEAD6 == head6)

break;

CTxtHelp.AppendLog("Error,Unable to parse the data2");

CTxtHelp.AppendLog("Error,Unable to parse the data1");

int lengthsize = 4;

if (GetWaitRecvRemain(oldBuffer, lengthsize)) { oldBuffer.resetReaderIndex(); return ; }

byte[] arrlen = new byte[lengthsize]; oldBuffer.readBytes(arrlen);

int lengthdata = CDataHelper.String2Int(CDataHelper.ArrayByteToString(arrlen));// if ( - 1 == len) return;

if (GetWaitRecvRemain(oldBuffer, lengthdata)) { oldBuffer.resetReaderIndex(); return ; }

byte[] source = new byte[lengthdata]; oldBuffer.readBytes(source);

String data = CDataHelper.ArrayByteToString(source);

CTxtHelp.AppendLog(" 解析后: " + data);

if (null == data || 0 == data.length() || data.length() - 1 != data.lastIndexOf(SPLIT1)) return;

data = data.substring(1, data.length() - 1);

String[] items = data.split(SPLIT1, - 1);

if (null == items || 4 != items.length) return;

String uid = items[0];

String taskid = items[1];

int cmd = CDataHelper.String2Int(items[2]);

String content = items[3];

String back = "";

switch (cmd)

case 1: back = "ok"; break;

case 2: break;

case 3: break;

catch (Exception ex)

CTxtHelp.AppendLog("error@" + ex.getMessage());

if (!"".equals(back))

byte[] sendBuff = GetProtocol(uid, cmd, back);

ByteBuf out = ctx.alloc().directBuffer(sendBuff.length);

out.writeBytes(sendBuff);

ctx.channel().writeAndFlush(out);

if (!oldBuffer.isReadable())

oldBuffer.clear();

channelRead(ctx, null);

boolean GetWaitRecvRemain( io.netty.buffer.ByteBuf buf, int len) {

return buf.readerIndex() + len > buf.writerIndex();

String GetSN(io.netty.channel.ChannelHandlerContext ctx)

return ctx.channel().remoteAddress().toString().replace("/", "");

void RemoveClient(io.netty.channel.ChannelHandlerContext ctx, String errmsg)

String sn = GetSN(ctx);

CTxtHelp.AppendLog(" 客户端退出 ,errmsg " + errmsg);

if(dicSn2Buffer.containsKey(sn)) {

io.netty.buffer.ByteBuf byteBuf = dicSn2Buffer.get(sn);

ReferenceCountUtil.release(byteBuf);

dicSn2Buffer.remove(sn);

ctx.close();

byte[] GetProtocol(String uid, int cmd, String msg)

String content = SPLIT1 + uid + SPLIT1 +

System.currentTimeMillis() + SPLIT1 +

CDataHelper.lpad(cmd, 2) + SPLIT1 +

msg + SPLIT1;

StringBuilder data = new StringBuilder();

data.append("HTEMP=");

data.append(CDataHelper.lpad(CDataHelper.GetStringLength(content), 4));

data.append(content);

return CDataHelper.StringToByte(data.toString());

String lpad(int number, int length) {

String f = "%0" + length + "d";

return String.format(f, number);

程序结构:

运行结果:

1. 包解析(正常包+掉包+粘包+垃圾包)

2. 超时清理链路

3. 并发测试