·  阅读

回到上篇文章的讲的原理,那么android客户端该如何实现AIO Sokcet编程呢?在此先要感谢三刀同学提供的Android版AIO框架 smart-ioc (虽然框架使用Java NIO 的技术写的,但是整个框架处理机制还是异步非阻塞的,所以姑且认为是AIO框架),给我们提供了即简便又高效的IM基础框架。当然使用别人框架的时候还是要隆重介绍下😁,下面我们简单介绍下smart-ioc

smart-ioc简介

Java NIO实现的smart-socket android 版通信框架,他是smart-socket精简版本,是作者为了广大android开发者想要使用,又碍于api限制的福音,在这里再次感谢作者。

前言

JDK7虽然已经发布很长一段时间了,但开源社区对于其AIO的新特性貌似并不热情。对于通信方面的技术诉求,似乎大家都习惯于Netty、Mina。 “Stop Trying to Reinvent the Wheel”不要重复造轮子,几乎每个程序员都被灌输过这个概念,理所当然的沉浸在各自的舒适区,享受着开源社区提供的各项技术支撑。 举个跟本文相契合的例子,如果工作中遇到通信相关的需求,广大Java程序员脑海里首先想到的必然是Netty或Mina,即便从未接触过Netty/Mina,但在心里认定只有这两个框架才能解决自己面临的问题。 这样的现状可能归咎于现在我们太急躁,工作的压力致使没时间给自己充电,尤其是那种不常用且稍微有点深度的技术点,已经没心力再去细细琢磨了。 所幸还有一批好学的程序员在工作之余做着一些看似平凡的事,踏踏实实专研这些技术点并作出一些小小的作品,同时为开源社区注入新鲜血液。 目前码云上已知的Java AIO项目有Voovan、baseio以及本文的主角:smart-socket,这几个作品还无任何一款能形成足够的影响力被广大Javaer认可,但通过开源的推广与磨练,相信未来AIO的开源环境会比现在更加繁荣。

smart-socket是什么

smart(百度翻译:聪明的;敏捷的;漂亮的;整齐的),从为项目起名开始,便对其寄予了厚望。专注于通信组件的研究与发展,摒弃一切大而全的解决方案,仅提供小而美的基础服务。无论您是从事IOT、IM、RPC,还是其余通信领域的开发工作,相信smart-socket都是非常酷的选择。如果要用一句话来为smart-socket打call,那就是:遇见smart-socket,你就已经赢在起跑线了。

smart-socket立项之初便已严苛的要求进行开发,追求各方面都达到极致。首先,smart-socket是个非常轻量级的项目,只有依赖slf4j作为项目的日志组件。smart-socket发布的jar包仅仅20KB,简洁的接口设计可以非常方便的在业务中接入通信服务。不过我们更期望看到的是接触到smart-socket的朋友可以将其作为学习Java Socket编程的素材,如果smart-socket能在这方面给予您一丝帮助,那我便会绝对自己做了一件有意义的事。

准备工作

smart-ioc代码清单

名称 类型 可见性 说明
Protocol interface public 协议接口
MessageProcessor interface public 消息处理器接口
Filter interface public 过滤器接口
StateMachine @interface public 服务状态码
AioQuickClient class public AIO客户端
AioSession class public AIO传输会话
IoServerConfig class package AIO服务配置
ReadCompletionHandler class package AIO读操作CompletionHandler实现类
WriteCompletionHandler class package AIO写操作CompletionHandler实现类
FastBlockingQueue class package 自定义队列

核心接口

Protocol

public interface Protocol<T> {
    public T decode(final ByteBuffer data, AioSession<T> session, boolean eof);
    public ByteBuffer encode(T msg, AioSession<T> session);
}复制代码

Protocol是一个泛型接口,指的是业务消息实体类,Protocol定义了消息编解码的接口,我们先来了解一下其中的两个方法decode、encode。

  • decode:消息解码,AIO的数据传输是以ByteBuffer为媒介的。在读取到数据并填充到ByteBuffer后,会调用Protocol实现类的decode方法,并将ByteBuffer作为第一个参数传入,而第二个参数AioSession为当前Socket连接的会话对象,后续会详解。 Protocol实现类从ByteBuffer中读取字节并按其协议规则进行消息解码,待解码完成后封装成业务对象并返回。不过实际情况下,已读取到并传入ByteBuffer的字节可能不足以完成消息解码(即所谓的:半包/拆包),Protocol实现类可根据其实际情况选择部分解码或等待ByteBuffer足以完成解码后再执行解码操作,不过在消息未完成解码的情况下必须返回null。
  • encode:消息编码,业务消息在输出至网络对端前,需要将其编码成字节流,也是以ByteBuffer为载体的。该方法的第一个参数泛型T便是业务消息对象。Protocol的实现类也得按照业务规则,将T指代的对象转为ByteBuffer并返回,smart-socket会将编码后的ByteBuffer输出。

