关于流(Stream),ChatGPT给出了以下解释:

在计算机编程中,流(Stream)是一种用于读取或写入数据的抽象概念。它代表一系列连续的数据,可以是字节、字符、对象等。流使得程序可以以连续的方式处理数据,而不需要一次性将所有数据加载到内存中。

流可以分为输入流(Input Stream)和输出流(Output Stream)两种类型。输入流用于从外部输入设备(如键盘、文件、网络连接等)读取数据,而输出流用于将数据写入到外部输出设备(如屏幕、文件、网络连接等)中。

在程序中,流的使用可以通过两种方式实现:字符流(Character Stream)和字节流(Byte Stream)。字符流适用于处理文本数据,字节流适用于处理二进制数据。字符流通过字符编码的方式将字符转换为字节,而字节流直接操作字节数据。

流的操作可以包括读取数据、写入数据、关闭流等。流的读取操作可以按照字节、字符和行进行读取,而写入操作可以按照字节、字符和行进行写入。

概念

首先来了解一下流的一些基本概念

可读流

一个可读流是一个数据源,在 JavaScript 中用一个 ReadableStream 对象表示,数据从它的底层源流出,或者来自网络或其他域的某种资源。

常见的两种可读流的类型:音视频流和网络IO流,这两个后面会介绍。对应两种流的来源:Push sourcePull source,顾名思义两种流的流向是浏览器流向服务器和服务器流向浏览器。

流中的分块由一个 reader 读取——该数据处理过程一次只处理一个分块。

一个流一次只能被一个 reader 读取,当一个 reader 被创建并开始读一个流时,它被locked在该流上。如果想让另一个 reader 读这个流,则通常需要先取消第一个 reader,再执行其他操作,或者拷贝(teeing)。

每个 reader 都有一个关联的 controller,用来控制流,reader和controller合称为Consumer。

image-20231207200309591

可读流在浏览器中可以通过fetch的response.body获取,也可以通过构造函数ReadableStream实例化。

fetch 获取

fetch('/example.com/api')
  .then(response => response.body)
  .then(async readableStream => {
    const reader = readableStream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      // process Uint8Array data
      console.log(value);
    }
  })

构造函数创建

new ReadableStream([underlyingSource[, queuingStrategy]])

underlyingSource 是一个描述构造行为和方法的对象:

  • start(controller):当对象被构造时立刻调用,一般用于设置流的访问以及其他设置,可以返回Promise,表明成功/失败;
  • pull(controller):当流的内部队列不满时,会重复调用这个方法,pull可以返回一个Promise,如果返回Promise直到Promise完成之前pull不再被调用;
  • cancel(reason):流将被取消时调用这个方法,一般用于释放流的访问,可以反悔Promise,表明成功/失败;
  • type:控制器对象(controller)默认为ReadableDefaultStreamController,如果设置为bytes,则控制器类型为ReadableByteStreamController;
  • autoAllocateChunkSize:自定义字节流(ReadableByteStream)的缓冲区大小,开启后分配指定大小的ArrayBuffer。

queuingStrategy 是一个定义流的队列策略的对象:

  • highWaterMark:在应用背压之前可以包含在内部队列中的块的总数;
  • size(chunk):自定义函数,将在每次数据块入队时调用,以便计算队列的大小。
class ReadableStreamConfig {
  timer: NodeJS.Timeout;
  button: HTMLButtonElement;
  constructor(button: HTMLButtonElement) {
    this.timer = null;
    this.button = button;
  }

  start(controller: ReadableStreamDefaultController) {
    this.timer = setInterval(() => {
      const str = ReadableStreamConfig.randomString();;
      controller.enqueue(str);
    }, 1000);

    this.button.addEventListener('click', () => {
      clearInterval(this.timer);
      controller.close();
    });

  }

  pull(controller: ReadableStreamDefaultController) {
    console.log('run pull');
  }

  cancel(reason: any) {
    if (this.timer) {
      clearInterval(this.timer);
    }
  }

