查看原文
其他

Streams 权威指南

王龙 大前端技术之路 2022-06-29

(给大前端技术之路加星标,提升前端技能

Streams API 让你可以用 JavaScript 以编程的方式处理网络接收的,或通过任何本地方式创建的数据流。流式处理包括将要接收、发送或转换的资源分解为小块(chunk),然后逐块处理这些小块。

尽管浏览器在在接收 HTML 或视频等资源时就在做流处理了,但在 2015 年推出 Streams API 之前,JavaScript 无法使用这些功能.

从技术上讲,XMLHttpRequest是可以实现流媒体的,但 实在不好用[1].

以前,如果要处理某种资源(视频或文本文件等),必须下载整个文件,等待将其反序列化为合适的格式,然后进行处理。随着JavaScript可以使用流,这一切都发生了变化。现在,原始数据一旦可用,就可以使用JavaScript进行逐步处理,而不需要生成buffer、string或blob。这将解锁许多用例,比如:

  • 视频特效: 用管道(pipe)把一个可读的视频流连接到一个可以添加实时特效的转换流
  • 数据解压/加压缩: 用管道(pipe)把一个文件流连接到一个可以有选择地(解)压缩的转换流。
  • 图片解码: 用管道把一个HTTP响应流连接到一个可以将字节解码为位图数据的转换流,然后在连接到一个可以将位图转换为PNG转换流。如果是安装在service worker的“fetch”中,这允许你透明地填充新的图像格式,如AVIF。

核心概念

在详细介绍各种类型的流之前,让我先介绍一些核心概念。

Chunks

Chunks 是写入流或从流中读取的单个数据。它可以是任何类型;流甚至可以包含很多不同类型的Chunk。大多数情况下,对一个流,chunk不是最原子的数据单元。例如,字节流可能包含由16 KiBUint8Array单元组成的块,而不是单个字节。

可读流

可读流表示一个可以从中读取的数据源。换句话说,数据来自可读流。具体地说,可读流是ReadableStream类的实例。

转换流

转换流由一对流组成:可写流(称为其可写端)和可读流(称为其可读端)。一个真实的比喻是,一个即时从一种语言翻译到另一种语言的同声传译员。对于转换流来说就是,向可写侧写入导致新数据可用于从可读侧读取。具体地说,任何具有可写属性和可读属性的对象都可以用作转换流。然而,标准的TransformStream类使得创建这样一对象变得更容易。

管道链(Pipe chains)

流主要通过管道相互连接来使用。可读流可以使用其pipeTo()方法直接通过管道传输到可写流,也可以使用可读流的pipeThrough()方法通过一个或多个转换流进行管道传输。以这种方式连接在一起的一组流称为管道链。

背压(Backpressure)

一旦一个管道链被构建,它将传播关于Chunks应该以多快的速度流过的信号。如果链中的任何一步还不能接收块,它就会通过管道链向后传播一个信号,直到最终原始源被告知停止这么快地生成chunks。这种normalizing flow的过程称为背压。

Teeing

可读流可以使用tee()方法进行tee操作(以大写“T”的形状命名,一个入口两个出口)。这将锁定流,使其不再直接可用;但是,它将创建两个新流,称为分支,可以独立使用。Teeing也很重要,因为流不能倒带或重新启动,稍后将详细介绍。

可读流的机制

一个可读流是一个数据源,在JavaScript中由一个从底层源流出的`ReadableStream`[2]对象表示。ReadableStream构造函数从给定的handlers中创建并返回一个可读的流对象。有两种类型的底层源:

  • 推送(push)源当你访问它是,会不断的向你推送数据,你可以开始、暂停或取消对流的访问。示例包括实时视频流、server-sent events或WebSockets。
  • 拉取(pull)源 要求你在连接到它们时显式地请求数据。例子包括通过fetch()XMLHttpRequest调用的HTTP操作。

流数据以称为chunk的小块顺序读取。放置在流中的块称为进入队列。这意味着它们正在队列中等待被读取。内部队列会跟踪尚未读取的数据块。

队列策略是一个能流内部队列的状态,决定流应该如何发出Backpressure信号的对象。排队策略为每个chunk分配一个大小,并将队列中所有chunk的总大小与一个指定的数字(称为高水位标记high water mark)进行比较。

流中的块由reader读取。这个reader一次检索一个chunk,允许你对数据进行各种类型的操作。reader加上与之相伴的处理逻辑即被称为消费者consumer)。

