Skip to content

Commit 05c3d53

Browse files
guybedfordtargos
authored andcommitted
stream: implement Readable.from async iterator utility
PR-URL: #27660 Reviewed-By: Gus Caplan <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Michaël Zasso <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent 0e16b35 commit 05c3d53

File tree

4 files changed

+314
-3
lines changed

4 files changed

+314
-3
lines changed

doc/api/stream.md

+111-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js:
4646
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it
4747
is written and read (for example, [`zlib.createDeflate()`][]).
4848

49-
Additionally, this module includes the utility functions [pipeline][] and
50-
[finished][].
49+
Additionally, this module includes the utility functions [pipeline][],
50+
[finished][] and [Readable.from][].
5151

5252
### Object Mode
5353

@@ -1480,6 +1480,31 @@ async function run() {
14801480
run().catch(console.error);
14811481
```
14821482

1483+
### Readable.from(iterable, [options])
1484+
1485+
* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
1486+
`Symbol.iterator` iterable protocol.
1487+
* `options` {Object} Options provided to `new stream.Readable([options])`.
1488+
By default, `Readable.from()` will set `options.objectMode` to `true`, unless
1489+
this is explicitly opted out by setting `options.objectMode` to `false`.
1490+
1491+
A utility method for creating Readable Streams out of iterators.
1492+
1493+
```js
1494+
const { Readable } = require('stream');
1495+
1496+
async function * generate() {
1497+
yield 'hello';
1498+
yield 'streams';
1499+
}
1500+
1501+
const readable = Readable.from(generate());
1502+
1503+
readable.on('data', (chunk) => {
1504+
console.log(chunk);
1505+
});
1506+
```
1507+
14831508
## API for Stream Implementers
14841509

14851510
<!--type=misc-->
@@ -2395,6 +2420,89 @@ primarily for examples and testing, but there are some use cases where
23952420

23962421
<!--type=misc-->
23972422

2423+
### Streams Compatibility with Async Generators and Async Iterators
2424+
2425+
With the support of async generators and iterators in JavaScript, async
2426+
generators are effectively a first-class language-level stream construct at
2427+
this point.
2428+
2429+
Some common interop cases of using Node.js streams with async generators
2430+
and async iterators are provided below.
2431+
2432+
#### Consuming Readable Streams with Async Iterators
2433+
2434+
```js
2435+
(async function() {
2436+
for await (const chunk of readable) {
2437+
console.log(chunk);
2438+
}
2439+
})();
2440+
```
2441+
2442+
#### Creating Readable Streams with Async Generators
2443+
2444+
We can construct a Node.js Readable Stream from an asynchronous generator
2445+
using the `Readable.from` utility method:
2446+
2447+
```js
2448+
const { Readable } = require('stream');
2449+
2450+
async function * generate() {
2451+
yield 'a';
2452+
yield 'b';
2453+
yield 'c';
2454+
}
2455+
2456+
const readable = Readable.from(generate());
2457+
2458+
readable.on('data', (chunk) => {
2459+
console.log(chunk);
2460+
});
2461+
```
2462+
2463+
#### Piping to Writable Streams from Async Iterators
2464+
2465+
In the scenario of writing to a writeable stream from an async iterator,
2466+
it is important to ensure the correct handling of backpressure and errors.
2467+
2468+
```js
2469+
const { once } = require('events');
2470+
2471+
const writeable = fs.createWriteStream('./file');
2472+
2473+
(async function() {
2474+
for await (const chunk of iterator) {
2475+
// Handle backpressure on write
2476+
if (!writeable.write(value))
2477+
await once(writeable, 'drain');
2478+
}
2479+
writeable.end();
2480+
// Ensure completion without errors
2481+
await once(writeable, 'finish');
2482+
})();
2483+
```
2484+
2485+
In the above, errors on the write stream would be caught and thrown by the two
2486+
`once` listeners, since `once` will also handle `'error'` events.
2487+
2488+
Alternatively the readable stream could be wrapped with `Readable.from` and
2489+
then piped via `.pipe`:
2490+
2491+
```js
2492+
const { once } = require('events');
2493+
2494+
const writeable = fs.createWriteStream('./file');
2495+
2496+
(async function() {
2497+
const readable = Readable.from(iterator);
2498+
readable.pipe(writeable);
2499+
// Ensure completion without errors
2500+
await once(writeable, 'finish');
2501+
})();
2502+
```
2503+
2504+
<!--type=misc-->
2505+
23982506
### Compatibility with Older Node.js Versions
23992507

24002508
<!--type=misc-->
@@ -2531,6 +2639,7 @@ contain multi-byte characters.
25312639
[Compatibility]: #stream_compatibility_with_older_node_js_versions
25322640
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
25332641
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
2642+
[Readable.from]: #readable.from
25342643
[TCP sockets]: net.html#net_class_net_socket
25352644
[child process stdin]: child_process.html#child_process_subprocess_stdin
25362645
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout

lib/_stream_readable.js

+39
Original file line numberDiff line numberDiff line change
@@ -1139,3 +1139,42 @@ function endReadableNT(state, stream) {
11391139
}
11401140
}
11411141
}
1142+
1143+
Readable.from = function(iterable, opts) {
1144+
let iterator;
1145+
if (iterable && iterable[Symbol.asyncIterator])
1146+
iterator = iterable[Symbol.asyncIterator]();
1147+
else if (iterable && iterable[Symbol.iterator])
1148+
iterator = iterable[Symbol.iterator]();
1149+
else
1150+
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
1151+
1152+
const readable = new Readable({
1153+
objectMode: true,
1154+
...opts
1155+
});
1156+
// Reading boolean to protect against _read
1157+
// being called before last iteration completion.
1158+
let reading = false;
1159+
readable._read = function() {
1160+
if (!reading) {
1161+
reading = true;
1162+
next();
1163+
}
1164+
};
1165+
async function next() {
1166+
try {
1167+
const { value, done } = await iterator.next();
1168+
if (done) {
1169+
readable.push(null);
1170+
} else if (readable.push(await value)) {
1171+
next();
1172+
} else {
1173+
reading = false;
1174+
}
1175+
} catch (err) {
1176+
readable.destroy(err);
1177+
}
1178+
}
1179+
return readable;
1180+
};

test/parallel/test-events-once.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,4 @@ Promise.all([
9090
catchesErrors(),
9191
stopListeningAfterCatchingError(),
9292
onceError()
93-
]);
93+
]).then(common.mustCall());

test/parallel/test-readable-from.js

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
'use strict';
2+
3+
const { mustCall } = require('../common');
4+
const { once } = require('events');
5+
const { Readable } = require('stream');
6+
const { strictEqual } = require('assert');
7+
8+
async function toReadableBasicSupport() {
9+
async function * generate() {
10+
yield 'a';
11+
yield 'b';
12+
yield 'c';
13+
}
14+
15+
const stream = Readable.from(generate());
16+
17+
const expected = ['a', 'b', 'c'];
18+
19+
for await (const chunk of stream) {
20+
strictEqual(chunk, expected.shift());
21+
}
22+
}
23+
24+
async function toReadableSyncIterator() {
25+
function * generate() {
26+
yield 'a';
27+
yield 'b';
28+
yield 'c';
29+
}
30+
31+
const stream = Readable.from(generate());
32+
33+
const expected = ['a', 'b', 'c'];
34+
35+
for await (const chunk of stream) {
36+
strictEqual(chunk, expected.shift());
37+
}
38+
}
39+
40+
async function toReadablePromises() {
41+
const promises = [
42+
Promise.resolve('a'),
43+
Promise.resolve('b'),
44+
Promise.resolve('c')
45+
];
46+
47+
const stream = Readable.from(promises);
48+
49+
const expected = ['a', 'b', 'c'];
50+
51+
for await (const chunk of stream) {
52+
strictEqual(chunk, expected.shift());
53+
}
54+
}
55+
56+
async function toReadableString() {
57+
const stream = Readable.from('abc');
58+
59+
const expected = ['a', 'b', 'c'];
60+
61+
for await (const chunk of stream) {
62+
strictEqual(chunk, expected.shift());
63+
}
64+
}
65+
66+
async function toReadableOnData() {
67+
async function * generate() {
68+
yield 'a';
69+
yield 'b';
70+
yield 'c';
71+
}
72+
73+
const stream = Readable.from(generate());
74+
75+
let iterations = 0;
76+
const expected = ['a', 'b', 'c'];
77+
78+
stream.on('data', (chunk) => {
79+
iterations++;
80+
strictEqual(chunk, expected.shift());
81+
});
82+
83+
await once(stream, 'end');
84+
85+
strictEqual(iterations, 3);
86+
}
87+
88+
async function toReadableOnDataNonObject() {
89+
async function * generate() {
90+
yield 'a';
91+
yield 'b';
92+
yield 'c';
93+
}
94+
95+
const stream = Readable.from(generate(), { objectMode: false });
96+
97+
let iterations = 0;
98+
const expected = ['a', 'b', 'c'];
99+
100+
stream.on('data', (chunk) => {
101+
iterations++;
102+
strictEqual(chunk instanceof Buffer, true);
103+
strictEqual(chunk.toString(), expected.shift());
104+
});
105+
106+
await once(stream, 'end');
107+
108+
strictEqual(iterations, 3);
109+
}
110+
111+
async function destroysTheStreamWhenThrowing() {
112+
async function * generate() {
113+
throw new Error('kaboom');
114+
}
115+
116+
const stream = Readable.from(generate());
117+
118+
stream.read();
119+
120+
try {
121+
await once(stream, 'error');
122+
} catch (err) {
123+
strictEqual(err.message, 'kaboom');
124+
strictEqual(stream.destroyed, true);
125+
}
126+
}
127+
128+
async function asTransformStream() {
129+
async function * generate(stream) {
130+
for await (const chunk of stream) {
131+
yield chunk.toUpperCase();
132+
}
133+
}
134+
135+
const source = new Readable({
136+
objectMode: true,
137+
read() {
138+
this.push('a');
139+
this.push('b');
140+
this.push('c');
141+
this.push(null);
142+
}
143+
});
144+
145+
const stream = Readable.from(generate(source));
146+
147+
const expected = ['A', 'B', 'C'];
148+
149+
for await (const chunk of stream) {
150+
strictEqual(chunk, expected.shift());
151+
}
152+
}
153+
154+
Promise.all([
155+
toReadableBasicSupport(),
156+
toReadableSyncIterator(),
157+
toReadablePromises(),
158+
toReadableString(),
159+
toReadableOnData(),
160+
toReadableOnDataNonObject(),
161+
destroysTheStreamWhenThrowing(),
162+
asTransformStream()
163+
]).then(mustCall());

0 commit comments

Comments
 (0)