跟我学IO(BufferedInputStream类)

BufferedInputStream为另一个输入流添加一些功能,即缓冲输入以及支持 mark 和 reset 方法的能力。在创建 BufferedInputStream 时,会创建一个内部缓冲区数组。在读取或跳过流中的字节时,可根据需要从包含的输入流再次填充该内部缓冲区,一次填充多个字节。mark 操作记录输入流中的某个点,reset 操作使得在从包含的输入流中获取新字节之前,再次读取自最后一次 mark 操作后读取的所有字节。

BufferedInputStream为另一个输入流添加一些功能,即缓冲输入以及支持 mark 和 reset 方法的能力。在创建 BufferedInputStream 时,会创建一个内部缓冲区数组。在读取或跳过流中的字节时,可根据需要从包含的输入流再次填充该内部缓冲区,一次填充多个字节。mark 操作记录输入流中的某个点,reset 操作使得在从包含的输入流中获取新字节之前,再次读取自最后一次 mark 操作后读取的所有字节。

原理图如下:

BufferedInputStream实现原理

从上面的图中可以看出,客户端获取数据不是直接获取输入源中的数据,而是从BufferedInputStream的内部缓冲区获取。当BufferedInputStream内部缓冲区数据取完了就从InptStream输入流源载入数据到内部缓冲区,然后客户端再重内部缓冲区中取。

BufferedInputStream.java源码:

