Skip to content

Commit 2b77b94

Browse files
devsnektargos
authored andcommitted
streams: refactor ReadableStream asyncIterator creation and a few fixes
Closes: #23041 - Rewrite `ReadableAsyncIterator` class into `ReadableStreamAsyncIteratorPrototype` which contains no constructor and inherits from `%AsyncIteratorPrototype%`. - Rewrite `AsyncIteratorRecord` into dumb function. PR-URL: #23042 Fixes: #23041 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Anatoli Papirovski <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]>
1 parent 18cbde5 commit 2b77b94

File tree

3 files changed

+78
-48
lines changed

3 files changed

+78
-48
lines changed

lib/_stream_readable.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const { emitExperimentalWarning } = require('internal/util');
4242

4343
// Lazy loaded to improve the startup performance.
4444
let StringDecoder;
45-
let ReadableAsyncIterator;
45+
let createReadableStreamAsyncIterator;
4646

4747
util.inherits(Readable, Stream);
4848

@@ -990,9 +990,11 @@ Readable.prototype.wrap = function(stream) {
990990

991991
Readable.prototype[Symbol.asyncIterator] = function() {
992992
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
993-
if (ReadableAsyncIterator === undefined)
994-
ReadableAsyncIterator = require('internal/streams/async_iterator');
995-
return new ReadableAsyncIterator(this);
993+
if (createReadableStreamAsyncIterator === undefined) {
994+
createReadableStreamAsyncIterator =
995+
require('internal/streams/async_iterator');
996+
}
997+
return createReadableStreamAsyncIterator(this);
996998
};
997999

9981000
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {

lib/internal/streams/async_iterator.js

+50-44
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@ const kLastPromise = Symbol('lastPromise');
88
const kHandlePromise = Symbol('handlePromise');
99
const kStream = Symbol('stream');
1010

11-
const AsyncIteratorRecord = class AsyncIteratorRecord {
12-
constructor(value, done) {
13-
this.done = done;
14-
this.value = value;
15-
}
16-
};
11+
function createIterResult(value, done) {
12+
return { value, done };
13+
}
1714

1815
function readAndResolve(iter) {
1916
const resolve = iter[kLastResolve];
@@ -26,7 +23,7 @@ function readAndResolve(iter) {
2623
iter[kLastPromise] = null;
2724
iter[kLastResolve] = null;
2825
iter[kLastReject] = null;
29-
resolve(new AsyncIteratorRecord(data, false));
26+
resolve(createIterResult(data, false));
3027
}
3128
}
3229
}
@@ -43,7 +40,7 @@ function onEnd(iter) {
4340
iter[kLastPromise] = null;
4441
iter[kLastResolve] = null;
4542
iter[kLastReject] = null;
46-
resolve(new AsyncIteratorRecord(null, true));
43+
resolve(createIterResult(null, true));
4744
}
4845
iter[kEnded] = true;
4946
}
@@ -69,39 +66,13 @@ function wrapForNext(lastPromise, iter) {
6966
};
7067
}
7168

72-
const ReadableAsyncIterator = class ReadableAsyncIterator {
73-
constructor(stream) {
74-
this[kStream] = stream;
75-
this[kLastResolve] = null;
76-
this[kLastReject] = null;
77-
this[kError] = null;
78-
this[kEnded] = false;
79-
this[kLastPromise] = null;
80-
81-
stream.on('readable', onReadable.bind(null, this));
82-
stream.on('end', onEnd.bind(null, this));
83-
stream.on('error', onError.bind(null, this));
84-
85-
// the function passed to new Promise
86-
// is cached so we avoid allocating a new
87-
// closure at every run
88-
this[kHandlePromise] = (resolve, reject) => {
89-
const data = this[kStream].read();
90-
if (data) {
91-
this[kLastPromise] = null;
92-
this[kLastResolve] = null;
93-
this[kLastReject] = null;
94-
resolve(new AsyncIteratorRecord(data, false));
95-
} else {
96-
this[kLastResolve] = resolve;
97-
this[kLastReject] = reject;
98-
}
99-
};
100-
}
69+
const AsyncIteratorPrototype = Object.getPrototypeOf(
70+
Object.getPrototypeOf(async function* () {}).prototype);
10171

72+
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
10273
get stream() {
10374
return this[kStream];
104-
}
75+
},
10576

