Skip to content

Commit 573410f

Browse files
ronagMylesBorins
authored andcommitted
stream: multiple stream backports
includes: * stream: simpler and faster Readable async iterator * stream: don't destroy on async iterator success * stream: async iterator stop read if destroyed PR-URL: #34887 Refs: #34035 Refs: #35122 Refs: #35640 Refs: #34680 Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent b82fc40 commit 573410f

File tree

6 files changed

+237
-271
lines changed

6 files changed

+237
-271
lines changed

lib/_stream_readable.js

+79-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const {
2828
NumberParseInt,
2929
ObjectDefineProperties,
3030
ObjectSetPrototypeOf,
31+
Promise,
3132
Set,
3233
SymbolAsyncIterator,
3334
Symbol
@@ -60,11 +61,11 @@ const kPaused = Symbol('kPaused');
6061

6162
// Lazy loaded to improve the startup performance.
6263
let StringDecoder;
63-
let createReadableStreamAsyncIterator;
6464
let from;
6565

6666
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
6767
ObjectSetPrototypeOf(Readable, Stream);
68+
function nop() {}
6869

6970
const { errorOrDestroy } = destroyImpl;
7071

@@ -1076,13 +1077,86 @@ Readable.prototype.wrap = function(stream) {
10761077
};
10771078

10781079
Readable.prototype[SymbolAsyncIterator] = function() {
1079-
if (createReadableStreamAsyncIterator === undefined) {
1080-
createReadableStreamAsyncIterator =
1081-
require('internal/streams/async_iterator');
1080+
let stream = this;
1081+
1082+
if (typeof stream.read !== 'function') {
1083+
// v1 stream
1084+
const src = stream;
1085+
stream = new Readable({
1086+
objectMode: true,
1087+
destroy(err, callback) {
1088+
destroyImpl.destroyer(src, err);
1089+
callback(err);
1090+
}
1091+
}).wrap(src);
10821092
}
1083-
return createReadableStreamAsyncIterator(this);
1093+
1094+
const iter = createAsyncIterator(stream);
1095+
iter.stream = stream;
1096+
return iter;
10841097
};
10851098

1099+
async function* createAsyncIterator(stream) {
1100+
let callback = nop;
1101+
1102+
function next(resolve) {
1103+
if (this === stream) {
1104+
callback();
1105+
callback = nop;
1106+
} else {
1107+
callback = resolve;
1108+
}
1109+
}
1110+
1111+
const state = stream._readableState;
1112+
1113+
let error = state.errored;
1114+
let errorEmitted = state.errorEmitted;
1115+
let endEmitted = state.endEmitted;
1116+
let closeEmitted = state.closeEmitted;
1117+
1118+
stream
1119+
.on('readable', next)
1120+
.on('error', function(err) {
1121+
error = err;
1122+
errorEmitted = true;
1123+
next.call(this);
1124+
})
1125+
.on('end', function() {
1126+
endEmitted = true;
1127+
next.call(this);
1128+
})
1129+
.on('close', function() {
1130+
closeEmitted = true;
1131+
next.call(this);
1132+
});
1133+
1134+
try {
1135+
while (true) {
1136+
const chunk = stream.destroyed ? null : stream.read();
1137+
if (chunk !== null) {
1138+
yield chunk;
1139+
} else if (errorEmitted) {
1140+
throw error;
1141+
} else if (endEmitted) {
1142+
break;
1143+
} else if (closeEmitted) {
1144+
break;
1145+
} else {
1146+
await new Promise(next);
1147+
}
1148+
}
1149+
} catch (err) {
1150+
destroyImpl.destroyer(stream, err);
1151+
throw err;
1152+
} finally {
1153+
if (state.autoDestroy || !endEmitted) {
1154+
// TODO(ronag): ERR_PREMATURE_CLOSE?
1155+
destroyImpl.destroyer(stream, null);
1156+
}
1157+
}
1158+
}
1159+
10861160
// Making it explicit these properties are not enumerable
10871161
// because otherwise some prototype manipulation in
10881162
// userland will fail.

lib/internal/streams/async_iterator.js

-221
This file was deleted.

lib/internal/streams/destroy.js

+1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ function undestroy() {
123123

124124
if (w) {
125125
w.closed = false;
126+
w.closeEmitted = false;
126127
w.destroyed = false;
127128
w.errored = null;
128129
w.ended = false;

lib/internal/streams/pipeline.js

+4-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const {
2323

2424
let EE;
2525
let PassThrough;
26-
let createReadableStreamAsyncIterator;
26+
let Readable;
2727

2828
function destroyer(stream, reading, writing, callback) {
2929
callback = once(callback);
@@ -113,11 +113,10 @@ function makeAsyncIterable(val) {
113113
}
114114

115115
async function* fromReadable(val) {
116-
if (!createReadableStreamAsyncIterator) {
117-
createReadableStreamAsyncIterator =
118-
require('internal/streams/async_iterator');
116+
if (!Readable) {
117+
Readable = require('_stream_readable');
119118
}
120-
yield* createReadableStreamAsyncIterator(val);
119+
yield* Readable.prototype[SymbolAsyncIterator].call(val);
121120
}
122121

123122
async function pump(iterable, writable, finish) {

node.gyp

-1
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@
222222
'lib/internal/worker/js_transferable.js',
223223
'lib/internal/watchdog.js',
224224
'lib/internal/streams/lazy_transform.js',
225-
'lib/internal/streams/async_iterator.js',
226225
'lib/internal/streams/buffer_list.js',
227226
'lib/internal/streams/duplexpair.js',
228227
'lib/internal/streams/from.js',

0 commit comments

Comments
 (0)