  // 随机字符串
  static randomString() {
    return Math.random().toString(36).slice(2);
  }
}

const button = document.getElementById('button') as HTMLButtonElement;
const readableStream = new ReadableStream(new ReadableStreamConfig(button));

const reader = readableStream.getReader();
(async () => {
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    console.log(value);
  }
})()

代码中定义了一个类,用于实例化ReadableStream的配置对象;start中设置了一个Interval,每隔一秒向流中塞入一个随机字符串,并给button按钮绑定事件,点击时取消循环;然后通过reader不断的读取流中的内容。

image-20231208232504650

通过ReadableStream构造函数获取实例,实例上有许多属性和方法供使用:

  • locked:标识流是否被锁定到reader;
  • cancel(reason):取消读取流,可以传入 reason 参数表示取消原因;
  • getReader():创建一个reader并锁定流;
  • pipeThrough():将当前流管道输出到一个转换流;
  • pipeTo():输出到WritableStream,返回一个Promise;
  • tee():复制流。

可写流

可写流是数据写入的终点,在 JavaScript 中以一个 WritableStream 对象表示,数据会通过它流向更底层的结点,存储系统或者网络进程。

数据由一个 writer 写入流中,每次只写入一个分块。分块和可读流的 reader 一样可以有多种类型。

同reader一样,writer 被创建并开始向一个流写入数据时也会locked,如果想通过其他writer写入数据,需要终止当前的writer。

每个 writer 都有一个关联的 controller,用来控制流,writer和controller合称为Producer。

image-20231207200326608

可写流在浏览器中的应用比较局限。

构造函数创建

new WritableStream([underlyingSink[, queuingStrategy]])

可写流和可读流的参数基本一致,有些许不同

  • pull(controller) -> write(chunk, controller):可读流中的pull操作,在可写流中对应的是write操作,当一个数据块(chunk)准备写入底层时会调用这个方法,可以通过返回Promise表示操作是否成功;
  • cancel(reason) -> close(reason):可读流中的cancel对用可写流中的close,完成所有分块的写入时会调用该方法,可以通过返回Promise来表示操作是否成功;
  • abort(reason):当出现报错希望中断时调用此方法。
class WritableStreamConfig {
  encoder: TextEncoder;
  encodedText: Uint8Array;

  constructor() {
    this.encoder = new TextEncoder();
    this.encodedText = new Uint8Array();
  }

  start(controller: WritableStreamDefaultController) {
    console.log('Stream started');
  }

  // 写入数据的方法
  write(chunk, controller: WritableStreamDefaultController) {
    console.log('Writing chunk:', chunk);
    return new Promise<void>(resolve => {
      const et = this.encoder.encode(chunk);
      const encodedText = new Uint8Array(this.encodedText.length + et.length)
      encodedText.set(this.encodedText);
      encodedText.set(et, this.encodedText.length);
      this.encodedText = encodedText;
      resolve();
    })
  }

  // 关闭流的方法
  close() {
    console.log('Stream closed');
  }

  // 处理错误的方法
  abort(reason) {
    console.error('Stream aborted. Reason:', reason);
  }
}

const config = new WritableStreamConfig();
const writableStream = new WritableStream(config);
const writer = writableStream.getWriter();

(async function () {
  for (let i = 0; i < 10; i++) {
    await writer.write(i);
  }
  const decoder = new TextDecoder();
  console.log(decoder.decode(config.encodedText));
})()

image-20231210215754708

可写流实例具有一下属性和方法:

  • locked:标识当前流是否被writer锁定;
  • abort():终止流,并丢弃所有入队数据;
  • close():关闭流;
  • getWriter():获取一个 writer 实例,并锁定到当前流(只能绑定一个)。

上面的代码示例没有实际意义,仅作为API使用演示,目前还没有发现WritableStream的实际应用场景。

拷贝

