回到上篇文章的讲的原理,那么android客户端该如何实现AIO Sokcet编程呢?在此先要感谢三刀同学提供的Android版AIO框架 smart-ioc (虽然框架使用Java NIO 的技术写的,但是整个框架处理机制还是异步非阻塞的,所以姑且认为是AIO框架),给我们提供了即简便又高效的IM基础框架。当然使用别人框架的时候还是要隆重介绍下😁,下面我们简单介绍下smart-ioc
smart-ioc简介
Java NIO实现的smart-socket android 版通信框架,他是smart-socket精简版本,是作者为了广大android开发者想要使用,又碍于api限制的福音,在这里再次感谢作者。
前言
JDK7虽然已经发布很长一段时间了,但开源社区对于其AIO的新特性貌似并不热情。对于通信方面的技术诉求,似乎大家都习惯于Netty、Mina。 “Stop Trying to Reinvent the Wheel”不要重复造轮子,几乎每个程序员都被灌输过这个概念,理所当然的沉浸在各自的舒适区,享受着开源社区提供的各项技术支撑。 举个跟本文相契合的例子,如果工作中遇到通信相关的需求,广大Java程序员脑海里首先想到的必然是Netty或Mina,即便从未接触过Netty/Mina,但在心里认定只有这两个框架才能解决自己面临的问题。 这样的现状可能归咎于现在我们太急躁,工作的压力致使没时间给自己充电,尤其是那种不常用且稍微有点深度的技术点,已经没心力再去细细琢磨了。 所幸还有一批好学的程序员在工作之余做着一些看似平凡的事,踏踏实实专研这些技术点并作出一些小小的作品,同时为开源社区注入新鲜血液。 目前码云上已知的Java AIO项目有Voovan、baseio以及本文的主角:smart-socket,这几个作品还无任何一款能形成足够的影响力被广大Javaer认可,但通过开源的推广与磨练,相信未来AIO的开源环境会比现在更加繁荣。
smart-socket是什么
smart(百度翻译:聪明的;敏捷的;漂亮的;整齐的),从为项目起名开始,便对其寄予了厚望。专注于通信组件的研究与发展,摒弃一切大而全的解决方案,仅提供小而美的基础服务。无论您是从事IOT、IM、RPC,还是其余通信领域的开发工作,相信smart-socket都是非常酷的选择。如果要用一句话来为smart-socket打call,那就是:遇见smart-socket,你就已经赢在起跑线了。
smart-socket立项之初便已严苛的要求进行开发,追求各方面都达到极致。首先,smart-socket是个非常轻量级的项目,只有依赖slf4j作为项目的日志组件。smart-socket发布的jar包仅仅20KB,简洁的接口设计可以非常方便的在业务中接入通信服务。不过我们更期望看到的是接触到smart-socket的朋友可以将其作为学习Java Socket编程的素材,如果smart-socket能在这方面给予您一丝帮助,那我便会绝对自己做了一件有意义的事。
准备工作
smart-ioc代码清单
名称 | 类型 | 可见性 | 说明 |
---|---|---|---|
Protocol | interface | public | 协议接口 |
MessageProcessor | interface | public | 消息处理器接口 |
Filter | interface | public | 过滤器接口 |
StateMachine | @interface | public | 服务状态码 |
AioQuickClient | class | public | AIO客户端 |
AioSession | class | public | AIO传输会话 |
IoServerConfig | class | package | AIO服务配置 |
ReadCompletionHandler | class | package | AIO读操作CompletionHandler实现类 |
WriteCompletionHandler | class | package | AIO写操作CompletionHandler实现类 |
FastBlockingQueue | class | package | 自定义队列 |
核心接口
Protocol
public interface Protocol<T> {
public T decode(final ByteBuffer data, AioSession<T> session, boolean eof);
public ByteBuffer encode(T msg, AioSession<T> session);
}复制代码
Protocol是一个泛型接口,指的是业务消息实体类,Protocol定义了消息编解码的接口,我们先来了解一下其中的两个方法decode、encode。
- decode:消息解码,AIO的数据传输是以ByteBuffer为媒介的。在读取到数据并填充到ByteBuffer后,会调用Protocol实现类的decode方法,并将ByteBuffer作为第一个参数传入,而第二个参数AioSession为当前Socket连接的会话对象,后续会详解。 Protocol实现类从ByteBuffer中读取字节并按其协议规则进行消息解码,待解码完成后封装成业务对象并返回。不过实际情况下,已读取到并传入ByteBuffer的字节可能不足以完成消息解码(即所谓的:半包/拆包),Protocol实现类可根据其实际情况选择部分解码或等待ByteBuffer足以完成解码后再执行解码操作,不过在消息未完成解码的情况下必须返回null。
- encode:消息编码,业务消息在输出至网络对端前,需要将其编码成字节流,也是以ByteBuffer为载体的。该方法的第一个参数泛型T便是业务消息对象。Protocol的实现类也得按照业务规则,将T指代的对象转为ByteBuffer并返回,smart-socket会将编码后的ByteBuffer输出。
MessageProcessor
public interface MessageProcessor<T> {
public void process(AioSession<T> session, T msg);
void stateEvent(AioSession<T> session, @StateMachine int stateMachine, Throwable throwable);
}复制代码
MessageProcessor定义了消息处理器接口,在通过Protocol完成消息解码后,会将消息对象交由MessageProcessor实现类进行业务处理。
- process消息处理器,每接收到一个完整的业务消息,都会交由该处理器执行。
- stateEvent:执行状态回调。MessageProcessor实现类可在此方法中处理其关注的事件。
Filter
Filter是框架提供的通信层过滤器接口,用户可基于该接口开发一些扩展性服务。这个接口不常用,但利用的好的话可以帮助你获悉服务器的运行状况。
public interface Filter<T> {
public void readFilter(AioSession<T> session, int readSize);
public void processFilter(AioSession<T> session, T msg);
public void processFailHandler(AioSession<T> session, T msg, Throwable e);
public void writeFilter(AioSession<T> session, int writeSize);
}复制代码
- readFilter:读操作过滤,每当发生读操作便会触发该方法。第一个参数AioSession为本次发生读事件的会话,第二个参数readSize为本次读取到的字节数
- processFilter:消息处理过滤器,每一个业务消息在执行MessageProcessor.process之前都会先执行一遍Filter.processFilter
- processFailHandler:当业务消息执行processFilter出现运行时异常时,会触发processFailHandler
- writeFilter:写操作过滤,每当发生写操作便会触发该方法。第一个参数AioSession为本次发生写事件的会话,第二个参数writeSize为本次输出的字节数
服务状态码
当特定事件发生时通知消息处理器MessageProcessor的实现类,用户需要自己实现处理
状态码 | 说明 |
---|---|
NEW_SESSION | 网络连接建立时触发,连接建立时会构建传输层的AioSession,如果业务层面也需要维护一个会话,可在此状态机中处理 |
INPUT_SHUTDOWN |
数据读取完毕时触发,即传统意义中的
read()==-1
|
INPUT_EXCEPTION | 读数据过程中发生异常时触发此状态机 |
OUTPUT_EXCEPTION | 写数据过程中发生异常时触发此状态机 |
SESSION_CLOSING | 触发了AioSession.close方法,但由于当前AioSession还有未完成的事件,会进入SESSION_CLOSING状态 |
SESSION_CLOSED | AioSesson完成close操作后触发此状态机 |
PROCESS_EXCEPTION | 业务处理异常 |
SESSION_CONNECT_FAILED | 会话连接失败 |
服务配置IoServerConfig
配置项 | 类型 | 默认值 | 备注 |
---|---|---|---|
BANNER | String | - | 控制台打印的启动banner |
VERSION | String | v1.3.11 | 当前smart-socket版本号 |
writeQueueSize | int | 4 | AioSession中的输出缓存队列长度 |
readBufferSize | int | 512 | AioSession进行数据读操作是ByteBuffer大小,单位:byte |
host | String | null | 客户端连接远程服务器的地址 |
filters | Filter数组 | [ ] | 定义过滤器数组 |
port | int | 8088 | 服务端开放的端口号 |
processor | MessageProcessor | null | 自定义消息处理器 |
protocol | Protocol | null | 自定义协议编解码 |
AsynchronousSocketChannel
模拟JDK7的AIO处理方式写的异步SocketChannel,也就是SocketChannel的封装类。目的兼容低版本的android版本能够使用AIO的处理方式。
属性名 | 类型 | 说明 |
---|---|---|
readBuffer | ByteBuffer | 需要从SocketChannel读取字节的字节缓冲区 |
writeBuffer | ByteBuffer | 需要写进SocketChannel的字节缓冲区 |
readCompletionHandler | ReadCompletionHandler | 读事件回调处理类 |
writeCompletionHandler | WriteCompletionHandler | 写事件回调处理类 |
selectionKey | SelectionKey | SocketChannel注册Selector操作类 |
channel | SocketChannel | 实践操作socket通道 |
核心方法
方法 | 说明 |
---|---|
public AsynchronousSocketChannel(SelectionKey selectionKey) | 构造函数 |
public final void read(ByteBuffer dst, A attachment, CompletionHandler handler) | 异步读 |
public final void write(ByteBuffer dst, A attachment, CompletionHandler handler) | 异步写 |
public void doRead() | 从channel中读取操作 |
public void doWrite() | 从channel写读取操作 |
通信会话AioSession
AioSession是smart-ioc中最核心、复杂度最高的类
核心成员属性
属性名 | 类型 | 说明 |
---|---|---|
status | byte | 当前会话的状态,取值范围:SESSION_STATUS_CLOSED(1),处于该状态的AioSession无法再进行读写操作;SESSION_STATUS_CLOSING(2),AioSession状态从SESSION_STATUS_ENABLED到SESSION_STATUS_CLOSED的过渡状态。在SESSION_STATUS_CLOSING状态下,AioSession不接受新的读写请求,但会把缓存中待输出的数据进行写操作,输出完毕后更改状态至SESSION_STATUS_CLOSED;SESSION_STATUS_ENAB(3),AioSessio的默认状态,表示当前会话状态可以进行正常的消息通信、协议编解码、业务处理业务处理 |
writeCacheQueue | FastBlockingQueue | 输出缓冲队列 |
readCompletionHandler | ReadCompletionHandler | 读回调 |
writeCompletionHandler | WriteCompletionHandler | 写回调 |
channel | AsynchronousSocketChannel | 当前AioSession映射的网络通道 |
semaphore | Semaphore | 信号量,控制输出资源的竞争 |
ioServerConfig | IoServerConfig | AioQuickClient透传过来的配置项 |
核心方法
方法名 | 说明 |
---|---|
AioSession(AsynchronousSocketChannel, IoServerConfig, ReadCompletionHandler, WriteCompletionHandler) | 唯一的一个构造方法 |
void readFromChannel() | 数据解码——>业务处理——>注册读事件 |
public void write(final ByteBuffer buffer) |
将编码后的业务消息写入缓冲区,并触发
writeToChannel()
|
void writeToChannel() | 将缓冲区的数据写入至网络通道 |
public void close(boolean immediate) | 关闭会话 |
write() 将数据buffer输出至网络对端
public final void write(final ByteBuffer buffer) throws IOException {
if (isInvalid()) {
throw new IOException("session is " + (status == SESSION_STATUS_CLOSED ? "closed" : "invalid"));
if (!buffer.hasRemaining()) {
throw new InvalidObjectException("buffer has no remaining");
//是否设置了输出队列
if (ioServerConfig.getWriteQueueSize() <= 0) {
try {
semaphore.acquire();
//若当前无待输出的数据,则立即输出buffer
writeBuffer = buffer;
writeToChannel0(writeBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e.getMessage());
return;
} else if ((semaphore.tryAcquire())) {
//若当前无待输出的数据,则立即输出buffer
writeBuffer = buffer;
writeToChannel0(writeBuffer);
return;
try {
//存放到写队列中
writeCacheQueue.put(buffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 因为是异步的所以这里再次判断是否空闲
if (semaphore.tryAcquire()) {
writeToChannel();
}复制代码
writeToChannel()触发AIO的写操作需要调用控制同步
void writeToChannel() {
if (writeBuffer != null && writeBuffer.hasRemaining()) {
writeToChannel0(writeBuffer);
return;
if (writeCacheQueue == null || writeCacheQueue.size() == 0) {
writeBuffer = null;
semaphore.release();
//此时可能是Closing或Closed状态
if (isInvalid()) {
close();
//也许此时有新的消息通过write方法添加到writeCacheQueue中
else if (writeCacheQueue != null && writeCacheQueue.size() > 0 && semaphore.tryAcquire()) {
writeToChannel();
return;
int totalSize = writeCacheQueue.expectRemaining(MAX_WRITE_SIZE);
ByteBuffer headBuffer = writeCacheQueue.poll();
if (headBuffer.remaining() == totalSize) {
writeBuffer = headBuffer;
} else {
if (writeBuffer == null || totalSize << 1 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) {
writeBuffer = ByteBuffer.allocate(totalSize);
} else {
writeBuffer.clear().limit(totalSize);
writeBuffer.put(headBuffer);
writeCacheQueue.pollInto(writeBuffer);
writeBuffer.flip();
writeToChannel0(writeBuffer);
}复制代码
writeToChannel0 触发通道的写操作
protected final void writeToChannel0(ByteBuffer buffer) {
if (null != channel && channel.isOpen()) {
channel.write(buffer, this, writeCompletionHandler);
}复制代码
readFromChannel 触发通道的读操作,当发现存在严重消息积压时,会触发流控
void readFromChannel(Integer readSize) {
readBuffer.flip();
T dataEntry;
while ((dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this, readSize)) != null) {
//处理消息
try {
for (Filter<T> h : ioServerConfig.getFilters()) {
h.processFilter(this, dataEntry);
ioServerConfig.getProcessor().process(this, dataEntry);
} catch (Exception e) {
ioServerConfig.getProcessor().stateEvent(this, StateMachine.PROCESS_EXCEPTION, e);
for (Filter<T> h : ioServerConfig.getFilters()) {
h.processFail(this, dataEntry, e);
if (readSize == -1 || status == SESSION_STATUS_CLOSING) {
close(false);
return;
if (status == SESSION_STATUS_CLOSED) {
return;
//数据读取完毕
if (readBuffer.remaining() == 0) {
readBuffer.clear();
} else if (readBuffer.position() > 0) {
// 仅当发生数据读取时调用compact,减少内存拷贝
readBuffer.compact();
} else {
readBuffer.position(readBuffer.limit());
readBuffer.limit(readBuffer.capacity());
readFromChannel0(readBuffer);
}复制代码
writeToChannel0 触发通道的读操作
protected final void readFromChannel0(ByteBuffer buffer) {
if (null != channel && channel.isOpen()) {
channel.read(buffer, this, readCompletionHandler);
}复制代码
AioQuickClient
成员属性
属性名 | 类型 | 说明 |
---|---|---|
mAsynchronousSocketChannel | AsynchronousSocketChannel | 异步的socket通道,异步处理读写 |
mSession | AioSession | 客户端会话信息 |
mSelector | Selector | 选择器管理channel |
config | IoServerConfig | 存储AioQuickClient服务配置项 |
config | IoServerConfig | 存储AioQuickClient服务配置项 |
核心方法
方法 | 说明 |
---|---|
public AioQuickClient(String host, int port, Protocol protocol, MessageProcessormessageProcessor) | 构造函数 |
public final void start() | 开始连接服务器 |
private void acceptConnect(SelectionKey key) | 接受并建立客户端与服务端的连接 |
public final void timeout() | 连接超时需要手动调用这个方法 |
public final void shutdown() | 关闭 |
构造函数
public AioQuickClient(String host, int port, Protocol<T> protocol, MessageProcessor<T> messageProcessor) {
mConfig.setHost(host);
mConfig.setPort(port);
mConfig.setProtocol(protocol);
mConfig.setProcessor(messageProcessor);
}复制代码
start()开始连接服务端
public final void start() {
try {
mSelector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(mSelector, SelectionKey.OP_CONNECT);
mAsynchronousSocketChannel = new AsynchronousSocketChannel(selectionKey);
socketChannel.connect(new InetSocketAddress(mConfig.getHost(), mConfig.getPort()));
// ToolLog.e(TAG, "开始连接服务器");
ClientThread serverThread = new ClientThread();
serverThread.start();
} catch (final IOException | UnresolvedAddressException e) {
e.printStackTrace();
mConfig.getProcessor().stateEvent(mSession, StateMachine.SESSION_CONNECT_FAILED, e);
}复制代码
内部类ClentThread 传输层Channel处理线程
class ClientThread extends Thread {
@Override
public void run() {
try {
while (mAsynchronousSocketChannel.isOpen()) {
// 优先获取SelectionKey,若无关注事件触发则阻塞在selector.select(),减少select被调用次数
Set<SelectionKey> keySet = mSelector.selectedKeys();
if (keySet.isEmpty()) {
mSelector.select();
Iterator<SelectionKey> keyIterator = keySet.iterator();
// 执行本次已触发待处理的事件
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 读取数据
if (key.isReadable()) {
mAsynchronousSocketChannel.removeOps(SelectionKey.OP_READ);
mAsynchronousSocketChannel.doRead();
} else if (key.isWritable()) {// 输出数据
mAsynchronousSocketChannel.removeOps(SelectionKey.OP_WRITE);
mAsynchronousSocketChannel.doWrite();
} else if (key.isConnectable()) {// 建立新连接,Client触发Connect,Server触发Accept
acceptConnect(key);
// 移除已处理的事件
keyIterator.remove();
} catch (Exception e) {
shutdown();
}复制代码
acceptConnect() 处理接收到链接就绪操作
private void acceptConnect(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.finishConnect();
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
// 创建AioSession会话
mSession = new AioSession<>(mAsynchronousSocketChannel, mConfig, new ReadCompletionHandler(), new WriteCompletionHandler());
mSession.initSession();
//触发当前已连接状态
mConfig.getProcessor().stateEvent(mSession, StateMachine.NEW_SESSION, null);
}复制代码
结尾
如果有讲错望指正,特此感谢🙏