Java:使用缓冲区迭代流

时间:2020-01-09 10:35:31  来源:igfitidea点击:

有时在迭代字节流时,单独查看每个字节是不够的。我们可能一直需要缓冲区中的流中至少有N个下一个字节(例如1024个字节)可用。例如,如果我们正在解析一种编程语言,则可能需要向前或者向后移入从流中读取的数据,换句话说,就是从流中读取数据。为了能够在流数据中来回移动,我们可能需要保留下一个内容,例如缓冲区中可用的1024或者2048(或者更多)字节。

为了解决这个问题,我将开发一个RollingBufferInputStream类,该类在缓冲区中至少保留N个字节可用。从另一个InputStream中读取字节。在本文的最后,有一个完整的代码列表" RollingBufferInputStream",我们可以复制该列表,因此我们不必自己进行开发。

在流中进行处理时,基本上是将数据读入缓冲区的末尾,然后将数据从缓冲区的顶部取出。实际上,我们不会从缓冲区顶部取出数据。我们只需将数据向上移动,然后再向下移至缓冲区。然后,我们将新数据从流中读取到缓冲区的末尾。

基本上,我们执行两个操作:

  • 将数据从缓冲区的底部复制到顶部。这也称为压缩缓冲区。
  • 在已将任何现有数据读入缓冲区之后,将新数据读入缓冲区。我将此操作称为在缓冲区中填充数据。如果缓冲区中没有数据,则从顶部开始填充。

缓冲区大小

要解决的第一个问题是用于保留数据的缓冲区的大小。如果需要访问至少1024个字节,则还需要一个至少1024个字节的缓冲区。换句话说,我们有两个要考虑的属性:

  • 块大小:我们需要访问的字节数。
  • 缓冲区大小:缓冲区的大小。必须至少等于块大小。

缓冲区大小=块大小

如果缓冲区正好具有"块大小"字节,则每次从字节0开始检查缓冲区时,都需要压缩缓冲区。或者是因为我们从索引0找到了我们要寻找的东西,或者是因为我们没有找到要寻找的东西,并且需要从索引1和"块大小"字节开始查找。

如果我们假设每次找不到所需的内容时都只能跳过1个字节,那么我们需要先将缓冲区剩余部分中的"块大小1"字节复制到缓冲区顶部,然后再复制再次查找,然后将1个字节读入缓冲区的最后一个单元。

例如,如果块大小和缓冲区大小均为1024字节,那么每次我们从索引0中找不到要查找的内容时,都需要将缓冲区的最后1023个字节复制到索引0中,并读取一个字节进入缓冲区的底部。这意味着为缓冲区中处理的每个字节复制1023个字节。这是非常昂贵的。

缓冲区大小>块大小

我们可以使缓冲区的大小大于块的大小,而不必为从缓冲区顶部读取的每个字节压缩缓冲区。如果使缓冲区大小等于"块大小+ 1",则只需要为读取的每两个字节压缩缓冲区即可。如果使缓冲区大小等于"块大小\ * 2",则只需将每个"块大小"字节压缩为一个缓冲区。

例如,如果块大小为1024,缓冲区大小为2048,则我们可以向下处理缓冲区1024个字节,直到缓冲区中剩余的字节数少于1024个为止。这意味着每处理1024个字节仅压缩1次。这产生了比每处理1个字节进行1次压缩要好得多的性能。

由于缓冲区的压缩可能很昂贵,因此所需的缓冲区压缩越少越好。因此,缓冲区越大越好。当然,我们必须权衡缓冲区的大小与应用程序中所需的其他内存。由于在磁盘上消耗太多内存而导致性能下降的应用程序也开始在硬盘上进行交换。通常,缓冲区大小是"块大小"的2到4倍就可以了。

这是块大小,缓冲区大小和流迭代原理的说明:

<img src =“ http://theitroad.local/images/java-howto/block-buffer-stream.png” alt =“大于所需块大小的缓冲区大小。” />
大于所需块大小的缓冲区大小。</ b>

RollingBufferInputStream