如果要对一个可读流进行两种不同的操作,需要对该流进行拷贝,因为一个流只能被一个reader读取。

前面说到过,如果已经获取了一个reader,此时流会被locked,此时再获取reader会报错

image-20231210171814154

此时可以取消前一个reader来解除对流的锁定,此时再获取reader就不会报错

const reader = readableStream.getReader();
reader.cancel('over').then(() => {
  console.log('canceled');
  reader.releaseLock()
  const reader2 = readableStream.getReader();
})

另一种方法是对流进行拷贝,同时使用多个reader对流进行不同的操作。

const readableStream = new ReadableStream(new ReadableStreamConfig(button));
const [stream1, stream2] = readableStream.tee()
const reader = stream1.getReader();
const reader2 = stream2.getReader();

通过stream.tee返回一个数组,数组包含两个流,可以分别对这两个流进行操作。

转换流和管道传输

转换流用于在管道传输中进行流的转换,例如视频编解码、文件格式转换等。

同样的可以通过构造函数来创建自定义转换流new TransformStream([transformer[, writableStrategy[, readableStrategy]]])

writableStrategy和readableStrategy是对可写流和可读流的队列策略定义,同之前可读/写流的一样。

transformer对象定义了转换的行为,如果未提供则生成一个恒等变化流(未发生改变)。

  • start(controller):构造函数调用时触发;
  • transform(chunk, controller):当一个readable chunk准备完成后调用,没有提供则返回恒等变化流;
  • flush(controller):所有块转换成功后调用,关闭readable。
const textDecoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextDecoder().decode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

TransformStream实例上有两个属性writable(WritableStream)和readable(ReadableStream)分别是转换流的可写端和可读端。

链式管道传输的起点称为原始源(original source),终点称为最终接收器(ultimate sink)。

  • pipeThrough():连接到转换流,可以在传输过程中对流进行读写操作;
  • pipeTo():输出到WritableStream。
const rawStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
const decodeStream = rawStream.pipeThrough(textDecoderStream);

async function read() {
  const reader = decodeStream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    console.log('[read]', value);
  }
}

read()

image-20231211200752082

基于转换流和管道可以组合出很多种应用场景,实战部分会进行介绍。

背压和队列策略

流的一个重要概念是背压——这是单个流或一个链式管道调节读/写速度的过程。当链中后面的一个流仍然忙碌,尚未准备好接受更多的分块时,它会通过链向上游的流发送一个信号,告诉上游的转换流(或原始源)适当地减慢传输速度,这样你就不会在任何地方遇到瓶颈。

内置队列用于跟踪流中一直没有处理和完成的分块(可读流没有被读取/可写流没有被底层接收器处理)。

内置的队列采用一个队列策略,该策略规定如何基于内置队列的状态发出背压信号。

一般来说,该策略会将队列中的分块大小与称作 high water mark 的值进行比较,high water mark 是队列期望管理的最大分块的总和。

执行的计算是:

high water mark - total size of chunks in queue = desired size

所需大小(desired size)是流中仍然可以接收的分块数量,以保持流运行,但是小于 high water mark 的大小。当所需的大小大于 0 时,分块的生成将适当的减慢或者加速,以保持流尽可能快的运行。如果值降到 0(或者小于 0),这意味着分块的产生快于流的处理,这可能产生问题。

实战应用

下面看流的具体应用场景

打字机

2023年比较火的ChatGPT的打字机效果就可以通过SSE+ReadableStream来实现。

服务端SSE比较好实现,通过NodeJS流式返回相应数据是最简单的方法:

import Koa from 'koa';
import { PassThrough } from 'stream';

const app = new Koa();

app.use(async (ctx) => {
  if (ctx.path!== '/stream') {
    ctx.body = 'hello world';
    return;
  }

  ctx.set({
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Method": "*",
  });

  const stream = new PassThrough();
  ctx.status = 200;
  ctx.body = stream;

  stream.write("hello world ");

  for (let i = 0; i < 100; i++) {
    setTimeout(() => {
      stream.write(`${i}, `);
      if(i === 99)
        stream.end();
    }, i * 100)
  }
});