还有一个构件叫做控制器controller)。每个可读流都有一个关联的控制器,顾名思义,它允许你控制流。

一次只能有一个reader读取流;当一个reader被创建并开始读取一个流(也就是说,成为一个活动的reader)时,它就被锁定在这个流上。如果你想让另一个reader接管你的流,你通常需要先释放第一个reader(尽管你可以tee流)。

创建一个可读流

你可以通过调用 `ReadableStream()`[3]构造函数创建一个可读流。这个构造函数有一个可选参数 underlyingSource, 它表示一个流实例将如何表现的方法和属性。

The underlyingSource

可以使用以下的可选方法,由开发人员自定义:

  • start(controller): 在对象被构造的时立刻被调用。该方法可以访问流的源,并执行设置流功能所需的任何其他操作。如果这个过程是异步完成的,该方法可以返回一个promise来表示成功或失败. controller 参数是一个`ReadableStreamDefaultController`[4].
  • pull(controller): 可以用于在获取chunks时控制流。只要流的内部chunks队列没有满,它就会被反复调用,直到队列达到最高水位。如果调用pull()的结果是一个promise,那么pull()将不会被再次调用,直到该promise fulfills。如果promise被reject,流就会出错。
  • cancel(reason): 当流的消费者取消流时被调用.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持下面的方法:

  • `ReadableStreamDefaultController.close()`[5] 关闭相关的流。
  • `ReadableStreamDefaultController.enqueue()`[6] 在关联的流中入队一个给定的块
  • `ReadableStreamDefaultController.error()`[7]让之后与相关流的任何交互出错
/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

The queuingStrategy

queuingStrategy`ReadableStream()`[8]构造函数的第二个参数,也是可以选的。它是一个对象,可选地定义流的队列策略,它有两个属性:

  • highWaterMark: 一个非负数,表示使用此队列策略的流的高水位标记。
  • size(chunk): 计算并返回给定chunk大小的函数。结果用于确定backpressure,通过ReadableStreamDefaultController.desiredSize属性显示。它还控制何时调用underlying source的pull()方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark10,
    size(chunk) {
      return chunk.length;
    },
  },
);

你可以自定义 queuingStrategy, 或者使用 `ByteLengthQueuingStrategy`[9]`CountQueuingStrategy`[10] 的实例. 如果没有 queuingStrategy 提供,默认使用 highWaterMark1CountQueuingStrategy.

getReader()getReader()方法

要从可读流中读取,你需要一个reader,它将是一个reader`ReadableStreamDefaultReader`[11]ReadableStream接口的getReader()方法方法创建一个reader并将流锁定到它。当流被锁定时,在释放之前不能获取其他reader。

ReadableStreamDefaultReader`read()`[12]方法返回一个提供访问内部队列中下一个chunk的promise。它会根据stream的状态 fulfill或者reject。会有以下几种可能:

  • chunk 可有,promise 会成功返回一个对象:{ value: chunk, done: false }.
  • 如果流已经关闭, promise 会成功返回一个对象:{ value: undefined, done: true }.
  • 如果流出错了, promise 会以一个相关的error被reject。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

你可以访问`ReadableStream.locked`[13] 属性来检查可读流是否被锁了。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读流代码示例

下面的代码示例展示了所有的实际操作步骤。首先创建一个ReadableStream,其underlyingSource参数(即TimestampSource类)中定义了一个start()方法。这个方法通知controller在十秒内每秒钟enqueue()一个时间戳(到队列中)。最后,通知控制器关闭流。你可以用getReader()创建一个reader,然后一直调用read()直到流已经done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

异步迭代

在一个循环迭代中检测每个read()done并不太方便。幸运的是讲有一个更好的方法做这:异步迭代

