Skip to content

Commit 16ee842

Browse files
ronagtargos
authored andcommitted
stream: pipeline should drain empty readable
This simplifies some cases where the last stream is a Duplex without any expected output. await pipeline(readable, duplex) PR-URL: #40654 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent fe2cd09 commit 16ee842

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

lib/internal/streams/pipeline.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const {
3333
isIterable,
3434
isReadableNodeStream,
3535
isNodeStream,
36+
isReadableFinished,
3637
} = require('internal/streams/utils');
3738
const { AbortController } = require('internal/abort_controller');
3839

@@ -229,7 +230,14 @@ function pipelineImpl(streams, callback, opts) {
229230

230231
if (isNodeStream(stream)) {
231232
finishCount++;
232-
destroys.push(destroyer(stream, reading, writing, finish));
233+
destroys.push(destroyer(stream, reading, writing, (err) => {
234+
if (!err && !reading && isReadableFinished(stream, false)) {
235+
stream.read(0);
236+
destroyer(stream, true, writing, finish);
237+
} else {
238+
finish(err);
239+
}
240+
}));
233241
}
234242

235243
if (i === 0) {

test/parallel/test-stream-pipeline.js

+21-1
Original file line numberDiff line numberDiff line change
@@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
10271027
const src = new PassThrough();
10281028
const dst = new PassThrough();
10291029
pipeline(src, dst, common.mustSucceed(() => {
1030-
assert.strictEqual(dst.destroyed, false);
1030+
assert.strictEqual(dst.destroyed, true);
10311031
}));
10321032
src.end();
10331033
}
@@ -1447,3 +1447,23 @@ const tsp = require('timers/promises');
14471447
assert.strictEqual(text, 'Hello World!');
14481448
}));
14491449
}
1450+
1451+
{
1452+
const pipelinePromise = promisify(pipeline);
1453+
1454+
async function run() {
1455+
const read = new Readable({
1456+
read() {}
1457+
});
1458+
1459+
const duplex = new PassThrough();
1460+
1461+
read.push(null);
1462+
1463+
await pipelinePromise(read, duplex);
1464+
1465+
assert.strictEqual(duplex.destroyed, true);
1466+
}
1467+
1468+
run();
1469+
}

0 commit comments

Comments
 (0)