Skip to content

Commit 5fefa4f

Browse files
committed
stream: don't destroy final readable stream in pipeline
If the last stream in a pipeline is still usable/readable don't destroy it to allow further composition. Fixes: nodejs#32105
1 parent b1d4c13 commit 5fefa4f

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

lib/internal/streams/pipeline.js

+10-3
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@ let EE;
2525
let PassThrough;
2626
let createReadableStreamAsyncIterator;
2727

28-
function destroyer(stream, reading, writing, callback) {
28+
function destroyer(stream, reading, writing, final, callback) {
2929
const _destroy = once((err) => {
30-
destroyImpl.destroyer(stream, err);
30+
const readable = stream.readable || isRequest(stream);
31+
if (err || !final || !readable) {
32+
destroyImpl.destroyer(stream, err);
33+
}
3134
callback(err);
3235
});
3336

@@ -68,6 +71,10 @@ function popCallback(streams) {
6871
return streams.pop();
6972
}
7073

74+
function isRequest(stream) {
75+
return stream.setHeader && typeof stream.abort === 'function';
76+
}
77+
7178
function isPromise(obj) {
7279
return !!(obj && typeof obj.then === 'function');
7380
}
@@ -159,7 +166,7 @@ function pipeline(...streams) {
159166
}
160167

161168
function wrap(stream, reading, writing, final) {
162-
destroys.push(destroyer(stream, reading, writing, (err) => {
169+
destroys.push(destroyer(stream, reading, writing, final, (err) => {
163170
finish(err, final);
164171
}));
165172
}

test/parallel/test-stream-pipeline.js

+47-1
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ const { promisify } = require('util');
916916
const dst = new PassThrough({ autoDestroy: false });
917917
pipeline(src, dst, common.mustCall(() => {
918918
assert.strictEqual(src.destroyed, true);
919-
assert.strictEqual(dst.destroyed, true);
919+
assert.strictEqual(dst.destroyed, false);
920920
}));
921921
src.end();
922922
}
@@ -938,3 +938,49 @@ const { promisify } = require('util');
938938
r.push(null);
939939
r.emit('close');
940940
}
941+
942+
{
943+
const server = http.createServer((req, res) => {
944+
});
945+
946+
server.listen(0, () => {
947+
const req = http.request({
948+
port: server.address().port
949+
});
950+
951+
const body = new PassThrough();
952+
pipeline(
953+
body,
954+
req,
955+
common.mustCall((err) => {
956+
assert(!err);
957+
assert(!req.res);
958+
assert(!req.aborted);
959+
req.abort();
960+
server.close();
961+
})
962+
);
963+
body.end();
964+
});
965+
}
966+
967+
{
968+
const src = new PassThrough();
969+
const dst = new PassThrough();
970+
pipeline(src, dst, common.mustCall((err) => {
971+
assert(!err);
972+
assert.strictEqual(dst.destroyed, false);
973+
}));
974+
src.end();
975+
}
976+
977+
{
978+
const src = new PassThrough();
979+
const dst = new PassThrough();
980+
dst.readable = false;
981+
pipeline(src, dst, common.mustCall((err) => {
982+
assert(!err);
983+
assert.strictEqual(dst.destroyed, true);
984+
}));
985+
src.end();
986+
}

0 commit comments

Comments
 (0)