for await (const chunk of stream) {
  console.log(chunk);
}

使用异步迭代的一种变通方法是使用helper函数实现特定功能。下面的代码就能让你使用这个特性:

function streamAsyncIterator(stream) {
  // Get a lock on the stream:
  const reader = stream.getReader();

  return {
    next() {
      // Stream reads already resolve with {done, value}, so
      // we can just call read:
      return reader.read();
    },
    return() {
      // Release the lock if the iterator terminates.
      reader.releaseLock();
      return {};
    },
    // for-await calls this on whatever it's passed, so
    // iterators tend to return themselves.
    [Symbol.asyncIterator]( "Symbol.asyncIterator") {
      return this;
    },
  };
}

async function example() {
  const response = await fetch(url);
  for await (const chunk of streamAsyncIterator(response.body)) {
    console.log(chunk);
  }
}

Tee一个可读流

ReadableStream接口的`tee()`[14] 方法可以对当前流进行tee操作,返回一个长度为2的数组,其中表示新分支的两个ReadableStream实例。这允许两个reader同时读取一个流。比如,在service worker中可以这么用,你想从服务器获取响应并将其流发送到浏览器,但也可以将其流发送到service worker缓存。由于response 的body不能被重复消费,因此需要两个副本来完成此操作。要取消流,你得要取消两个产生的分支。Tee操作在此期间会产生一个锁,防止其他reader锁定它。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]'await readerA.read()); //=> {value: "a", done: false}
console.log('[A]'await readerA.read()); //=> {value: "b", done: false}
console.log('[A]'await readerA.read()); //=> {value: "c", done: false}
console.log('[A]'await readerA.read()); //=> {value: "d", done: false}
console.log('[A]'await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

创建一个可读字节流

ReadableStream()构造函数中传递一个type参数既可以创建一个字节流

new ReadableStream({ type'bytes' });

The underlyingSource

可读字节流的底层源被給予一个ReadableByteStreamController用来操作。ReadableByteStreamController.enqueue()方法接受一个ArrayBufferView类的chunk作为参数。ReadableByteStreamController.byobRequest返回当前的BYOB("bring your own buffer")  pull 请求,如果没有则为null。最后,ReadableByteStreamController.desiredSize属性返回填充stream内部队列的期望大小。

The queuingStrategy

ReadableStream()构造函数的第二个同样是可选的参数是queuingStrategy。它是一个对象,可选地定义流的排队策略,它有一个参数:

  • highWaterMark:个非负的字节数,表示使用此排队策略的流的高水位标记。这用于判定backpressure,通过ReadableByteStreamController.desiredSize 属性显示。它还控制何时调用底层源的pull()方法

与其他流类型的排队策略不同,可读字节流的排队策略没有size(chunk)函数。每个块的大小总是由其byteLength属性决定的。

如果没有提供queuingStrategy,则默认使用highWaterMark为0的策略。

getReader() and read()

你可以通过相应地设置mode参数来访问ReadableStreamBYOBReader: ReadableStream.getReader({mode: "byob"})。这允许对缓冲区分配进行更精确的控制,以避免复制。要从字节流中读取,你需要调用ReadableStreamBYOBReader.read(view),其中view是一个ArrayBufferView

可读字节流示例

const reader = readableStream.getReader({ mode"byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

下面的函数返回可读的字节流,它允许对随机生成的数组进行有效的零拷贝读取。它没有使用预先确定的1024 chunk 大小,而是尝试填充开发人员提供的buffer,从而允许完全的控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写流的机制

可写流是一个可以写入数据的目的地,在JavaScript中由一个`WritableStream`[15] 对象表示。其作为一个写入原始数据的底层IO sink的抽象。

数据通过writer一次一个chunk写入流。chunk可以有多种形式,就像reader中的一样。你可以使用任何逻辑来生成准备编写的chunk。writer加上相关逻辑称为生产者(producer)。

当一个writer被创建并开始写入流(即一个活动的writer)时,它就被锁定在流上。一次只能有一个writer向可写流写入数据。如果你希望另一个writer开始写入流,通常需要先释放当前的writer。

一个内部队列跟踪已写入流但底层接收器尚未处理的chunk。

最后一个组件是控制器(controller),每个可写流都有一个关联的控制器,允许你控制流(例如,中止流)。

创建一个可写流

Streams API的 WritableStream 接口提供了将流数据写入目标(称为sink)的标准抽象。这个对象中内置背压与队列。你可以通过调用 WritableStream() 来创建一个写流。它有一个可选的underlyingSink参数,一个定义流实例将如何表现的对象。

The underlyingSink

underlyingSink包括下面几个可选的、用户自定义的方法。`WritableStreamDefaultController`[16]类型的controller会被作为参数传入部分方法中。

  • start(controller): 在对象被构造时立即调用此方法。该方法的内容应该以访问底层sink为目标。如果这个过程是异步完成的,它可以返回一个promise来表示成功或失败。
  • write(chunk, controller): 这个方法会在有新的chunk可被写入底层sink时被调用。它可以返回一个promise来表示写操作的成功或失败。此方法只在先前的写操作成功后才被调用,并且不会在流关闭或中止后调用。
  • close(controller): 这个方法会在已经完成了流的写入后被调用。这个方法应该完成对底层sink写操作收尾工作,并释放对它的访问。如果这个进程是异步的,它可以返回一个promise来表示成功或失败。这个方法只有在队列中所有写操作都成功之后才会被调用。
  • abort(reason): 应用程序希望突然关闭流并将其置于出错状态时,该方法会被调用。它可以清理任何持有的资源,这很像close(),但是队列中还有写操作排队,abort()也会被调用。这些队列中的chunks就会被扔掉。如果这个进程是异步的,它可以返回一个promise来表示成功或失败。reason参数包含一个DOMString,描述为什么流被中止。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API的 WritableStreamDefaultController 接口表示一个在设置期间、chunk被提交写入期间,或在写入结束期间控制WritableStream的状态的控制器。在构造WritableStream时,会给底层sink一个相应的WritableStreamDefaultController实例以进行操作。WritableStreamDefaultController只有一个方法 WritableStreamDefaultController.error() 会让以后的流操作出错:

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

The queuingStrategy

WritableStream()构造函数第二个参数同样也是可选的。它是一个对象,可选地定义流的排队策略,它有两个参数:

  • highWaterMark:一个非负数,表示使用此排队策略的流的高水位标记。
  • size(chunk): 一个计算并返回一个给定chunk的大小。这个结果会被用来判断背压(backpressure),通过的WritableStreamDefaultWriter.desiredSize属性暴露。

你可以自定义一个queuingStrategy, 或者使用 `ByteLengthQueuingStrategy`[17]`CountQueuingStrategy`[18] 的实例. 如果没有 queuingStrategy 提供,默认使用 highWaterMark1CountQueuingStrategy

getWriter()write() 方法

要写入可写流,你需要一个writer,它将是一个WritableStreamDefaultWriterWritableStream接口的getWriter()方法返回一个WritableStreamDefaultWriter的新实例,并锁定该实例的流。当流被锁定时,在释放当前的writer之前,不能获取其他writer。

`WritableStreamDefaultWriter`[19] 接口的`write()`[20]方法将传递的数据块写入WritableStream及其底层sink,然后返回一个promise,该promise将解析以指示写操作的成功或失败。请注意,“成功”的含义取决于底层sink,它可能只表示chunk已被接受,而不一定是chunk已安全保存到最终目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

你可以通过访问可写流的`WritableStream.locked`[21]来检查可写流是否被锁定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写流代码示例

这个代码暂时了所有的实际操作。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]'Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Pipe可读流到可写流

通过`pipeTo()`[22]可以将可读流pipe到可写流。ReadableStream.pipeTo()方法把当前的``ReadableStreampipe到一个指定的WritableStream`并且返回一个promise来表明成功或者失败。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建一个转换流

Streams API的TransformStream接口表示一组可转换的数据。通过调用转换流的构造函数TransformStream()来创建转换流,该构造函数从给定的handler中创建并返回转换流对象。TransformStream()接受一个可选的transformer对象。该对象可以包含以下任何一种方法:

The transformer

  • start(controller): 在构造对象时立即调用此方法。通常使用controller.enqueue()来入队一些前缀chunk。这些块将从读端读取,但不依赖于对写端的任何写入。如果这个初始过程是异步的,例如,因为需要花费一些步骤来获取前缀chunk,函数可以返回一个promise来表示成功或失败。任何抛出的异常都将由TransformStream()构造函数重新抛出。
  • transform(chunk, controller):当新的chunk准备转换时,该方法会被调用。流的实现保证这个函数只会在前面的转换成功之后被调用,并且不会在start()完成之前或flush()被调用之后被调用。这个函数执行转换流的实际转换工作。它可以使用controller.enqueue()把结果进行入队。这就允许了从写端单个chunk可能会在读端参数0个或者多个chunk,取决于你掉了``controller.enqueue()多少次。如果转换的过程是异步的,这个函数可以返回一个promise来表示转换的成功或失败。被reject的promise会让转换流的可读和可写端都出错。如果没有提供transform()`方法,则使用identity转换,chunk不变的从可写端队列到可读端。
  • flush(controller):这个方法会在所有的chunk都成功的通过transform()方法处理后,写端也准备关闭时被调用。通常,这被用于在读端关闭之前将队列后缀chunks插入到可读端。如果flush过程是异步的,函数可以返回一个promise来表示成功或失败;其结果会被通知到stream.writable.write()的调用者。被reject的promise会让转换流的可读和可写端都出错。抛出异常被视为同等于返回被拒绝的promise。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy

TransformStream()构造函数的第二个和第三个参数也都是可选的,分别是writableStrategyreadableStrategy。这2个定于已经在可读流与可写流的章节中描述过了。

转换流代码示例

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Pipe可读流到转换流

ReadableStream接口的 `pipeThrough()`[23] 方法提供了一种链式的pipe可读流到转换流的功能。pipe操作通常会在管道运行期间锁定它,以防止其他reader锁定流。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一个代码示例(有点人为设计的)展示了如何实现fetch()的“shouting”版本,通过将返回的响应promise作为一个流[24] ,一个chunk一个chunk地将所有字母变成大写。这种方法的优点是,你不需要等待整个文档被下载,这在处理大文件时可能会产生巨大的差异。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

浏览器的支持和polyfill

浏览器对Streams API的支持各不相同。请确保查看 Can I use[25] 的详细兼容性数据。注意,有些浏览器只实现了某些特性的部分实现,所以一定要彻底检查清楚。

好消息是,有一个可用的 参考实现[26]和一个针对生产环境的polyfill[27]

Demo

下面的demo展示了可读、可写和转换流的实际应用。它还包括了pipeThrough()pipeTo()管道链的例子,还演示了tee()。你可以在新窗口中运行这个demo[28] 或查看source code[29].

浏览器中的流

浏览器中内置了许多有用的流。你可以轻松地从`Blob`[30] 创建一个可读流。Blob接口的stream()[31]方法返回一个可读流,读取时返回Blob中包含的数据。还记得吗,`File`[32]对象是一种特定类型的Blob,可以在Blob可以使用的任何上下文中使用。

const readableStream = new Blob(['hello world'], { type'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode()的流的变体分别被称为`TextDecoderStream`[33]`TextEncoderStream`[34]

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream())

使用`CompressionStream`[35]`DecompressionStream`[36]转换流可以轻松地压缩或解压缩文件。下面的代码示例展示了如何下载Streams spec,在浏览器中对其进行压缩(gzip),并将压缩文件直接写入硬盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API[37]'s `FileSystemWritableFileStream`[38] 和实验性的 `fetch()` 请求流[39] 都是可写流的例子。

Serial API[40]大量使用了可读流和可写流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104101108108111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,`WebSocketStream`[41] 将流与WebSocket API集成。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

资源

  • Streams specification[42]
  • Accompanying demos[43]
  • Streams polyfill[44]
  • 2016—the year of web streams[45]
  • Async iterators and generators[46]
  • Stream Visualizer[47]

参考资料

[1]

实在不好用: https://gist.github.com/igrigorik/5736866

[2]

ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream

[3]

ReadableStream(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream

[4]

ReadableStreamDefaultController: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController

[5]

ReadableStreamDefaultController.close(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController/close

[6]

ReadableStreamDefaultController.enqueue(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController/enqueue

[7]

ReadableStreamDefaultController.error(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController/error

[8]

ReadableStream(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream

[9]

ByteLengthQueuingStrategy: https://developer.mozilla.org/en-US/docs/Web/API/ByteLengthQueuingStrategy

[10]

CountQueuingStrategy: https://developer.mozilla.org/en-US/docs/Web/API/CountQueuingStrategy

[11]

ReadableStreamDefaultReader: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader

[12]

read(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/read

[13]

ReadableStream.locked: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/locked

[14]

tee(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/tee

[15]

WritableStream: https://developer.mozilla.org/en-US/docs/Web/API/WritableStream

[16]

WritableStreamDefaultController: https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultController

[17]

ByteLengthQueuingStrategy: https://developer.mozilla.org/en-US/docs/Web/API/ByteLengthQueuingStrategy

[18]

CountQueuingStrategy: https://developer.mozilla.org/en-US/docs/Web/API/CountQueuingStrategy

[19]

WritableStreamDefaultWriter: https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultWriter

[20]

write(): https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultWriter/write

[21]

WritableStream.locked: https://developer.mozilla.org/en-US/docs/Web/API/WritableStream/locked

[22]

pipeTo(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/pipeTo

[23]

pipeThrough(): https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/pipeThrough

[24]

作为一个流: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#consuming_a_fetch_as_a_stream

[25]

Can I use: https://caniuse.com/streams

[26]

参考实现: https://github.com/whatwg/streams/tree/master/reference-implementation

[27]

polyfill: https://github.com/MattiasBuelens/web-streams-polyfill

[28]

demo: https://streams-demo.glitch.me/

[29]

source code: https://glitch.com/edit/#!/streams-demo?path=script.js

[30]

Blob: https://developer.mozilla.org/en-US/docs/Web/API/Blob

[31]

stream(): https://developer.mozilla.org/en-US/docs/Web/API/Blob/stream

[32]

File: https://developer.mozilla.org/en-US/docs/Web/API/File

[33]

TextDecoderStream: https://encoding.spec.whatwg.org/#interface-textdecoderstream

[34]

TextEncoderStream: https://encoding.spec.whatwg.org/#interface-textencoderstream

[35]

CompressionStream: https://wicg.github.io/compression/#compression-stream

[36]

DecompressionStream: https://wicg.github.io/compression/#decompression-stream

[37]

File System Access API: https://wicg.github.io//file-system-access/

[38]

FileSystemWritableFileStream: https://wicg.github.io/file-system-access/#filesystemwritablefilestream

[39]

fetch() 请求流: https://web.dev/fetch-upload-streaming/#writable-streams

[40]

Serial API: https://web.dev/serial/

[41]

WebSocketStream: https://web.dev/websocketstream/

[42]

Streams specification: https://streams.spec.whatwg.org/

[43]

Accompanying demos: https://streams.spec.whatwg.org/demos/

[44]

Streams polyfill: https://github.com/MattiasBuelens/web-streams-polyfill

[45]

2016—the year of web streams: https://jakearchibald.com/2016/streams-ftw/

[46]

Async iterators and generators: https://jakearchibald.com/2017/async-iterators-and-generators/

[47]

Stream Visualizer: https://surma.dev/lab/whatwg-stream-visualizer/lab.html


- EOF -

推荐阅读  点击标题可跳转

1、TypeScript 4.2 有哪些新特性?

2、图文并茂讲清楚 JavaScript 内存管理

3、探索 Node.js 异步 Hooks


觉得本文对你有帮助?请分享给更多人

关注「大前端技术之路」加星标,提升前端技能

点赞和在看就是最大的支持❤️

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存