MessageProcessor

public interface MessageProcessor<T> {
    public void process(AioSession<T> session, T msg);
    void stateEvent(AioSession<T> session, @StateMachine int stateMachine, Throwable throwable);
}复制代码

MessageProcessor定义了消息处理器接口,在通过Protocol完成消息解码后,会将消息对象交由MessageProcessor实现类进行业务处理。

  • process消息处理器,每接收到一个完整的业务消息,都会交由该处理器执行。
  • stateEvent:执行状态回调。MessageProcessor实现类可在此方法中处理其关注的事件。

Filter

Filter是框架提供的通信层过滤器接口,用户可基于该接口开发一些扩展性服务。这个接口不常用,但利用的好的话可以帮助你获悉服务器的运行状况。

public interface Filter<T> {
    public void readFilter(AioSession<T> session, int readSize);
    public void processFilter(AioSession<T> session, T msg);
    public void processFailHandler(AioSession<T> session, T msg, Throwable e);
    public void writeFilter(AioSession<T> session, int writeSize);
}复制代码
  • readFilter:读操作过滤,每当发生读操作便会触发该方法。第一个参数AioSession为本次发生读事件的会话,第二个参数readSize为本次读取到的字节数
  • processFilter:消息处理过滤器,每一个业务消息在执行MessageProcessor.process之前都会先执行一遍Filter.processFilter
  • processFailHandler:当业务消息执行processFilter出现运行时异常时,会触发processFailHandler
  • writeFilter:写操作过滤,每当发生写操作便会触发该方法。第一个参数AioSession为本次发生写事件的会话,第二个参数writeSize为本次输出的字节数

服务状态码

当特定事件发生时通知消息处理器MessageProcessor的实现类,用户需要自己实现处理

状态码 说明
NEW_SESSION 网络连接建立时触发,连接建立时会构建传输层的AioSession,如果业务层面也需要维护一个会话,可在此状态机中处理
INPUT_SHUTDOWN 数据读取完毕时触发,即传统意义中的 read()==-1
INPUT_EXCEPTION 读数据过程中发生异常时触发此状态机
OUTPUT_EXCEPTION 写数据过程中发生异常时触发此状态机
SESSION_CLOSING 触发了AioSession.close方法,但由于当前AioSession还有未完成的事件,会进入SESSION_CLOSING状态
SESSION_CLOSED AioSesson完成close操作后触发此状态机
PROCESS_EXCEPTION 业务处理异常
SESSION_CONNECT_FAILED 会话连接失败

服务配置IoServerConfig

配置项 类型 默认值 备注
BANNER String - 控制台打印的启动banner
VERSION String v1.3.11 当前smart-socket版本号
writeQueueSize int 4 AioSession中的输出缓存队列长度
readBufferSize int 512 AioSession进行数据读操作是ByteBuffer大小,单位:byte
host String null 客户端连接远程服务器的地址
filters Filter数组 [ ] 定义过滤器数组
port int 8088 服务端开放的端口号
processor MessageProcessor null 自定义消息处理器
protocol Protocol null 自定义协议编解码

AsynchronousSocketChannel

模拟JDK7的AIO处理方式写的异步SocketChannel,也就是SocketChannel的封装类。目的兼容低版本的android版本能够使用AIO的处理方式。

属性名 类型 说明
readBuffer ByteBuffer 需要从SocketChannel读取字节的字节缓冲区
writeBuffer ByteBuffer 需要写进SocketChannel的字节缓冲区
readCompletionHandler ReadCompletionHandler 读事件回调处理类
writeCompletionHandler WriteCompletionHandler 写事件回调处理类
selectionKey SelectionKey SocketChannel注册Selector操作类
channel SocketChannel 实践操作socket通道

核心方法

方法 说明
public AsynchronousSocketChannel(SelectionKey selectionKey) 构造函数
public final void read(ByteBuffer dst, A attachment, CompletionHandler handler) 异步读
public final void write(ByteBuffer dst, A attachment, CompletionHandler handler) 异步写
public void doRead() 从channel中读取操作
public void doWrite() 从channel写读取操作

通信会话AioSession

AioSession是smart-ioc中最核心、复杂度最高的类

核心成员属性