package java.io;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class BufferedInputStream extends FilterInputStream {
    // 缓冲区默认大小(8KB)
    private static int defaultBufferSize = 8192;
    // 缓冲数组,注意该成员变量同样使用了volatile关键字进行
    // 修饰,作用为在多线程环境中,当对该变量引用进行修改时
    // 保证了内存的可见性。
    protected volatile byte buf[];
    // 缓存数组的原子更新器,该成员变量与buf数组的volatile
    // 关键字共同组成了buf数组的原子更新功能实现。
    private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = 
        AtomicReferenceFieldUpdater.newUpdater(BufferedInputStream.class,  byte[].class, "buf");
    // 该成员变量表示目前缓冲区域中有多少有效的字节
    protected int count;
    // 该成员变量表示了当前缓冲区的读取位置
    protected int pos;
    // 表示标记位置,该标记位置的作用为:
    //  实现流的标记特性,即流的某个位置可以被设置为标记,
    // 允许通过设置reset(),将流的读取位置进行重置到该标
    // 记位置,但是InputStream注释上明确表示,该流不会无
    // 限的保证标记长度可以无限延长,即markpos=10,pos=139734,
    // 该保留区间可能已经超过了保留的极限
    protected int markpos = -1;
    // 该成员变量表示了上面提到的标记最大保留区间大小,当
    // (pos - markpos > marklimit)时,mark标记可能会被清除。
    protected int marklimit;

    // 检查确保底层输入流不为空,如果为空则流被关闭了
    private InputStream getInIfOpen() throws IOException {
        InputStream input = in;
        if (input == null) {
            throw new IOException("Stream closed");
        }
        return input;
    }

    // 检查确保buffer不为空,如果为空则流被关闭了
    private byte[] getBufIfOpen() throws IOException {
        byte[] buffer = buf;
        if (buffer == null) {
            throw new IOException("Stream closed");
        }
        return buffer;
    }

    public BufferedInputStream(InputStream in) {
        this(in, defaultBufferSize);
    }

    // 用指定的InputStream和缓冲区大小创建一个BufferedInputStream对象
    public BufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        // 初始化缓冲区
        buf = new byte[size];
    }

    /**
     * 提供了缓冲区域的读取、写入、区域元素的移动更新。
     * 假设它是被一个同步方法调用。这个方法也假设所有的数据都已经读入。
     * 因此pos > count
     */
    private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0) {
            /**如果不存在标记位置(即没有需要进行reset的位置需求)
              则可以进行大胆地直接重置pos标识下一可读取位置,但是
              这样不是会读取到以前的旧数据吗?不用担心,在后面的
              代码里会实现输入流的新数据填充 */
            pos = 0;
        } else if (pos >= buffer.length) {
            /** 位置大于缓冲区长度,这里表示已经没有可用空间了 */
            if (markpos > 0) {
                /**表示存在mark位置,则要对mark位置到pos位置的数据予以保留,
                  以确保后面如果调用reset()重新从mark位置读取会取得成功 */
                int sz = pos - markpos;
                /** 该实现是通过将缓冲区域中markpos至pos部分的移至缓冲区头部实现 */
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
            } else if (buffer.length >= marklimit) {
                /** 如果缓冲区已经足够大,可以容纳marklimit,则直接重置 */
                markpos = -1;
                /** 丢弃所有的缓冲区内容 */
                pos = 0;
            } else {
                /** 如果缓冲区还能增长的空间,则进行缓冲区扩容 */
                int nsz = pos * 2;
                /** 新的缓冲区大小设置成满足最大标记极限即可 */
                if (nsz > marklimit) {
                    nsz = marklimit;
                }
                byte nbuf[] = new byte[nsz];
                // 将原来的较小的缓冲内容COPY至增容的新缓冲区中
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                // 这里使用了原子变量引用更新,确保多线程环境下内存的可见性
                if ( !bufUpdater.compareAndSet(this, buffer, nbuf) ) {
                    // Can't replace buf if there was an async close.
                    // Note: This would need to be changed if fill()
                    // is ever made accessible to multiple threads.
                    // But for now, the only way CAS can fail is via close.
                    // assert buf == null;
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        }

        count = pos;
        // 从原始输入流中读取数据,填充缓冲区
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        // 根据实际读取的字节数更新缓冲区中可用字节数 
        if (n > 0) {
            count = n + pos;
        }
    }

    public synchronized int read() throws IOException {
        if (pos >= count) {
            /** 表示读取位置已经超过了缓冲区可用范围,则对缓冲区进行重新填充 */
            fill();
            /** 当填充后再次读取时发现没有数据可读,证明读到了流末尾 */
            if (pos >= count)
                return -1;
        }
        /** 这里表示读取位置尚未超过缓冲区有效范围,直接返回缓冲区内容 */
        return getBufIfOpen()[pos++] & 0xff;
    }

    /**
     * Read characters into a portion of an array, reading from the underlying
     * stream at most once if necessary.
     */
    private int read1(byte[] b, int off, int len) throws IOException {
        int avail = count - pos;
        if (avail <= 0) {
            /** 如果读取的长度大于缓冲区的长度并且没有markpos,
               则直接从原始输入流中进行读取,从而避免无谓的 
               COPY(从原始输入流至缓冲区,读取缓冲区全部数据,清空缓冲区,
               重新填入原始输入流数据)*/
            if (len >= getBufIfOpen().length && markpos < 0) {
                return getInIfOpen().read(b, off, len);
            }
            /** 当无数据可读时,从原始流中载入数据到缓冲区中 */
            fill();
            avail = count - pos;
            if (avail <= 0) return -1;
        }
        int cnt = (avail < len) ? avail : len;
        /** 从缓冲区中读取数据,返回实际读取到的大小 */
        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
        pos += cnt;
        return cnt;
    }

    public synchronized int read(byte b[], int off, int len) throws IOException {
        getBufIfOpen(); // Check for closed stream
        if ( (off | len | (off + len) | (b.length - (off + len))) < 0 ) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        int n = 0;
        for (;;) {
            int nread = read1(b, off + n, len - n);
            if (nread <= 0) 
                return (n == 0) ? nread : n;
            n += nread;
            if (n >= len)
                return n;
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0)
                return n;
        }
    }

    public synchronized long skip(long n) throws IOException {
        getBufIfOpen(); // Check for closed stream
        if (n <= 0) {
            return 0;
        }
        
        long avail = count - pos;
        if (avail <= 0) {
            //如果没有mark标记,则直接从原始输入流中skip
            if (markpos <0) 
                return getInIfOpen().skip(n);
            
            // Fill in buffer to save bytes for reset
            fill();
            
            avail = count - pos;
            if (avail <= 0)
                return 0;
        }
        
        // 该方法的实现为尽量原则,不保证一定略过规定的字节数
        long skipped = (avail < n) ? avail : n;
        pos += skipped;
        return skipped;
    }

    public synchronized int available() throws IOException {
        return getInIfOpen().available() + (count - pos);
    }

    public synchronized void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    public synchronized void reset() throws IOException {
        getBufIfOpen(); // Cause exception if closed
        if (markpos < 0)
            throw new IOException("Resetting to invalid mark");
        // 将当前读取位置设置为最后的标记位置
        pos = markpos;
    }

    public boolean markSupported() {
        return true;
    }

    public void close() throws IOException {
        byte[] buffer;
        while ( (buffer = buf) != null) {
            // 用线程安全的方式将内部缓冲区引用设置为空
            if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)
                    input.close(); // 关闭输入流
                return;
            }
            // Else retry in case a new buf was CASed in fill()
        }
    }
}

从上面的源码可以分析出,BufferedInputStream实际上就是将我们的输入流进行包装,提供缓存功能。让我们避免经常直接与IO设备打交道,这样就能提高IO读取的效率。

下面我们来看一个实例:使用BufferedInputStream来读取一个字节文件的数据,然后输出到终端。

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;

public class BufferedInputStreamTest {

	public static void main(String[] args) throws Exception {
		FileInputStream in = new FileInputStream(new File("document/stream_test.txt"));
		BufferedInputStream bufferedInput = new BufferedInputStream(in);
		byte[] buffer = new byte[1024];
		int len = -1;
		while ( (len = bufferedInput.read(buffer)) != -1 ) {
			System.out.println(new String(buffer, 0, len));
		}
		bufferedInput.close();
		in.close();
	}
	
}
锲而舍之,朽木不折;锲而不舍,金石可镂。——《荀子·劝学》
0 不喜欢
说说我的看法 -
全部评论(
没有评论
目录
热门标签
热门文章
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号