Skip to content

Commit 4640ea2

Browse files
ronagMylesBorins
authored andcommitted
stream: don't destroy final readable stream in pipeline
If the last stream in a pipeline is still usable/readable don't destroy it to allow further composition. Fixes: #32105 Backport-PR-URL: #32111 PR-URL: #32110 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent ddb8824 commit 4640ea2

File tree

2 files changed

+52
-3
lines changed

2 files changed

+52
-3
lines changed

lib/internal/streams/pipeline.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@ function destroyStream(stream, err) {
3636
if (typeof stream.close === 'function') return stream.close();
3737
}
3838

39-
function destroyer(stream, reading, writing, callback) {
39+
function destroyer(stream, reading, writing, final, callback) {
4040
callback = once(callback);
4141
let destroyed = false;
4242

4343
if (eos === undefined) eos = require('internal/streams/end-of-stream');
4444
eos(stream, { readable: reading, writable: writing }, (err) => {
4545
if (destroyed) return;
4646
destroyed = true;
47-
destroyStream(stream, err);
47+
const readable = stream.readable || isRequest(stream);
48+
if (err || !final || !readable) {
49+
destroyStream(stream, err);
50+
}
4851
callback(err);
4952
});
5053

@@ -176,7 +179,7 @@ function pipeline(...streams) {
176179
}
177180

178181
function wrap(stream, reading, writing, final) {
179-
destroys.push(destroyer(stream, reading, writing, (err) => {
182+
destroys.push(destroyer(stream, reading, writing, final, (err) => {
180183
finish(err, final);
181184
}));
182185
}

test/parallel/test-stream-pipeline.js

+46
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,52 @@ const { promisify } = require('util');
918918
const dst = new PassThrough({ autoDestroy: false });
919919
pipeline(src, dst, common.mustCall(() => {
920920
assert.strictEqual(src.destroyed, true);
921+
assert.strictEqual(dst.destroyed, false);
922+
}));
923+
src.end();
924+
}
925+
926+
{
927+
const server = http.createServer((req, res) => {
928+
});
929+
930+
server.listen(0, () => {
931+
const req = http.request({
932+
port: server.address().port
933+
});
934+
935+
const body = new PassThrough();
936+
pipeline(
937+
body,
938+
req,
939+
common.mustCall((err) => {
940+
assert(!err);
941+
assert(!req.res);
942+
assert(!req.aborted);
943+
req.abort();
944+
server.close();
945+
})
946+
);
947+
body.end();
948+
});
949+
}
950+
951+
{
952+
const src = new PassThrough();
953+
const dst = new PassThrough();
954+
pipeline(src, dst, common.mustCall((err) => {
955+
assert(!err);
956+
assert.strictEqual(dst.destroyed, false);
957+
}));
958+
src.end();
959+
}
960+
961+
{
962+
const src = new PassThrough();
963+
const dst = new PassThrough();
964+
dst.readable = false;
965+
pipeline(src, dst, common.mustCall((err) => {
966+
assert(!err);
921967
assert.strictEqual(dst.destroyed, true);
922968
}));
923969
src.end();

0 commit comments

Comments
 (0)