属性名 类型 说明
status byte 当前会话的状态,取值范围:SESSION_STATUS_CLOSED(1),处于该状态的AioSession无法再进行读写操作;SESSION_STATUS_CLOSING(2),AioSession状态从SESSION_STATUS_ENABLED到SESSION_STATUS_CLOSED的过渡状态。在SESSION_STATUS_CLOSING状态下,AioSession不接受新的读写请求,但会把缓存中待输出的数据进行写操作,输出完毕后更改状态至SESSION_STATUS_CLOSED;SESSION_STATUS_ENAB(3),AioSessio的默认状态,表示当前会话状态可以进行正常的消息通信、协议编解码、业务处理业务处理
writeCacheQueue FastBlockingQueue 输出缓冲队列
readCompletionHandler ReadCompletionHandler 读回调
writeCompletionHandler WriteCompletionHandler 写回调
channel AsynchronousSocketChannel 当前AioSession映射的网络通道
semaphore Semaphore 信号量,控制输出资源的竞争
ioServerConfig IoServerConfig AioQuickClient透传过来的配置项

核心方法

方法名 说明
AioSession(AsynchronousSocketChannel, IoServerConfig, ReadCompletionHandler, WriteCompletionHandler) 唯一的一个构造方法
void readFromChannel() 数据解码——>业务处理——>注册读事件
public void write(final ByteBuffer buffer) 将编码后的业务消息写入缓冲区,并触发 writeToChannel()
void writeToChannel() 将缓冲区的数据写入至网络通道
public void close(boolean immediate) 关闭会话

write() 将数据buffer输出至网络对端

