阿言聊Java之BufferedInputStream困扰记

阿言聊Java之BufferedInputStream困扰记

背景

最近一直在忙公司的项目,涉及到Hadoop计算与存储分离的核心插件优化。这个插件的作用主要是起到把Hadoop的存储层从HDFS替换成对象存储,这样Hadoop计算用于读取数据和存储计算结果的存储池就可以无限扩展,并且会得到其高IOPS的访问性能。但是目前的问题就出现在这个插件上,读取速度总是很慢。刚开始确实还怀疑对象存储的性能问题,所以跟直接读写HDFS做了测试对比,发现对象存储读比HDFS略快,写的时候,如果对象存储用了并行,速度是HDFS的1.4倍(走测试用例和单独HDFS测试的机器配置相同)。所以第一直觉应该是插件慢的问题,于是多年不跟Java接触的我“孤身深入”大数据部门“虎穴”蹲守多次,有时一坐就是一个下午...不过我还是有了不少收获:

  1. Hadoop的使用方式;平时基本上很少能直接触碰到这个大数据分析的“明星“的,这次自己也能实操一遍,同时结合实践,查阅相关资料,对Hadoop的整体架构、工作机制有了个客观认识;
  2. Hadoop文件存储层插件的实现方式;深刻体会到开源“明星”设计的博大精深之处,文件系统抽象层的设计使得Hadoop这套大数据分析框架的底层存储可以灵活接入,只要你实现了对应的文件系统接口,你可以接入任意的物理存储实体;
  3. Java流:虽然都是流式的数据,但是流的底层数据来源会影响到应用上层获取数据要处理的行为。
  4. 解决困难问题不能急:再难的问题都有其原因,需要先了解场景,大家期待的是什么样子,而表现的差异在哪里,再把差异缩小范围......特别是自己不擅长的领域需要自己先仔仔细细学习其相关的知识,不论是书本还是来自同事的帮助解答,当然想我这种情况最快的方式就是向大数据部门的同事咨询啦;

原理

实现 FileSystem 的子类OSFileSystem,并把该存储实例注册到Hadoop中,这样当用hadoop操作文件时,如果指定的路径是对象存储对应的路径(这个一般用不同的Scheme前缀来表示不同的存储层,比如HDFS的就是默认hdfs,假设这里是os://mybucket/myfile.txt),那么hadoop会直接调用对应的OSFileSystem类来操作对象存储mybucket桶下的myfile.txt文件;

这里OSFileSystem需要实现的几个重要的接口:

// 初始化FileSystem
void initialize(URI name, Configuration conf);
// 获取指定路径的文件状态对象,包含了文件的元数据信息
abstract FileStatus getFileStatus(Path f);
// 获取指定路径下的文件或者目录状态对象
abstract FileStatus[] listStatus(Path f);
// 创建文件目录
abstract boolean mkdirs(Path f, FsPermission permission);
// 获取FileSystem的Scheme类型
String	getScheme();
// 获取当前FileSystem的工作路径
abstract Path getWorkingDirectory();
// 设置FileSystem的工作路径
abstract Path getWorkingDirectory();
abstract boolean delete(Path f, boolean recursive);
abstract boolean rename(Path src, Path dst);
// 打开一个文件,并把它作为输出流返回
abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress);
// 打开一个文件,并把它作为输入流返回
abstract FSDataInputStream open(Path f, int bufferSize);

这里包含了三类操作:

  1. 涉及文件系统控制的操作,其实就是操作对象存储的Key;
  2. 获取对象存储Value的下载流;
  3. 获取对象存储Value的上传流;

以下函数就是获取对象存储Value的下载流,

// 打开一个文件,并把它作为输入流返回
abstract FSDataInputStream open(Path f, int bufferSize);

其中的原理就是实现返回一个是一个实现了FSDataInputStream的类实例(假设这个类名字叫做OSInputStream),OSInputStream中通过调用对象存储的SDK去下载文件,返回的Http Response中的Content包装成InputStream作为OSInputStream的成员变量(假设为inputStream),当上层Hadoop调用OSInputStream重写了FSDataInputStream的方法:

public synchronized int read(byte buf[], int off, int len);

就会从inputSream中读取数据放入buf参数指定的数组中;

问题现象

同样的机型配置,同样的测试用例,如果Hadoop集群走HDFS存储层时的处理速度是走对象存储的2倍!这现象太假了,因为我单独通过工具直接读写HDFS和对象存储时,对象存储的读写速度还比HDFS快...