我开发的" RollingBufferInputStream"类可用于迭代" InputStream",同时确保缓冲区中始终有"块大小"字节可用。当然,除非达到流的末尾。

注意:即使该类称为" RollingBufferInputStream",它也不是" InputStream"子类。这是有道理的,因为在使用" RollingBufferInputStream"时,我们将不会使用" InputStream"的任何传统方法。我们将仅使用其特殊方法,并且将通过其内部缓冲区访问流中的数据。

使用RollingBufferInputStream

在向我们展示如何实现" RollingBufferInputStream"之前,我将首先向我们展示如何使用它。然后,我们可能会更好地了解其实现。

int               blockSize   = 1024;
byte[]            buffer      = new byte[blockSize * 4];

RollingBufferInputStream bufferInput =
    new RollingBufferInputStream(sourceInputStream, buffer);

while(bufferInput.hasAvailableBytes(blockSize)){

  boolean matchFound = lookForMatch(
                           bufferInput.getBuffer(),
                           bufferInput.getStart(),
                           bufferInput.getEnd());

  if(matchFound){
    localFileSource.moveStart(this.blockSize);
  } else {
    localFileSource.moveStart(1);
  }
}

首先,通过调用" hasAvailableBytes()"来询问缓冲区是否有N个可用字节。如果缓冲区不包含N个字节,则此方法将在内部尝试填充缓冲区。尝试执行此操作之后,该方法将检查缓冲区现在是否包含N个字节。如果是,则返回true。否则,它返回false。

其次,我们在缓冲区中寻找匹配项。 RollingBufferInputStream包含访问缓冲区,知道缓冲区中有多少字节以及当前起始索引其中的方法。

第三,将缓冲区中的起始索引增加适当的值。如果找到匹配项,此示例将跳过整个块。如果我们要解析一种语言,则可能只跳到find语句的末尾。如果未找到匹配项,则将起始索引移动1或者任何合适的值以向前跳过并从下一个索引开始搜索。

RollingBufferInputStream实现概述

这是RollingBufferInputStream类的第一个大纲。为了清楚起见,这里删除了一些代码。完整的代码列表可在本文末尾找到。该大纲主要用于向我们显示RollingBufferInputStream类的接口。

package com.Hyman.rsync;

import java.io.InputStream;
import java.io.IOException;

public class RollingBufferInputStream {

  InputStream source    = null;

  protected byte[]      buffer    = null;
  protected int         start     = 0; //current location in buffer.
  protected int         end       = 0; //current limit of data read
                                       //into the buffer
                                       //= next element to read into.

  protected int         bytesRead = 0;

  public RollingBufferInputStream(InputStream source, byte[] buffer) {
    this.source = source;
    this.buffer = buffer;
  }

  public byte[] getBuffer() {
    return buffer;
  }

  public int getStart() {
    return start;
  }

  public int getEnd() {
    return end;
  }

  public void moveStart(int noOfBytesToMove){
  }

  public int availableBytes() {
    return this.end - this.start;
  }

  public boolean hasAvailableBytes(int numberOfBytes) throws IOException {
    // if less than numberOfBytes available in buffer,
    // method will attempt to fill the buffer,
    // so at least numberOfBytes are available...
  }

}

" start"和" end"变量标记缓冲区中的开始索引和结束索引。当我们将起始索引向下移动通过缓冲区时,起始索引将继续进行。结束索引始终标记已读入缓冲区的字节数。

填充缓冲区

如果对" hasAvailableBytes()"的调用检测到缓冲区中没有足够的可用字节,它将尝试填充缓冲区。这是完整的hasAvailableBytes()方法实现,因此我们可以看到它的工作方式:

public boolean hasAvailableBytes(int numberOfBytes) throws IOException {

    if(! hasAvailableBytesInBuffer(numberOfBytes)){
      if(streamHasMoreData()){
        if(!bufferHasSpaceFor(numberOfBytes)){
          compact();
         }
         fillDataFromStreamIntoBuffer();
      }
    }
}

return hasAvailableBytesInBuffer(numberOfBytes);

