Skip to content

Commit de549cd

Browse files
committed
stream: added experimental support for for-await
Adds support for Symbol.asyncIterator into the Readable class. The stream is destroyed when the loop terminates with break or throw. Fixes: nodejs#15709
1 parent 4d96c17 commit de549cd

File tree

5 files changed

+492
-0
lines changed

5 files changed

+492
-0
lines changed

doc/api/stream.md

+26
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,31 @@ readable stream will release any internal resources.
11651165
Implementors should not override this method, but instead implement
11661166
[`readable._destroy`][readable-_destroy].
11671167

1168+
##### readable[@@asyncIterator]
1169+
<!-- YAML
1170+
added: REPLACEME
1171+
-->
1172+
1173+
> Stability: 1 - Experimental
1174+
1175+
Returns an [AsyncIterator][async-iterator] to fully consume the stream.
1176+
1177+
```js
1178+
async function print(readable) {
1179+
readable.setEncoding('utf8');
1180+
let data = '';
1181+
for await (const k of readable) {
1182+
data += k;
1183+
}
1184+
console.log(data);
1185+
}
1186+
1187+
print(fs.createReadStream('file')).catch(console.log);
1188+
```
1189+
1190+
If the loop terminates with a `break` or a `throw`, the stream will be destroyed.
1191+
In other terms, iterating over a stream will consume the stream fully.
1192+
11681193
### Duplex and Transform Streams
11691194

11701195
#### Class: stream.Duplex
@@ -2328,3 +2353,4 @@ contain multi-byte characters.
23282353
[readable-destroy]: #stream_readable_destroy_error
23292354
[writable-_destroy]: #stream_writable_destroy_err_callback
23302355
[writable-destroy]: #stream_writable_destroy_error
2356+
[async-iterator]: https://github.com/tc39/proposal-async-iteration

lib/_stream_readable.js

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const debug = util.debuglog('stream');
3232
const BufferList = require('internal/streams/BufferList');
3333
const destroyImpl = require('internal/streams/destroy');
3434
const errors = require('internal/errors');
35+
const ReadableAsyncIterator = require('internal/streams/async_iterator');
36+
const { emitExperimentalWarning } = require('internal/util');
3537
var StringDecoder;
3638

3739
util.inherits(Readable, Stream);
@@ -922,6 +924,12 @@ Readable.prototype.wrap = function(stream) {
922924
return this;
923925
};
924926