测试用例的处理逻辑是,读取部分数据(比如说1MB),处理后,再读取下一部分数据处理,也就是不连续的读取。同时在对象存储的服务端日志中发现了报错信息提示客户端读取速度过慢。这里提下在下载情况下,我们对象存储的实现逻辑是有一个限速保证的,我们在每个客户端下载连接都有一个发送Buffer,每隔一段时间检查发现发送Buffer的数据还没有发送出去,就会把下载的速度降低。所以有了这个原理,我就很容易推论出这个过程了(假设了这个处理的时间线):

  1. Hadoop插件打开一个文件的OSInputStream;
  2. 然后立即调用这个OSInputStream的read方法,不过此时只读1MB;
  3. 然后Hadoop去处理分析这边数据;
  4. 对象存储刚开始很快,Buffer却只被消费了一小块,所以很快又填满了这个Buffer,于是降低发送速度,同时睡眠;
  5. Hadoop分析完了刚才的1MB数据后,再次来取一块去分析,然后发送了数据的网络传输,这里也耗时了;
  6. Hadoop分析完了刚才的1MB数据后,再次来取下一块去分析,然后发送了数据的网络传输,这里也耗时了;
  7. Hadoop分析完了刚才的1MB数据后,再次来取一块去分析,发现是空的,返回空数据;
  8. 对象存储该处理该请求的会话唤醒,再次填充发送Buffer;
  9. ...
注意这里的时间线是推测的一种情况

从上面的处理过程来看,我们很容易知道慢在哪里了,网络次数过多,同时由于分析读取再分析读取这种场景,导致对象存储端的发送速度一再下降,所以加剧了整个处理时延;

那要解决这个问题就有思路了,给OSInputStream的inputStream成员再包装一层BufferedInputStream不就可以了嘛 ,这样减少了网络IO次数,把大量IO查询操作变成了本地内存操作,能不快吗?

于是里面做了这样的封装,并发BufferedInputStream的buffer size设置成了128MB,按照我的想法:

  1. 第一次调用OSInputStream的read方法读1MB数据,内部调用inputStream(注意此时它是一个BufferedInputStream),发现自己的Buffer的是空的,里面调用底层的InputStream(也就是HTTPResponse的Content返回的流),从对象存储总直接加载64MB的数据到Buffer中,因为一次性读这么多,对象存储也会全力以赴吐出数据,所以这个耗时不会太长;
  2. 第二次读取时,很明显直接从inputStream的buffer中读取;
  3. ...
  4. 当inputStream中的buffer消费完后再次从底层的InputStream中读取64MB数据来填满buffer,同时一大块的数据连续读取,对象存储的发送速度也能得到保证;
  5. ...

改造后测试,却让我大跌眼镜( ⊙ o ⊙ )啊!竟然没啥卵用......

于是我在插件中加了debug信息,在每次调用read的时候答应下inputStream的可读buffer大小,调用的接口是:

public int available();

结果让我惊讶的是,竟然是0!0!0!那很显然啊,这个buffer加了跟没加一样啊,不慢才怪呢。我决定看下 BufferedInputStream源码

......
66: public class BufferedInputStream extends FilterInputStream
67: {
......
135:   public BufferedInputStream(InputStream in, int size)
136:   {
137:     super(in);
138:     if (size <= 0)
139:       throw new IllegalArgumentException();
140:     buf = new byte[size];
141:     // initialize pos & count to bufferSize, to prevent refill from
142:     // allocating a new buffer (if the caller starts out by calling mark()).
143:     pos = count = bufferSize = size;
144:   }
......
259:   public synchronized int read(byte[] b, int off, int len) throws IOException
260:   {
261:     if (off < 0 || len < 0 || b.length - off < len)
262:       throw new IndexOutOfBoundsException();
264:     if (len == 0)
265:       return 0;
267:     if (pos >= count && !refill())
268:       return -1;        // No bytes were read before EOF.
270:     int totalBytesRead = Math.min(count - pos, len);
271:     System.arraycopy(buf, pos, b, off, totalBytesRead);
272:     pos += totalBytesRead;
273:     off += totalBytesRead;
274:     len -= totalBytesRead;
276:     while (len > 0 && super.available() > 0 && refill())
277:       {
278:     int remain = Math.min(count - pos, len);
279:     System.arraycopy(buf, pos, b, off, remain);
280:     pos += remain;
281:     off += remain;
282:     len -= remain;
283:     totalBytesRead += remain;
284:       }
286:     return totalBytesRead;
287:   }
......
347:   private boolean refill() throws IOException
348:   {
349:     if (buf == null)
350:       throw new IOException("Stream closed.");
352:     if (markpos == -1 || count - markpos >= marklimit)
353:       {
354:     markpos = -1;
355:     pos = count = 0;
356:       }
357:     else
358:       {
359:     byte[] newbuf = buf;
360:     if (markpos < bufferSize)
361:       {
362:         newbuf = new byte[count - markpos + bufferSize];
363:       }
364:     System.arraycopy(buf, markpos, newbuf, 0, count - markpos);
365:     buf = newbuf;
366:     count -= markpos;
367:     pos -= markpos;
368:     markpos = 0;
369:       }
371:     int numread = super.read(buf, count, bufferSize);
373:     if (numread <= 0)    // EOF
374:       return false;
376:     count += numread;
377:     return true;
378:   }
379: }

大致看了下,其实思路也是类似,当读取的时候如果buf是空的就调用refill从in中读取,不过这里in被交给了FilterInputStream来管理;这里注意371~374行,这个逻辑,从TCP流中读取了一次就直接返回了,这里很有问题啊,想象下,由于TCP流中的的接收缓存是有限的,所以应用层能从传输层中获取的数据量是有限的,当这个bufferSize明显的超过了TCP的接收完的数据长度时,那371行的调用也就只能获取那小部分已经接收到的数据了,所以在Debug中发现每次读取后,读取的长度都是几十KB到一两百KB那么样子,结果BufferedInputStream的buffer根本用不到就被上层每次调用消费掉了(一次读1MB);