private boolean hasAvailableBytesInBuffer(int numberOfBytes) {
  return   (this.end - this.start) >= numberOfBytes;
}

这是该方法的作用:

  • 检查缓冲区是否具有所需的可用字节数。
  • 如果不是,请检查流中是否有更多数据。
  • 如果流中有更多数据,请检查缓冲区底部是否有足够的数据空间。
  • 如果缓冲区没有足够的空间,则压缩缓冲区。
  • 缓冲区中填充了尽可能多的数据。

通过方法fillDataFromStreamIntoBuffer()将数据填充到缓冲区中,如下所示:

private void fillDataFromStreamIntoBuffer() throws IOException {
  this.bytesRead  = this.source.read(this.buffer, this.end,
                    this.buffer.length - this.end);
  this.end       += this.bytesRead;
}

如我们所见,此方法首先将尽可能多的数据读入缓冲区,并在内部存储字节数。然后,它以读取的字节数增加结尾索引。

实际上,此方法可能应该在while循环内将数据读入缓冲区,因此它将继续将数据读入缓冲区,直到缓冲区中有足够的字节为止。现在,它只是假设一次读取将产生足够的字节。我们可以根据需要解决此问题。

压缩缓冲区

只要缓冲区中没有足够的空间来读取新数据,便会压缩缓冲区。然后,缓冲区中剩余的所有数据(起始索引和结束索引之间)都将移至缓冲区的顶部。然后可以再次在底部填充缓冲区。这是compact()方法的实现方式:

private void compact() {
  int bytesToCopy = end - start;

    for(int i=0; i<bytesToCopy; i++){
      this.buffer[i] = this.buffer[start + i];
    }

    this.start = 0;
    this.end   = bytesToCopy;
}

完整代码列表

这是RollingBufferInputStream的完整代码列表:

package com.Hyman.rsync;

import java.io.InputStream;
import java.io.IOException;

public class RollingBufferInputStream {

  InputStream source    = null;

  protected byte[]      buffer    = null;
  protected int         start     = 0; //current location in buffer.
  protected int         end       = 0; //current limit of data read
                                       //into the buffer
                                       //= next element to read into.

  protected int         bytesRead = 0;

  public RollingBufferInputStream(InputStream source, byte[] buffer) {
    this.source = source;
    this.buffer = buffer;
  }

  public byte[] getBuffer() {
    return buffer;
  }

  public int getStart() {
    return start;
  }

  public int getEnd() {
    return end;
  }

  public void moveStart(int noOfBytesToMove){
    if(this.start + noOfBytesToMove > this.end){
      throw new RuntimeException(
        "Attempt to move buffer 'start' beyond 'end'. start= "
        + this.start + ", end: " + this.end + ", bytesToMove: "
        + noOfBytesToMove);
      }
    this.start += noOfBytesToMove;
  }

  public int availableBytes() {
    return this.end - this.start;
  }

  public boolean hasAvailableBytes(int numberOfBytes) throws IOException {
    if(! hasAvailableBytesInBuffer(numberOfBytes)){
      if(streamHasMoreData()){
        if(!bufferHasSpaceFor(numberOfBytes)){
          compact();
         }
         fillDataFromStreamIntoBuffer();
      }
    }

    return hasAvailableBytesInBuffer(numberOfBytes);
  }

  private void fillDataFromStreamIntoBuffer() throws IOException {
    this.bytesRead  = this.source.read(this.buffer, this.end,
                      this.buffer.length - this.end);
    this.end       += this.bytesRead;
  }

  private void compact() {
    int bytesToCopy = end - start;

      for(int i=0; i<bytesToCopy; i++){
        this.buffer[i] = this.buffer[start + i];
      }

      this.start = 0;
      this.end   = bytesToCopy;
  }

  private boolean bufferHasSpaceFor(int numberOfBytes) {
    return (this.buffer.length - this.start) >= numberOfBytes;
  }

  public boolean streamHasMoreData() {
    return this.bytesRead > -1;
  }

  private boolean hasAvailableBytesInBuffer(int numberOfBytes) {
    return   (this.end - this.start) >= numberOfBytes;
  }

}