Skip to content

Commit 609b25f

Browse files
debadree25RafaelGSS
authored andcommitted
stream: implement ReadableStream.from
Fixes: #48389 PR-URL: #48395 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Matthew Aitken <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 1761bdf commit 609b25f

14 files changed

+1036
-2
lines changed

doc/api/webstreams.md

+43
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,49 @@ port1.onmessage = ({ data }) => {
387387
port2.postMessage(stream, [stream]);
388388
```
389389

390+
### `ReadableStream.from(iterable)`
391+
392+
<!-- YAML
393+
added: REPLACEME
394+
-->
395+
396+
* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
397+
`Symbol.iterator` iterable protocol.
398+
399+
A utility method that creates a new {ReadableStream} from an iterable.
400+
401+
```mjs
402+
import { ReadableStream } from 'node:stream/web';
403+
404+
async function* asyncIterableGenerator() {
405+
yield 'a';
406+
yield 'b';
407+
yield 'c';
408+
}
409+
410+
const stream = ReadableStream.from(asyncIterableGenerator());
411+
412+
for await (const chunk of stream)
413+
console.log(chunk); // Prints 'a', 'b', 'c'
414+
```
415+
416+
```cjs
417+
const { ReadableStream } = require('node:stream/web');
418+
419+
async function* asyncIterableGenerator() {
420+
yield 'a';
421+
yield 'b';
422+
yield 'c';
423+
}
424+
425+
(async () => {
426+
const stream = ReadableStream.from(asyncIterableGenerator());
427+
428+
for await (const chunk of stream)
429+
console.log(chunk); // Prints 'a', 'b', 'c'
430+
})();
431+
```
432+
390433
### Class: `ReadableStreamDefaultReader`
391434

392435
<!-- YAML

lib/internal/webstreams/readablestream.js

+59
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ const {
111111
nonOpCancel,
112112
nonOpPull,
113113
nonOpStart,
114+
getIterator,
115+
iteratorNext,
114116
kType,
115117
kState,
116118
} = require('internal/webstreams/util');
@@ -316,6 +318,10 @@ class ReadableStream {
316318
return isReadableStreamLocked(this);
317319
}
318320

321+
static from(iterable) {
322+
return readableStreamFromIterable(iterable);
323+
}
324+
319325
/**
320326
* @param {any} [reason]
321327
* @returns { Promise<void> }
@@ -1251,6 +1257,59 @@ const isReadableStreamBYOBReader =
12511257

12521258
// ---- ReadableStream Implementation
12531259

1260+
function readableStreamFromIterable(iterable) {
1261+
let stream;
1262+
const iteratorRecord = getIterator(iterable, 'async');
1263+
1264+
const startAlgorithm = nonOpStart;
1265+
1266+
async function pullAlgorithm() {
1267+
const nextResult = iteratorNext(iteratorRecord);
1268+
const nextPromise = PromiseResolve(nextResult);
1269+
return PromisePrototypeThen(nextPromise, (iterResult) => {
1270+
if (typeof iterResult !== 'object' || iterResult === null) {
1271+
throw new ERR_INVALID_STATE.TypeError(
1272+
'The promise returned by the iterator.next() method must fulfill with an object');
1273+
}
1274+
if (iterResult.done) {
1275+
readableStreamDefaultControllerClose(stream[kState].controller);
1276+
} else {
1277+
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
1278+
}
1279+
});
1280+
}
1281+
1282+
async function cancelAlgorithm(reason) {
1283+
const iterator = iteratorRecord.iterator;
1284+
const returnMethod = iterator.return;
1285+
if (returnMethod === undefined) {
1286+
return PromiseResolve();
1287+
}
1288+
const returnResult = FunctionPrototypeCall(returnMethod, iterator, reason);
1289+
const returnPromise = PromiseResolve(returnResult);
1290+
return PromisePrototypeThen(returnPromise, (iterResult) => {
1291+
if (typeof iterResult !== 'object' || iterResult === null) {
1292+
throw new ERR_INVALID_STATE.TypeError(
1293+
'The promise returned by the iterator.return() method must fulfill with an object');
1294+
}
1295+
return undefined;
1296+
});
1297+
}
1298+
1299+
stream = new ReadableStream({
1300+
start: startAlgorithm,
1301+
pull: pullAlgorithm,
1302+
cancel: cancelAlgorithm,
1303+
}, {
1304+
size() {
1305+
return 1;
1306+
},
1307+
highWaterMark: 0,
1308+
});
1309+
1310+
return stream;
1311+
}
1312+
12541313
function readableStreamPipeTo(
12551314
source,
12561315
dest,

lib/internal/webstreams/util.js

+53
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ const {
1313
PromiseReject,
1414
ReflectGet,
1515
Symbol,
16+
SymbolAsyncIterator,
17+
SymbolIterator,
1618
Uint8Array,
1719
} = primordials;
1820

1921
const {
2022
codes: {
2123
ERR_INVALID_ARG_VALUE,
2224
ERR_OPERATION_FAILED,
25+
ERR_INVALID_STATE,
2326
},
2427
} = require('internal/errors');
2528

@@ -217,6 +220,54 @@ function lazyTransfer() {
217220
return transfer;
218221
}
219222

223+
function createAsyncFromSyncIterator(syncIteratorRecord) {
224+
const syncIterable = {
225+
[SymbolIterator]: () => syncIteratorRecord.iterator,
226+
};
227+
228+
const asyncIterator = (async function* () {
229+
return yield* syncIterable;
230+
}());
231+
232+
const nextMethod = asyncIterator.next;
233+
return { iterator: asyncIterator, nextMethod, done: false };
234+
}
235+
236+
function getIterator(obj, kind = 'sync', method) {
237+
if (method === undefined) {
238+
if (kind === 'async') {
239+
method = obj[SymbolAsyncIterator];
240+
if (method === undefined) {
241+
const syncMethod = obj[SymbolIterator];
242+
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
243+
return createAsyncFromSyncIterator(syncIteratorRecord);
244+
}
245+
} else {
246+
method = obj[SymbolIterator];
247+
}
248+
}
249+
250+
const iterator = FunctionPrototypeCall(method, obj);
251+
if (typeof iterator !== 'object' || iterator === null) {
252+
throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object');
253+
}
254+
const nextMethod = iterator.next;
255+
return { iterator, nextMethod, done: false };
256+
}
257+
258+
function iteratorNext(iteratorRecord, value) {
259+
let result;
260+
if (value === undefined) {
261+
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
262+
} else {
263+
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
264+
}
265+
if (typeof result !== 'object' || result === null) {
266+
throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object');
267+
}
268+
return result;
269+
}
270+
220271
module.exports = {
221272
ArrayBufferViewGetBuffer,
222273
ArrayBufferViewGetByteLength,
@@ -243,6 +294,8 @@ module.exports = {
243294
nonOpPull,
244295
nonOpStart,
245296
nonOpWrite,
297+
getIterator,
298+
iteratorNext,
246299
kType,
247300
kState,
248301
};

test/fixtures/wpt/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Last update:
2626
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
2727
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
2828
- resources: https://github.com/web-platform-tests/wpt/tree/919874f84f/resources
29-
- streams: https://github.com/web-platform-tests/wpt/tree/51750bc8d7/streams
29+
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
3030
- url: https://github.com/web-platform-tests/wpt/tree/84782d9315/url
3131
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
3232
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// META: global=window,worker
2+
'use strict';
3+
4+
promise_test(async t => {
5+
/** @type {ReadableStreamDefaultController} */
6+
var con;
7+
let synchronous = false;
8+
new ReadableStream({ start(c) { con = c }}, { highWaterMark: 0 }).pipeTo(
9+
new WritableStream({ write() { synchronous = true; } })
10+
)
11+
// wait until start algorithm finishes
12+
await Promise.resolve();
13+
con.enqueue();
14+
assert_false(synchronous, 'write algorithm must not run synchronously');
15+
}, "enqueue() must not synchronously call write algorithm");

0 commit comments

Comments
 (0)