10677
next() {
10778
// if we have detected an error in the meanwhile
@@ -112,7 +83,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
11283
}
11384

11485
if (this[kEnded]) {
115-
return Promise.resolve(new AsyncIteratorRecord(null, true));
86+
return Promise.resolve(createIterResult(null, true));
11687
}
11788

11889
// if we have multiple next() calls
@@ -129,7 +100,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
129100
// without triggering the next() queue
130101
const data = this[kStream].read();
131102
if (data !== null) {
132-
return Promise.resolve(new AsyncIteratorRecord(data, false));
103+
return Promise.resolve(createIterResult(data, false));
133104
}
134105

135106
promise = new Promise(this[kHandlePromise]);
@@ -138,7 +109,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
138109
this[kLastPromise] = promise;
139110

140111
return promise;
141-
}
112+
},
142113

143114
return() {
144115
// destroy(err, cb) is a private API
@@ -150,10 +121,45 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
150121
reject(err);
151122
return;
152123
}
153-
resolve(new AsyncIteratorRecord(null, true));
124+
resolve(createIterResult(null, true));
154125
});
155126
});
156-
}
127+
},
128+
}, AsyncIteratorPrototype);
129+
130+
const createReadableStreamAsyncIterator = (stream) => {
131+
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, {
132+
[kStream]: { value: stream, writable: true },
133+
[kLastResolve]: { value: null, writable: true },
134+
[kLastReject]: { value: null, writable: true },
135+
[kError]: { value: null, writable: true },
136+
[kEnded]: { value: false, writable: true },
137+
[kLastPromise]: { value: null, writable: true },
138+
// the function passed to new Promise
139+
// is cached so we avoid allocating a new
140+
// closure at every run
141+
[kHandlePromise]: {
142+
value: (resolve, reject) => {
143+
const data = iterator[kStream].read();
144+
if (data) {
145+
iterator[kLastPromise] = null;
146+
iterator[kLastResolve] = null;
147+
iterator[kLastReject] = null;
148+
resolve(createIterResult(data, false));
149+
} else {
150+
iterator[kLastResolve] = resolve;
151+
iterator[kLastReject] = reject;
152+
}
153+
},
154+
writable: true,
155+
},
156+
});
157+
158+
stream.on('readable', onReadable.bind(null, iterator));
159+
stream.on('end', onEnd.bind(null, iterator));
160+
stream.on('error', onError.bind(null, iterator));
161+
162+
return iterator;
157163
};
158164

159-
module.exports = ReadableAsyncIterator;
165+
module.exports = createReadableStreamAsyncIterator;

test/parallel/test-stream-readable-async-iterators.js

+22
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,28 @@ const { Readable } = require('stream');
55
const assert = require('assert');
66

77
async function tests() {
8+
{
9+
const AsyncIteratorPrototype = Object.getPrototypeOf(
10+
Object.getPrototypeOf(async function* () {}).prototype);
11+
const rs = new Readable({});
12+
assert.strictEqual(
13+
Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
14+
AsyncIteratorPrototype);
15+
}
16+
17+
await (async function() {
18+
const readable = new Readable({ objectMode: true, read() {} });
19+
readable.push(0);
20+
readable.push(1);
21+
readable.push(null);
22+
23+
const iter = readable[Symbol.asyncIterator]();
24+
assert.strictEqual((await iter.next()).value, 0);
25+
for await (const d of iter) {
26+
assert.strictEqual(d, 1);
27+
}
28+
})();
29+
830
await (async function() {
931
console.log('read without for..await');
1032
const max = 5;

0 commit comments

Comments
 (0)