Skip to content

Commit 7710846

Browse files
committed
stream: don't destroy on async iterator success
Destroying on async iterator completion ignores autoDestroy.
1 parent 9c62e0e commit 7710846

File tree

2 files changed

+163
-14
lines changed

2 files changed

+163
-14
lines changed

lib/internal/streams/readable.js

+29-11
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ Readable.prototype[SymbolAsyncIterator] = function() {
10661066
objectMode: true,
10671067
destroy(err, callback) {
10681068
destroyImpl.destroyer(src, err);
1069-
callback();
1069+
callback(err);
10701070
}
10711071
}).wrap(src);
10721072
}
@@ -1088,24 +1088,39 @@ async function* createAsyncIterator(stream) {
10881088
}
10891089
}
10901090

1091+
const state = stream._readableState;
1092+
1093+
let error = state.errored;
1094+
let errorEmitted = state.errorEmitted;
1095+
let endEmitted = state.endEmitted;
1096+
let closeEmitted = state.closeEmitted;
1097+
10911098
stream
10921099
.on('readable', next)
1093-
.on('error', next)
1094-
.on('end', next)
1095-
.on('close', next);
1100+
.on('error', function(err) {
1101+
error = err;
1102+
errorEmitted = true;
1103+
next.call(this);
1104+
})
1105+
.on('end', function() {
1106+
endEmitted = true;
1107+
next.call(this);
1108+
})
1109+
.on('close', function() {
1110+
closeEmitted = true;
1111+
next.call(this);
1112+
});
10961113

10971114
try {
1098-
const state = stream._readableState;
10991115
while (true) {
11001116
const chunk = stream.read();
11011117
if (chunk !== null) {
11021118
yield chunk;
1103-
} else if (state.errored) {
1104-
throw state.errored;
1105-
} else if (state.ended) {
1119+
} else if (errorEmitted) {
1120+
throw error;
1121+
} else if (endEmitted) {
11061122
break;
1107-
} else if (state.closed) {
1108-
// TODO(ronag): ERR_PREMATURE_CLOSE?
1123+
} else if (closeEmitted) {
11091124
break;
11101125
} else {
11111126
await new Promise(next);
@@ -1115,7 +1130,10 @@ async function* createAsyncIterator(stream) {
11151130
destroyImpl.destroyer(stream, err);
11161131
throw err;
11171132
} finally {
1118-
destroyImpl.destroyer(stream, null);
1133+
if (state.autoDestroy || !endEmitted) {
1134+
// TODO(ronag): ERR_PREMATURE_CLOSE?
1135+
destroyImpl.destroyer(stream, null);
1136+
}
11191137
}
11201138
}
11211139

test/parallel/test-stream-readable-async-iterators.js

+134-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const {
99
pipeline
1010
} = require('stream');
1111
const assert = require('assert');
12+
const http = require('http');
1213

1314
async function tests() {
1415
{
@@ -44,9 +45,11 @@ async function tests() {
4445
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
4546
await iter.next();
4647
await iter.next();
47-
await iter.next().catch(common.mustCall((err) => {
48-
assert.strictEqual(err.message, 'asd');
49-
}));
48+
await iter.next()
49+
.then(common.mustNotCall())
50+
.catch(common.mustCall((err) => {
51+
assert.strictEqual(err.message, 'asd');
52+
}));
5053
}
5154

5255
{
@@ -581,6 +584,61 @@ async function tests() {
581584
assert.strictEqual(err, _err);
582585
}));
583586
}
587+
588+
{
589+
// Don't destroy if no auto destroy.
590+
// https://github.com/nodejs/node/issues/35116
591+
592+
const r = new Readable({
593+
autoDestroy: false,
594+
read() {
595+
this.push('asd');
596+
this.push(null);
597+
}
598+
});
599+
600+
for await (const chunk of r) {
601+
chunk;
602+
}
603+
assert.strictEqual(r.destroyed, false);
604+
}
605+
606+
{
607+
// Destroy if no auto destroy and premature break.
608+
// https://github.com/nodejs/node/pull/35122/files#r485678318
609+
610+
const r = new Readable({
611+
autoDestroy: false,
612+
read() {
613+
this.push('asd');
614+
}
615+
});
616+
617+
for await (const chunk of r) {
618+
chunk;
619+
break;
620+
}
621+
assert.strictEqual(r.destroyed, true);
622+
}
623+
624+
{
625+
// Don't destroy before 'end'.
626+
627+
const r = new Readable({
628+
read() {
629+
this.push('asd');
630+
this.push(null);
631+
}
632+
}).on('end', () => {
633+
assert.strictEqual(r.destroyed, false);
634+
});
635+
636+
for await (const chunk of r) {
637+
chunk;
638+
}
639+
640+
assert.strictEqual(r.destroyed, true);
641+
}
584642
}
585643

586644
{
@@ -643,5 +701,78 @@ async function tests() {
643701
});
644702
}
645703

704+
{
705+
let _req;
706+
const server = http.createServer((request, response) => {
707+
response.statusCode = 404;
708+
response.write('never ends');
709+
});
710+
711+
server.listen(() => {
712+
_req = http.request(`http://localhost:${server.address().port}`)
713+
.on('response', common.mustCall(async (res) => {
714+
setTimeout(() => {
715+
_req.destroy(new Error('something happened'));
716+
}, 100);
717+
718+
res.on('error', common.mustCall());
719+
720+
let _err;
721+
try {
722+
for await (const chunk of res) {
723+
chunk;
724+
}
725+
} catch (err) {
726+
_err = err;
727+
}
728+
729+
assert.strictEqual(_err.code, 'ECONNRESET');
730+
server.close();
731+
}))
732+
.on('error', common.mustCall())
733+
.end();
734+
});
735+
}
736+
737+
{
738+
async function getParsedBody(request) {
739+
let body = '';
740+
741+
for await (const data of request) {
742+
body += data;
743+
}
744+
745+
try {
746+
return JSON.parse(body);
747+
} catch {
748+
return {};
749+
}
750+
}
751+
752+
const str = JSON.stringify({ asd: true });
753+
const server = http.createServer(async (request, response) => {
754+
const body = await getParsedBody(request);
755+
response.statusCode = 200;
756+
assert.strictEqual(JSON.stringify(body), str);
757+
response.end(JSON.stringify(body));
758+
}).listen(() => {
759+
http
760+
.request({
761+
method: 'POST',
762+
hostname: 'localhost',
763+
port: server.address().port,
764+
})
765+
.end(str)
766+
.on('response', async (res) => {
767+
let body = '';
768+
for await (const chunk of res) {
769+
body += chunk;
770+
}
771+
assert.strictEqual(body, str);
772+
server.close();
773+
});
774+
});
775+
}
776+
646777
// To avoid missing some tests if a promise does not resolve
647778
tests().then(common.mustCall());

0 commit comments

Comments
 (0)