关于流(Stream),ChatGPT给出了以下解释:
在计算机编程中,流(Stream)是一种用于读取或写入数据的抽象概念。它代表一系列连续的数据,可以是字节、字符、对象等。流使得程序可以以连续的方式处理数据,而不需要一次性将所有数据加载到内存中。
流可以分为输入流(Input Stream)和输出流(Output Stream)两种类型。输入流用于从外部输入设备(如键盘、文件、网络连接等)读取数据,而输出流用于将数据写入到外部输出设备(如屏幕、文件、网络连接等)中。
在程序中,流的使用可以通过两种方式实现:字符流(Character Stream)和字节流(Byte Stream)。字符流适用于处理文本数据,字节流适用于处理二进制数据。字符流通过字符编码的方式将字符转换为字节,而字节流直接操作字节数据。
流的操作可以包括读取数据、写入数据、关闭流等。流的读取操作可以按照字节、字符和行进行读取,而写入操作可以按照字节、字符和行进行写入。
概念
首先来了解一下流的一些基本概念
可读流
一个可读流是一个数据源,在 JavaScript 中用一个 ReadableStream
对象表示,数据从它的底层源流出,或者来自网络或其他域的某种资源。
常见的两种可读流的类型:音视频流和网络IO流,这两个后面会介绍。对应两种流的来源:Push source和Pull source,顾名思义两种流的流向是浏览器流向服务器和服务器流向浏览器。
流中的分块由一个 reader 读取——该数据处理过程一次只处理一个分块。
一个流一次只能被一个 reader 读取,当一个 reader 被创建并开始读一个流时,它被locked在该流上。如果想让另一个 reader 读这个流,则通常需要先取消第一个 reader,再执行其他操作,或者拷贝(teeing)。
每个 reader 都有一个关联的 controller,用来控制流,reader和controller合称为Consumer。
可读流在浏览器中可以通过fetch的response.body获取,也可以通过构造函数ReadableStream实例化。
fetch 获取
fetch('/example.com/api')
.then(response => response.body)
.then(async readableStream => {
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// process Uint8Array data
console.log(value);
}
})
构造函数创建
new ReadableStream([underlyingSource[, queuingStrategy]])
underlyingSource 是一个描述构造行为和方法的对象:
- start(controller):当对象被构造时立刻调用,一般用于设置流的访问以及其他设置,可以返回Promise,表明成功/失败;
- pull(controller):当流的内部队列不满时,会重复调用这个方法,pull可以返回一个Promise,如果返回Promise直到Promise完成之前pull不再被调用;
- cancel(reason):流将被取消时调用这个方法,一般用于释放流的访问,可以反悔Promise,表明成功/失败;
- type:控制器对象(controller)默认为ReadableDefaultStreamController,如果设置为
bytes
,则控制器类型为ReadableByteStreamController; - autoAllocateChunkSize:自定义字节流(ReadableByteStream)的缓冲区大小,开启后分配指定大小的ArrayBuffer。
queuingStrategy 是一个定义流的队列策略的对象:
- highWaterMark:在应用背压之前可以包含在内部队列中的块的总数;
- size(chunk):自定义函数,将在每次数据块入队时调用,以便计算队列的大小。
class ReadableStreamConfig {
timer: NodeJS.Timeout;
button: HTMLButtonElement;
constructor(button: HTMLButtonElement) {
this.timer = null;
this.button = button;
}
start(controller: ReadableStreamDefaultController) {
this.timer = setInterval(() => {
const str = ReadableStreamConfig.randomString();;
controller.enqueue(str);
}, 1000);
this.button.addEventListener('click', () => {
clearInterval(this.timer);
controller.close();
});
}
pull(controller: ReadableStreamDefaultController) {
console.log('run pull');
}
cancel(reason: any) {
if (this.timer) {
clearInterval(this.timer);
}
}
// 随机字符串
static randomString() {
return Math.random().toString(36).slice(2);
}
}
const button = document.getElementById('button') as HTMLButtonElement;
const readableStream = new ReadableStream(new ReadableStreamConfig(button));
const reader = readableStream.getReader();
(async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
console.log(value);
}
})()
代码中定义了一个类,用于实例化ReadableStream的配置对象;start中设置了一个Interval,每隔一秒向流中塞入一个随机字符串,并给button按钮绑定事件,点击时取消循环;然后通过reader不断的读取流中的内容。
通过ReadableStream构造函数获取实例,实例上有许多属性和方法供使用:
- locked:标识流是否被锁定到reader;
- cancel(reason):取消读取流,可以传入 reason 参数表示取消原因;
- getReader():创建一个reader并锁定流;
- pipeThrough():将当前流管道输出到一个转换流;
- pipeTo():输出到WritableStream,返回一个Promise;
- tee():复制流。
可写流
可写流是数据写入的终点,在 JavaScript 中以一个 WritableStream
对象表示,数据会通过它流向更底层的结点,存储系统或者网络进程。
数据由一个 writer 写入流中,每次只写入一个分块。分块和可读流的 reader 一样可以有多种类型。
同reader一样,writer 被创建并开始向一个流写入数据时也会locked,如果想通过其他writer写入数据,需要终止当前的writer。
每个 writer 都有一个关联的 controller,用来控制流,writer和controller合称为Producer。
可写流在浏览器中的应用比较局限。
构造函数创建
new WritableStream([underlyingSink[, queuingStrategy]])
可写流和可读流的参数基本一致,有些许不同
- pull(controller) -> write(chunk, controller):可读流中的pull操作,在可写流中对应的是write操作,当一个数据块(chunk)准备写入底层时会调用这个方法,可以通过返回Promise表示操作是否成功;
- cancel(reason) -> close(reason):可读流中的cancel对用可写流中的close,完成所有分块的写入时会调用该方法,可以通过返回Promise来表示操作是否成功;
- abort(reason):当出现报错希望中断时调用此方法。
class WritableStreamConfig {
encoder: TextEncoder;
encodedText: Uint8Array;
constructor() {
this.encoder = new TextEncoder();
this.encodedText = new Uint8Array();
}
start(controller: WritableStreamDefaultController) {
console.log('Stream started');
}
// 写入数据的方法
write(chunk, controller: WritableStreamDefaultController) {
console.log('Writing chunk:', chunk);
return new Promise<void>(resolve => {
const et = this.encoder.encode(chunk);
const encodedText = new Uint8Array(this.encodedText.length + et.length)
encodedText.set(this.encodedText);
encodedText.set(et, this.encodedText.length);
this.encodedText = encodedText;
resolve();
})
}
// 关闭流的方法
close() {
console.log('Stream closed');
}
// 处理错误的方法
abort(reason) {
console.error('Stream aborted. Reason:', reason);
}
}
const config = new WritableStreamConfig();
const writableStream = new WritableStream(config);
const writer = writableStream.getWriter();
(async function () {
for (let i = 0; i < 10; i++) {
await writer.write(i);
}
const decoder = new TextDecoder();
console.log(decoder.decode(config.encodedText));
})()
可写流实例具有一下属性和方法:
- locked:标识当前流是否被writer锁定;
- abort():终止流,并丢弃所有入队数据;
- close():关闭流;
- getWriter():获取一个 writer 实例,并锁定到当前流(只能绑定一个)。
上面的代码示例没有实际意义,仅作为API使用演示,目前还没有发现WritableStream的实际应用场景。
拷贝
如果要对一个可读流进行两种不同的操作,需要对该流进行拷贝,因为一个流只能被一个reader读取。
前面说到过,如果已经获取了一个reader,此时流会被locked,此时再获取reader会报错
此时可以取消前一个reader来解除对流的锁定,此时再获取reader就不会报错
const reader = readableStream.getReader();
reader.cancel('over').then(() => {
console.log('canceled');
reader.releaseLock()
const reader2 = readableStream.getReader();
})
另一种方法是对流进行拷贝,同时使用多个reader对流进行不同的操作。
const readableStream = new ReadableStream(new ReadableStreamConfig(button));
const [stream1, stream2] = readableStream.tee()
const reader = stream1.getReader();
const reader2 = stream2.getReader();
通过stream.tee
返回一个数组,数组包含两个流,可以分别对这两个流进行操作。
转换流和管道传输
转换流用于在管道传输中进行流的转换,例如视频编解码、文件格式转换等。
同样的可以通过构造函数来创建自定义转换流new TransformStream([transformer[, writableStrategy[, readableStrategy]]])
writableStrategy和readableStrategy是对可写流和可读流的队列策略定义,同之前可读/写流的一样。
transformer对象定义了转换的行为,如果未提供则生成一个恒等变化流(未发生改变)。
- start(controller):构造函数调用时触发;
- transform(chunk, controller):当一个readable chunk准备完成后调用,没有提供则返回恒等变化流;
- flush(controller):所有块转换成功后调用,关闭readable。
const textDecoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextDecoder().decode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
TransformStream实例上有两个属性writable(WritableStream)和readable(ReadableStream)分别是转换流的可写端和可读端。
链式管道传输的起点称为原始源(original source),终点称为最终接收器(ultimate sink)。
- pipeThrough():连接到转换流,可以在传输过程中对流进行读写操作;
- pipeTo():输出到WritableStream。
const rawStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
const decodeStream = rawStream.pipeThrough(textDecoderStream);
async function read() {
const reader = decodeStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
console.log('[read]', value);
}
}
read()
基于转换流和管道可以组合出很多种应用场景,实战部分会进行介绍。
背压和队列策略
流的一个重要概念是背压——这是单个流或一个链式管道调节读/写速度的过程。当链中后面的一个流仍然忙碌,尚未准备好接受更多的分块时,它会通过链向上游的流发送一个信号,告诉上游的转换流(或原始源)适当地减慢传输速度,这样你就不会在任何地方遇到瓶颈。
内置队列用于跟踪流中一直没有处理和完成的分块(可读流没有被读取/可写流没有被底层接收器处理)。
内置的队列采用一个队列策略,该策略规定如何基于内置队列的状态发出背压信号。
一般来说,该策略会将队列中的分块大小与称作 high water mark 的值进行比较,high water mark 是队列期望管理的最大分块的总和。
执行的计算是:
high water mark - total size of chunks in queue = desired size
所需大小(desired size)是流中仍然可以接收的分块数量,以保持流运行,但是小于 high water mark 的大小。当所需的大小大于 0 时,分块的生成将适当的减慢或者加速,以保持流尽可能快的运行。如果值降到 0(或者小于 0),这意味着分块的产生快于流的处理,这可能产生问题。
实战应用
下面看流的具体应用场景
打字机
2023年比较火的ChatGPT的打字机效果就可以通过SSE+ReadableStream来实现。
服务端SSE比较好实现,通过NodeJS流式返回相应数据是最简单的方法:
import Koa from 'koa';
import { PassThrough } from 'stream';
const app = new Koa();
app.use(async (ctx) => {
if (ctx.path!== '/stream') {
ctx.body = 'hello world';
return;
}
ctx.set({
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Method": "*",
});
const stream = new PassThrough();
ctx.status = 200;
ctx.body = stream;
stream.write("hello world ");
for (let i = 0; i < 100; i++) {
setTimeout(() => {
stream.write(`${i}, `);
if(i === 99)
stream.end();
}, i * 100)
}
});
app.listen(3000, () => {
console.log('server is running at port 3000');
});
这里使用Koa返回一个流,使用定时器进行延时输出。
之前说过fetch的Response.body天然就是一个ReadableStream实例,我们就可以通过reader来获取服务端流式相应的数据,然后进行实时的解析渲染。
const box = document.querySelector('.box') as HTMLDivElement;
fetch('http://localhost:3000/stream')
.then(res => res.body.pipeThrough(new TextDecoderStream()))
.then(async readableStream => {
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
box.innerText += value;
}
})
TextDecoderStream和TextEncoderStream是浏览器内置的TextDecoder.decode 和TextEncoder.encode 的转换流,对Uint8Array和string进行正反向转换。
PS:fetch有响应流,自然也会有请求流,⚠️请求流需要设置半双工
function wait(milliseconds) {
return new Promise(resolve => setTimeout(resolve, milliseconds));
}
const stream = new ReadableStream({
async start(controller) {
await wait(1000);
controller.enqueue('This ');
await wait(1000);
controller.enqueue('is ');
await wait(1000);
controller.enqueue('a ');
await wait(1000);
controller.enqueue('slow ');
await wait(1000);
controller.enqueue('request.');
controller.close();
},
}).pipeThrough(new TextEncoderStream());
fetch(url, {
method: 'POST',
headers: {'Content-Type': 'text/plain'},
body: stream,
duplex: 'half',
});
另外还有一些其他的“副作用”:
- 只能CORS
- 不支持http/1.x
- 303以外的重定向失效
压缩/解压缩
浏览器提供了CompressionStream
和DecompressionStream
转换流来压缩和解压gzip文件;
File System API 也提供了流式写入文件的的底层可写流,通过showSaveFilePicker
获取句柄对象,句柄对象通过createWritable
方法可以获取FileSystemWritableFileStream
(WritableStream的子类)。(File System API后面应该后单独写一篇记录下
通过fetch请求获取gzip压缩过的文件,通过解压缩转换流进行解压,将解压后的文件流写入文件系统。
const download = document.getElementById('download') as HTMLButtonElement;
download.onclick = () => fetch('./test.mp4.gz')
.then(res => res.body)
.then(async fileStream => {
// @ts-ignore
const fileHandle = await showSaveFilePicker({
suggestedName: 'test.mp4',
startIn: 'downloads',
types: [
{
description: 'Video Files',
accept: {
'video/mp4': ['.mp4'],
},
},
],
})
const writableStream = await fileHandle.createWritable();
fileStream
.pipeThrough(new DecompressionStream('gzip'))
.pipeTo(writableStream);
})
另外支持流的Web API还有:
串口(serial):用于通过串口和设备进行通信;
const port = await navigator.serial.requestPort(); await port.open({ baudRate: 9_600 }); const reader = port.readable.getReader(); while (true) { const { value, done } = await reader.read(); if (done) { reader.releaseLock(); break; } console.log(value); } const writer = port.writable.getWriter(); const data = new Uint8Array([104, 101, 108, 108, 111]); // hello await writer.write(data); writer.releaseLock();
流式websocket(websocketStream):利用流的背压减轻CPU密集型程序的负载。
const controller = new AbortController(); const wss = new WebSocketStream(WSS_URL, { protocols: ['chat', 'chatv2'], signal: controller.signal, // 用于断开websocket连接 }); const {readable, writable} = await wss.opened; const reader = readable.getReader(); const writer = writable.getWriter(); while (true) { const {value, done} = await reader.read(); if (done) { break; } const result = await process(value); await writer.write(result); }