Fix it

为了修正这个问题,我自己重写了一个带Buffer的InputStream:BufInputStream

package cn.xxx.xxx.fs;
import java.io.IOException;
import java.io.InputStream;
public class BufInputStream extends InputStream {
    /** 对应的数据流,这里应该是与UFile的TCP流 */
    private InputStream in;
    /** 用来缓存预读的数据 */
    private byte[] buf;
    /** 已经从in中读入到buf中的总数据量有多少 */
    private int count = 0;
    /** buf的读偏移 */
    private int readOffset = 0;
    /** 流in是否读取完毕,既是否已经碰到EOF */
    private boolean finish = false;
    /** 该流是否读取完毕,前提是finish为True,且buf消费完(count == readOffset) */
    private boolean closed = false;
    /** 已经被消费过的字节数 */
    private int hasConsumed = 0;
    /** 已经被消费过的字节数 */
    private byte[] oneByte = null;
    private LOGLEVEL loglevel;
    public BufInputStream(LOGLEVEL loglevel, InputStream in, int bufSize) {
        this.loglevel = loglevel;
        this.in = in;
        this.buf = new byte[bufSize];
    @Override
    public int available() throws IOException {
        return this.availableInBuff();
    public int availableInBuff() throws IOException {
        /** 不用buf的长度来计算是因为,有可能流的长度不一定是buf的整数倍,最后一段可能未填满buf*/
        int available = count - readOffset;
        if (available < 0) throw new IOException("[BufInputStream.availableInBuff] count is smaller than readOffset");
        return available;
    @Override
    public void close() throws IOException {
        this.in.close();
    @Override
    public synchronized int read() throws IOException {
        if (oneByte == null) {
            oneByte = new byte[1];
        int rs = read(oneByte, 0, 1);
        /** 这里如果返回0,是返回-1告诉其结尾吗? 理论上底层read不会返回0*/
        if (-1 == rs) { return -1; }
        if (0 == rs) throw new IOException("[BufInputStream.read] read zero byte");
        final int i = oneByte[0] & 0XFF;
        return i;
    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        if (off < 0 || len < 0 || b.length < len + off)
            throw new IndexOutOfBoundsException();
        this.fill();
        final int canRead = this.availableInBuff();
        if (canRead > 0) {
            int totalBytesRead = Math.min(canRead, len);
            System.arraycopy(buf, this.readOffset, b, off, totalBytesRead);
            this.readOffset += totalBytesRead;
            this.hasConsumed += totalBytesRead;
            return totalBytesRead;
        if (this.finish) {
            return -1;
        return 0;
     * 真正发生IO操作的方法,它会从TCP流中获取字节数据填满buf
     * 只有当buf被消耗完时才会对in进行读取操作,这样可以集中read操作,发挥UFile的吞吐能力
     * @throws IOException
    private void fill() throws IOException {
        if (!this.finish && this.count == this.readOffset) {
            int readSum = 0;
            int offSet = 0;
            while (readSum < this.buf.length) {
                int c = in.read(this.buf, offSet, this.buf.length - readSum);
                UFileUtils.Data(loglevel, "[BufInputStream.fill] offset:%d buf.len:%d readSum:%d c:%d \n",
                        offSet,
                        this.buf.length,
                        readSum,
                if (-1 == c) {
                    this.finish = true;
                    break;
                } else if (0 != c) {
                    offSet += c;
                    readSum += c;
            this.count = readSum;
            this.readOffset = 0;
     * 这里覆盖父类的skip方法,因为数据流的一部分数据提前预先提取到buf中,可能只需在buf中移动偏移即可
     * @param n 需要skip的字节数
     * @return  返回实际skip的字节数
     * @throws IOException
    @Override
    public long skip(long n) throws IOException {
        /** 实际skip过的字节数 */
        long skipCount = 0;
        while (true) {
            if (isClosed()) {
                return skipCount;
            long avaiCount = available();
            if (n <= avaiCount) {
                /** 如果n小于剩余未消费的buf,那么只需要对buf的偏移进行操作即可*/
                readOffset += n;
                skipCount += n;
                return skipCount;
            } else {
                /** 如果还没有填充数据,先从流in中读取填充buf */
                if (avaiCount <= 0) {
                    fill();
                    /** 因为有可能重新读取过后,buf里的数据会超过剩余需要skip的n个字节数*/
                    continue;
                skipCount += avaiCount;
                readOffset = count;
                n -= avaiCount;
     * 该方法来判断该流是否已经结束,因为in的消费碰到EOF,不代表上层
     * 已经从当前流中完全消费了,因为有可能在buf中的数据还没有被上层
     * 消费,所以判断关闭的条件为两个:
     * 1. buf里没有可以消费的数据
     * 2. 流in已经读取到文件结尾
     * @return
    private boolean isClosed() throws IOException {
        if (available() <= 0 && finish) {