Skip to content

Commit 08e8997

Browse files
committedJul 17, 2020
stream: simpler and faster Readable async iterator
Reimplement as an async generator instead of a custom iterator class. PR-URL: #34035 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent bf0d82c commit 08e8997

7 files changed

+125
-269
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const Readable = require('stream').Readable;
5+
6+
const bench = common.createBenchmark(main, {
7+
n: [1e5],
8+
sync: ['yes', 'no'],
9+
});
10+
11+
async function main({ n, sync }) {
12+
sync = sync === 'yes';
13+
14+
const s = new Readable({
15+
objectMode: true,
16+
read() {
17+
if (sync) {
18+
this.push(1);
19+
} else {
20+
process.nextTick(() => {
21+
this.push(1);
22+
});
23+
}
24+
}
25+
});
26+
27+
bench.start();
28+
29+
let x = 0;
30+
for await (const chunk of s) {
31+
x += chunk;
32+
if (x > n) {
33+
break;
34+
}
35+
}
36+
37+
// Side effect to ensure V8 does not optimize away the
38+
// loop as a noop.
39+
console.log(x);
40+
41+
bench.end(n);
42+
}

‎lib/_stream_readable.js

+61-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const {
2828
ObjectDefineProperties,
2929
ObjectKeys,
3030
ObjectSetPrototypeOf,
31+
Promise,
3132
Set,
3233
SymbolAsyncIterator,
3334
Symbol
@@ -60,11 +61,11 @@ const kPaused = Symbol('kPaused');
6061

6162
// Lazy loaded to improve the startup performance.
6263
let StringDecoder;
63-
let createReadableStreamAsyncIterator;
6464
let from;
6565

6666
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
6767
ObjectSetPrototypeOf(Readable, Stream);
68+
function nop() {}
6869

6970
const { errorOrDestroy } = destroyImpl;
7071

@@ -1055,13 +1056,68 @@ Readable.prototype.wrap = function(stream) {
10551056
};
10561057

10571058
Readable.prototype[SymbolAsyncIterator] = function() {
1058-
if (createReadableStreamAsyncIterator === undefined) {
1059-
createReadableStreamAsyncIterator =
1060-
require('internal/streams/async_iterator');
1059+
let stream = this;
1060+
1061+
if (typeof stream.read !== 'function') {
1062+
// v1 stream
1063+
const src = stream;
1064+
stream = new Readable({
1065+
objectMode: true,
1066+
destroy(err, callback) {
1067+
destroyImpl.destroyer(src, err);
1068+
callback();
1069+
}
1070+
}).wrap(src);
10611071
}
1062-
return createReadableStreamAsyncIterator(this);
1072+
1073+
const iter = createAsyncIterator(stream);
1074+
iter.stream = stream;
1075+
return iter;
10631076
};
10641077

1078+
async function* createAsyncIterator(stream) {
1079+
let callback = nop;
1080+
1081+
function next(resolve) {
1082+
if (this === stream) {
1083+
callback();
1084+
callback = nop;
1085+
} else {
1086+
callback = resolve;
1087+
}
1088+
}
1089+
1090+
stream
1091+
.on('readable', next)
1092+
.on('error', next)
1093+
.on('end', next)
1094+
.on('close', next);
1095+
1096+
try {
1097+
const state = stream._readableState;
1098+
while (true) {
1099+
const chunk = stream.read();
1100+
if (chunk !== null) {
1101+
yield chunk;
1102+
} else if (state.errored) {
1103+
throw state.errored;
1104+
} else if (state.ended) {
1105+
break;
1106+
} else if (state.closed) {
1107+
// TODO(ronag): ERR_PREMATURE_CLOSE?
1108+
break;
1109+
} else {
1110+
await new Promise(next);
1111+
}
1112+
}
1113+
} catch (err) {
1114+
destroyImpl.destroyer(stream, err);
1115+
throw err;
1116+
} finally {
1117+
destroyImpl.destroyer(stream, null);
1118+
}
1119+
}
1120+
10651121
// Making it explicit these properties are not enumerable
10661122
// because otherwise some prototype manipulation in
10671123
// userland will fail.

‎lib/internal/streams/async_iterator.js

-221
This file was deleted.

‎lib/internal/streams/pipeline.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const {
2323

2424
let EE;
2525
let PassThrough;
26-
let createReadableStreamAsyncIterator;
26+
let Readable;
2727

2828
function destroyer(stream, reading, writing, callback) {
2929
callback = once(callback);
@@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
113113
}
114114

115115
async function* fromReadable(val) {
116-
if (!createReadableStreamAsyncIterator) {
117-
createReadableStreamAsyncIterator =
118-
require('internal/streams/async_iterator');
116+
if (!Readable) {
117+
Readable = require('_stream_readable');
119118
}
120-
yield* createReadableStreamAsyncIterator(val);
119+
120+
yield* Readable.prototype[SymbolAsyncIterator].call(val);
121121
}
122122

123123
async function pump(iterable, writable, finish) {

‎node.gyp

-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,6 @@
230230
'lib/internal/worker/js_transferable.js',
231231
'lib/internal/watchdog.js',
232232
'lib/internal/streams/lazy_transform.js',
233-
'lib/internal/streams/async_iterator.js',
234233
'lib/internal/streams/buffer_list.js',
235234
'lib/internal/streams/duplexpair.js',
236235
'lib/internal/streams/from.js',

‎test/parallel/test-readline-async-iterators-destroy.js

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ async function testMutualDestroy() {
7575
break;
7676
}
7777
assert.deepStrictEqual(iteratedLines, expectedLines);
78+
break;
7879
}
7980

8081
assert.deepStrictEqual(iteratedLines, expectedLines);

‎test/parallel/test-stream-readable-async-iterators.js

+16-37
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,6 @@ const {
1111
const assert = require('assert');
1212

1313
async function tests() {
14-
{
15-
const AsyncIteratorPrototype = Object.getPrototypeOf(
16-
Object.getPrototypeOf(async function* () {}).prototype);
17-
const rs = new Readable({
18-
read() {}
19-
});
20-
assert.strictEqual(
21-
Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
22-
AsyncIteratorPrototype);
23-
}
24-
2514
{
2615
// v1 stream
2716

@@ -53,7 +42,9 @@ async function tests() {
5342
});
5443

5544
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
56-
iter.next().catch(common.mustCall((err) => {
45+
await iter.next();
46+
await iter.next();
47+
await iter.next().catch(common.mustCall((err) => {
5748
assert.strictEqual(err.message, 'asd');
5849
}));
5950
}
@@ -189,12 +180,19 @@ async function tests() {
189180
resolved.forEach(common.mustCall(
190181
(item, i) => assert.strictEqual(item.value, 'hello-' + i), max));
191182

192-
errors.forEach((promise) => {
183+
errors.slice(0, 1).forEach((promise) => {
193184
promise.catch(common.mustCall((err) => {
194185
assert.strictEqual(err.message, 'kaboom');
195186
}));
196187
});
197188

189+
errors.slice(1).forEach((promise) => {
190+
promise.then(common.mustCall(({ done, value }) => {
191+
assert.strictEqual(done, true);
192+
assert.strictEqual(value, undefined);
193+
}));
194+
});
195+
198196
readable.destroy(new Error('kaboom'));
199197
}
200198

@@ -284,28 +282,6 @@ async function tests() {
284282
assert.strictEqual(received, 1);
285283
}
286284

287-
{
288-
// Iterator throw.
289-
290-
const readable = new Readable({
291-
objectMode: true,
292-
read() {
293-
this.push('hello');
294-
}
295-
});
296-
297-
readable.on('error', common.mustCall((err) => {
298-
assert.strictEqual(err.message, 'kaboom');
299-
}));
300-
301-
const it = readable[Symbol.asyncIterator]();
302-
it.throw(new Error('kaboom')).catch(common.mustCall((err) => {
303-
assert.strictEqual(err.message, 'kaboom');
304-
}));
305-
306-
assert.strictEqual(readable.destroyed, true);
307-
}
308-
309285
{
310286
console.log('destroyed by throw');
311287
const readable = new Readable({
@@ -577,12 +553,15 @@ async function tests() {
577553
assert.strictEqual(e, err);
578554
})(), (async () => {
579555
let e;
556+
let x;
580557
try {
581-
await d;
558+
x = await d;
582559
} catch (_e) {
583560
e = _e;
584561
}
585-
assert.strictEqual(e, err);
562+
assert.strictEqual(e, undefined);
563+
assert.strictEqual(x.done, true);
564+
assert.strictEqual(x.value, undefined);
586565
})()]);
587566
}
588567

0 commit comments

Comments
 (0)
Please sign in to comment.