app.listen(3000, () => {
  console.log('server is running at port 3000');
});

这里使用Koa返回一个流,使用定时器进行延时输出。

之前说过fetch的Response.body天然就是一个ReadableStream实例,我们就可以通过reader来获取服务端流式相应的数据,然后进行实时的解析渲染。

const box = document.querySelector('.box') as HTMLDivElement;

fetch('http://localhost:3000/stream')
  .then(res => res.body.pipeThrough(new TextDecoderStream()))
  .then(async readableStream => {
    const reader = readableStream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      box.innerText += value;
    }
  })

stream

TextDecoderStream和TextEncoderStream是浏览器内置的TextDecoder.decode 和TextEncoder.encode 的转换流,对Uint8Array和string进行正反向转换。

PS:fetch有响应流,自然也会有请求流,⚠️请求流需要设置半双工

function wait(milliseconds) {
  return new Promise(resolve => setTimeout(resolve, milliseconds));
}

const stream = new ReadableStream({
  async start(controller) {
    await wait(1000);
    controller.enqueue('This ');
    await wait(1000);
    controller.enqueue('is ');
    await wait(1000);
    controller.enqueue('a ');
    await wait(1000);
    controller.enqueue('slow ');
    await wait(1000);
    controller.enqueue('request.');
    controller.close();
  },
}).pipeThrough(new TextEncoderStream());

fetch(url, {
  method: 'POST',
  headers: {'Content-Type': 'text/plain'},
  body: stream,
  duplex: 'half',
});

另外还有一些其他的“副作用”:

  • 只能CORS
  • 不支持http/1.x
  • 303以外的重定向失效

压缩/解压缩

浏览器提供了CompressionStreamDecompressionStream转换流来压缩和解压gzip文件;

File System API 也提供了流式写入文件的的底层可写流,通过showSaveFilePicker获取句柄对象,句柄对象通过createWritable方法可以获取FileSystemWritableFileStream (WritableStream的子类)。(File System API后面应该后单独写一篇记录下

通过fetch请求获取gzip压缩过的文件,通过解压缩转换流进行解压,将解压后的文件流写入文件系统。

const download = document.getElementById('download') as HTMLButtonElement;

download.onclick = () => fetch('./test.mp4.gz')
  .then(res => res.body)
  .then(async fileStream => {
    // @ts-ignore
    const fileHandle = await showSaveFilePicker({
      suggestedName: 'test.mp4',
      startIn: 'downloads',
      types: [
        {
          description: 'Video Files',
          accept: {
            'video/mp4': ['.mp4'],
          },
        },
      ],
    })
    const writableStream = await fileHandle.createWritable();
    fileStream
      .pipeThrough(new DecompressionStream('gzip'))
      .pipeTo(writableStream);
  })

image-20231211221545296

另外支持流的Web API还有:

  • 串口(serial):用于通过串口和设备进行通信;

    const port = await navigator.serial.requestPort();
    await port.open({ baudRate: 9_600 });
    const reader = port.readable.getReader();
    
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        reader.releaseLock();
        break;
      }
      console.log(value);
    }
    
    const writer = port.writable.getWriter();
    const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
    await writer.write(data);
    writer.releaseLock();
    
  • 流式websocket(websocketStream):利用流的背压减轻CPU密集型程序的负载。

    const controller = new AbortController();
    const wss = new WebSocketStream(WSS_URL, {
      protocols: ['chat', 'chatv2'],
      signal: controller.signal, // 用于断开websocket连接
    });
    const {readable, writable} = await wss.opened;
    const reader = readable.getReader();
    const writer = writable.getWriter();
    
    while (true) {
      const {value, done} = await reader.read();
      if (done) {
        break;
      }
      const result = await process(value);
      await writer.write(result);
    }
    

前端小白