927+
Readable.prototype[Symbol.asyncIterator] = function() {
928+
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
929+
930+
return new ReadableAsyncIterator(this);
931+
};
932+
925933
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
926934
// making it explicit this property is not enumerable
927935
// because otherwise some prototype manipulation in
+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
'use strict';
2+
3+
const kLastResolve = Symbol('lastResolve');
4+
const kLastReject = Symbol('lastReject');
5+
const kError = Symbol('error');
6+
const kEnded = Symbol('ended');
7+
const kLastPromise = Symbol('lastPromise');
8+
const kHandlePromise = Symbol('handlePromise');
9+
const kStream = Symbol('stream');
10+
11+
const AsyncIteratorRecord = class AsyncIteratorRecord {
12+
constructor(value, done) {
13+
this.done = done;
14+
this.value = value;
15+
}
16+
};
17+
18+
function readAndResolve(iter) {
19+
const resolve = iter[kLastResolve];
20+
if (resolve !== null) {
21+
const data = iter[kStream].read();
22+
// we defer if data is null
23+
// we can be expecting either 'end' or
24+
// 'error'
25+
if (data !== null) {
26+
iter[kLastPromise] = null;
27+
iter[kLastResolve] = null;
28+
iter[kLastReject] = null;
29+
resolve(new AsyncIteratorRecord(data, false));
30+
}
31+
}
32+
}
33+
34+
function onReadable(iter) {
35+
// we wait for the next tick, because it might
36+
// emit an error with process.nextTick
37+
process.nextTick(readAndResolve, iter);
38+
}
39+
40+
function onEnd(iter) {
41+
const resolve = iter[kLastResolve];
42+
if (resolve !== null) {
43+
iter[kLastPromise] = null;
44+
iter[kLastResolve] = null;
45+
iter[kLastReject] = null;
46+
resolve(new AsyncIteratorRecord(null, true));
47+
}
48+
iter[kEnded] = true;
49+
}
50+
51+
function onError(iter, err) {
52+
const reject = iter[kLastReject];
53+
// reject if we are waiting for data in the Promise
54+
// returned by next() and store the error
55+
if (reject !== null) {
56+
iter[kLastPromise] = null;
57+
iter[kLastResolve] = null;
58+
iter[kLastReject] = null;
59+
reject(err);
60+
}
61+
iter.error = err;
62+
}
63+
64+
function wrapForNext(lastPromise, iter) {
65+
return function(resolve, reject) {
66+
lastPromise.then(function() {
67+
iter[kHandlePromise](resolve, reject);
68+
}, reject);
69+
};
70+
}
71+
72+
const ReadableAsyncIterator = class ReadableAsyncIterator {
73+
constructor(stream) {
74+
this[kStream] = stream;
75+
this[kLastResolve] = null;
76+
this[kLastReject] = null;
77+
this[kError] = null;
78+
this[kEnded] = false;
79+
this[kLastPromise] = null;
80+
81+
stream.on('readable', onReadable.bind(null, this));
82+
stream.on('end', onEnd.bind(null, this));
83+
stream.on('error', onError.bind(null, this));
84+
85+
// the function passed to new Promise
86+
// is cached so we avoid allocating a new
87+
// closure at every run
88+
this[kHandlePromise] = (resolve, reject) => {
89+
const data = this[kStream].read();
90+
if (data) {
91+
this[kLastPromise] = null;
92+
this[kLastResolve] = null;
93+
this[kLastReject] = null;
94+
resolve(new AsyncIteratorRecord(data, false));
95+
} else {
96+
this[kLastResolve] = resolve;
97+
this[kLastReject] = reject;
98+
}
99+
};
100+
}
101+
102+
get stream() {
103+
return this[kStream];
104+
}
105+
106+
next() {
107+
// if we have detected an error in the meanwhile
108+
// reject straight away
109+
const error = this[kError];
110+
if (error !== null) {
111+
return Promise.reject(error);
112+
}
113+
114+
if (this[kEnded]) {
115+
return Promise.resolve(new AsyncIteratorRecord(null, true));
116+
}
117+
118+
// if we have multiple next() calls
119+
// we will wait for the previous Promise to finish
120+
// this logic is optimized to support for await loops,
121+
// where next() is only called once at a time
122+
const lastPromise = this[kLastPromise];
123+
let promise;
124+
125+
if (lastPromise) {
126+
promise = new Promise(wrapForNext(lastPromise, this));
127+
} else {
128+
// fast path needed to support multiple this.push()
129+
// without triggering the next() queue
130+
const data = this[kStream].read();
131+
if (data !== null) {
132+
return Promise.resolve(new AsyncIteratorRecord(data, false));
133+
}
134+
135+
promise = new Promise(this[kHandlePromise]);
136+
}
137+
138+
this[kLastPromise] = promise;
139+
140+
return promise;
141+
}
142+
143+
return() {
144+
// destroy(err, cb) is a private API
145+
// we can guarantee we have that here, because we control the
146+
// Readable class this is attached to
147+
return new Promise((resolve, reject) => {
148+
this[kStream].destroy(null, (err) => {
149+
if (err) {
150+
reject(err);
151+
return;
152+
}
153+
resolve(new AsyncIteratorRecord(null, true));
154+
});
155+
});
156+
}
157+
};
158+
159+
module.exports = ReadableAsyncIterator;

node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
'lib/internal/v8_prof_polyfill.js',
139139
'lib/internal/v8_prof_processor.js',
140140
'lib/internal/streams/lazy_transform.js',
141+
'lib/internal/streams/async_iterator.js',
141142
'lib/internal/streams/BufferList.js',
142143
'lib/internal/streams/legacy.js',
143144
'lib/internal/streams/destroy.js',

0 commit comments

Comments
 (0)