public final void write(final ByteBuffer buffer) throws IOException {
    if (isInvalid()) {
        throw new IOException("session is " + (status == SESSION_STATUS_CLOSED ? "closed" : "invalid"));
    if (!buffer.hasRemaining()) {
        throw new InvalidObjectException("buffer has no remaining");
    //是否设置了输出队列
    if (ioServerConfig.getWriteQueueSize() <= 0) {
        try {
            semaphore.acquire();
            //若当前无待输出的数据,则立即输出buffer
            writeBuffer = buffer;
            writeToChannel0(writeBuffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e.getMessage());
        return;
    } else if ((semaphore.tryAcquire())) {
    //若当前无待输出的数据,则立即输出buffer
        writeBuffer = buffer;
          writeToChannel0(writeBuffer);
        return;
    try {
        //存放到写队列中
        writeCacheQueue.put(buffer);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    // 因为是异步的所以这里再次判断是否空闲
    if (semaphore.tryAcquire()) {
        writeToChannel();
}复制代码

writeToChannel()触发AIO的写操作需要调用控制同步

void writeToChannel() {
    if (writeBuffer != null && writeBuffer.hasRemaining()) {
          writeToChannel0(writeBuffer);
        return;
    if (writeCacheQueue == null || writeCacheQueue.size() == 0) {
        writeBuffer = null;
        semaphore.release();
        //此时可能是Closing或Closed状态
        if (isInvalid()) {
            close();
        //也许此时有新的消息通过write方法添加到writeCacheQueue中
        else if (writeCacheQueue != null && writeCacheQueue.size() > 0 && semaphore.tryAcquire()) {
            writeToChannel();
        return;
    int totalSize = writeCacheQueue.expectRemaining(MAX_WRITE_SIZE);
    ByteBuffer headBuffer = writeCacheQueue.poll();
    if (headBuffer.remaining() == totalSize) {
        writeBuffer = headBuffer;
    } else {
        if (writeBuffer == null || totalSize << 1 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) {
            writeBuffer = ByteBuffer.allocate(totalSize);
        } else {
            writeBuffer.clear().limit(totalSize);
        writeBuffer.put(headBuffer);
        writeCacheQueue.pollInto(writeBuffer);
        writeBuffer.flip();
    writeToChannel0(writeBuffer);
}复制代码

writeToChannel0 触发通道的写操作

protected final void writeToChannel0(ByteBuffer buffer) {
    if (null != channel && channel.isOpen()) {
        channel.write(buffer, this, writeCompletionHandler);
}复制代码

readFromChannel 触发通道的读操作,当发现存在严重消息积压时,会触发流控

void readFromChannel(Integer readSize) {
    readBuffer.flip();
    T dataEntry;
    while ((dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this, readSize)) != null) {
        //处理消息
        try {
            for (Filter<T> h : ioServerConfig.getFilters()) {
                h.processFilter(this, dataEntry);
            ioServerConfig.getProcessor().process(this, dataEntry);
        } catch (Exception e) {
            ioServerConfig.getProcessor().stateEvent(this, StateMachine.PROCESS_EXCEPTION, e);
            for (Filter<T> h : ioServerConfig.getFilters()) {
                h.processFail(this, dataEntry, e);
    if (readSize == -1 || status == SESSION_STATUS_CLOSING) {
        close(false);
        return;
    if (status == SESSION_STATUS_CLOSED) {
        return;
    //数据读取完毕
    if (readBuffer.remaining() == 0) {
        readBuffer.clear();
    } else if (readBuffer.position() > 0) {
        // 仅当发生数据读取时调用compact,减少内存拷贝
        readBuffer.compact();
    } else {
        readBuffer.position(readBuffer.limit());
        readBuffer.limit(readBuffer.capacity());
    readFromChannel0(readBuffer);
}复制代码

writeToChannel0 触发通道的读操作

protected final void readFromChannel0(ByteBuffer buffer) {
    if (null != channel && channel.isOpen()) {
        channel.read(buffer, this, readCompletionHandler);
}复制代码

AioQuickClient

成员属性

属性名 类型 说明
mAsynchronousSocketChannel AsynchronousSocketChannel 异步的socket通道,异步处理读写
mSession AioSession 客户端会话信息
mSelector Selector 选择器管理channel
config IoServerConfig 存储AioQuickClient服务配置项
config IoServerConfig 存储AioQuickClient服务配置项

核心方法

方法 说明
public AioQuickClient(String host, int port, Protocol protocol, MessageProcessormessageProcessor) 构造函数
public final void start() 开始连接服务器
private void acceptConnect(SelectionKey key) 接受并建立客户端与服务端的连接
public final void timeout() 连接超时需要手动调用这个方法
public final void shutdown() 关闭
构造函数
public AioQuickClient(String host, int port, Protocol<T> protocol, MessageProcessor<T> messageProcessor) {
    mConfig.setHost(host);
    mConfig.setPort(port);
    mConfig.setProtocol(protocol);
    mConfig.setProcessor(messageProcessor);
}复制代码

start()开始连接服务端

public final void start() {
    try {
        mSelector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        SelectionKey selectionKey = socketChannel.register(mSelector, SelectionKey.OP_CONNECT);
        mAsynchronousSocketChannel = new AsynchronousSocketChannel(selectionKey);
        socketChannel.connect(new InetSocketAddress(mConfig.getHost(), mConfig.getPort()));
//            ToolLog.e(TAG, "开始连接服务器");
        ClientThread serverThread = new ClientThread();
        serverThread.start();
    } catch (final IOException | UnresolvedAddressException e) {
        e.printStackTrace();
        mConfig.getProcessor().stateEvent(mSession, StateMachine.SESSION_CONNECT_FAILED, e);
}复制代码

内部类ClentThread 传输层Channel处理线程

class ClientThread extends Thread {

    @Override
    public void run() {
        try {
            while (mAsynchronousSocketChannel.isOpen()) {
                // 优先获取SelectionKey,若无关注事件触发则阻塞在selector.select(),减少select被调用次数
                Set<SelectionKey> keySet = mSelector.selectedKeys();
                if (keySet.isEmpty()) {
                    mSelector.select();
                Iterator<SelectionKey> keyIterator = keySet.iterator();
                // 执行本次已触发待处理的事件
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // 读取数据
                    if (key.isReadable()) {
                        mAsynchronousSocketChannel.removeOps(SelectionKey.OP_READ);
                        mAsynchronousSocketChannel.doRead();
                    } else if (key.isWritable()) {// 输出数据
                        mAsynchronousSocketChannel.removeOps(SelectionKey.OP_WRITE);
                        mAsynchronousSocketChannel.doWrite();
                    } else if (key.isConnectable()) {// 建立新连接,Client触发Connect,Server触发Accept
                        acceptConnect(key);
                    // 移除已处理的事件
                    keyIterator.remove();
        } catch (Exception e) {
            shutdown();
}复制代码

acceptConnect() 处理接收到链接就绪操作

private void acceptConnect(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    channel.finishConnect();
    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
     // 创建AioSession会话
    mSession = new AioSession<>(mAsynchronousSocketChannel, mConfig, new ReadCompletionHandler(), new WriteCompletionHandler());
    mSession.initSession();
    //触发当前已连接状态
    mConfig.getProcessor().stateEvent(mSession, StateMachine.NEW_SESSION, null);
}复制代码

结尾

如果有讲错望指正,特此感谢🙏


分类:
Android
  •