寂寞的眼镜 · 笔记本外接显示屏模糊解决办法,调缩放比没用? ...· 1 月前 · |
英俊的羊肉串 · ktv魔性改编歌曲-西瓜视频搜索· 1 年前 · |
善良的四季豆 · 大隐者原创深度影评 ...· 1 年前 · |
玩命的棒棒糖 · 北京吉利新能源销售电话分期0首付购车|重卡| ...· 1 年前 · |
健身的毛衣 · 5G为汽车而生 上汽荣威高层访谈_汽车_中国网· 1 年前 · |
当网络中两个进程需要通信时,我们往往会使用
Socket
来实现。
Socket
都不陌生。当三次握手成功后,客户端与服务端就能通信,并且,彼此之间通信的数据包格式都是二进制,由
TCP/IP
协议负责传输。
当客户端和服务端取得了二进制数据包后,我们往往需要『萃取』出想要的数据,这样才能更好的执行业务逻辑。所以,我们需要定义好数据结构来描述这些二进制数据的格式,这就是通信网络协议。简单讲,就是需要约定好二进制数据包中每一段字节的含义,比如从第 n 字节开始的 m 长度是核心数据,有了这样的约定后,我们就能解码出想要的数据,执行业务逻辑,这样我们就能畅通无阻的通信了。
概要划分
一个最基本的网络协议必须包含
了解
TCP
协议的同学一定听说过
粘包、拆包
这两个术语。因为
TCP
协议是数据流协议,它的底层根据二进制缓冲区的实际情况进行包的划分。所以,不可避免的会出现
粘包,拆包
现象 。为了解决它们,我们的网络协议往往会使用一个 4 字节的
int
类型来表示数据的大小。比如,
Netty
就为我们提供了
LengthFieldBasedFrameDecoder
解码器,它可以有效的使用自定义长度帧来解决上述问题。
同时一个好的网络协议,还会将动作和业务数据分离。试想一下,
HTTP
协议的分为请求头,请求体——
Http Method
、
HTTP
版本
这就是一种分离关注点的思想。所以自定义的网络协议也可以包含:
code
来分门别类的代表不同的业务逻辑
JAVA
对象和二进制之间转换的形式,提供多种序列化/反序列化方式。比如
json
、
protobuf
等等,甚至是自定义算法。比如:
rocketmq
等等。
同时,协议的开头可以定义一个约定的
魔数
。这个固定值(4字节),一般用来判断当前的数据包是否合法。比如,当我们使用
telnet
发送错误的数据包时,很显然,它不合法,会导致解码失败。所以,为了减轻服务器的压力,我们可以取出数据包的前
4
个字节与固定的
魔数
对比,如果是非法的格式,直接关闭连接,不继续解码。
网络协议结构如下所示 :
+--------------+-----------+------------+-----------+----------+
| 魔数(4) | code(1) |序列化算法(1) |数据长度(4) |数据(n) |
+--------------+-----------+------------+-----------+----------+
RocketMQ 网络协议
这一小节,我们从
RocketMQ
中,分析优秀通信网络协议的实现。
RocketMQ
项目中,客户端和服务端的通信是基于 Netty 之上构建的。同时,为了更加有效的通信,往往需要对发送的消息自定义网络协议。
RocketMQ
的网络协议,从数据分类的角度上看,可分为两大类
从左到右
byte[0]
代表序列化算法,
byte[1~3]
才是真正的长度
RocketMQ 消息头协议详细如下:
Header 字段名 |
类型 |
Request |
Response |
---|---|---|---|
code |
整数 |
请求操作代码,请求接收方根据不同的代码做不同的操作 |
应答结果代码,0表示成功,非0表示各种错误代码 |
language |
字符串 |
请求发起方实现语言,默认JAVA |
应答接收方实现语言 |
version |
整数 |
请求发起方程序版本 |
应答接收方程序版本 |
opaque |
整数 |
请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用 |
应答方不做修改,直接返回 |
flag |
整数 |
通信层的标志位 |
通信层的标志位 |
remark |
字符串 |
传输自定义文本信息 |
错误详细描述信息 |
extFields |
HashMap<String,String> |
请求自定义字段 |
应答自定义字段 |
编码过程
RocketMQ
的通信模块是基于
Netty
的。通过定义
NettyEncoder
来实现对每一个
Channel
的 出栈数据进行编码,如下所示:
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
} catch (Exception e) {
}
其中,核心的编码过程位于
RemotingCommand
对象中,
encodeHeader
阶段,需要统计出消息总长度,即:
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> 消息头长度,一个整数表示:占4个字节
int length = 4;
// 2> 消息头数据
byte[] headerData;
headerData = this.headerEncode();
// 再加消息头数据长度
length += headerData.length;
// 3> 再加消息体数据长度
length += bodyLength;
// 4> 额外加 4是因为需要加入消息总长度,一个整数表示:占4个字节
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 5> 将消息总长度加入 ByteBuffer
result.putInt(length);
// 6> 将消息的头长度加入 ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 7> 将消息头数据加入 ByteBuffer
result.put(headerData);
result.flip();
return result;
}
其中,
encode
阶段会将
CommandCustomHeader
数据转换
HashMap<String,String>
,方便序列化
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
if (value != null) {
this.extFields.put(name, value.toString());
}
特别的,消息头序列化支持两种算法:
JSON
RocketMQ
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
这儿需要值得注意的是,
encode
阶段将当前
RPC
类型和
headerData
长度编码到一个
byte[4]
数组中,
byte[0]
位序列化类型。
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
其中,通过与运算
& 0xFF
取低八位数据。
所以, 最终
length
长度等于序列化类型 + header length + header data + body data 的字节的长度。
解码过程
RocketMQ
解码通过
NettyDecoder
来实现,它继承自
LengthFieldBasedFrameDecoder
,其中调用了父类
LengthFieldBasedFrameDecoder
的构造函数
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
这些参数设置
4
个字节代表
length
总长度,同时解码时跳过最开始的
4
个字节:
frame = (ByteBuf) super.decode(ctx, in);
所以,得到的
frame
= 序列化类型 + header length + header data + body data 。解码如下所示:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//总长度
int length = byteBuffer.limit();
//原始的 header length,4位
int oriHeaderLen = byteBuffer.getInt();
//真正的 header data 长度。忽略 byte[0]的 serializeType
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
cmd.body = bodyData;
return cmd;
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
return null;