Skip to content

Commit 247caac

Browse files
debadree25danielleadams
authored andcommitted
stream: fix pipeline callback not called on ended stream
Fixes: #46595 PR-URL: #46600 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Colin Ihrig <[email protected]>
1 parent d5eb1df commit 247caac

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

lib/internal/streams/pipeline.js

+10-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const {
3838
isTransformStream,
3939
isWebStream,
4040
isReadableStream,
41+
isReadableEnded,
4142
} = require('internal/streams/utils');
4243
const { AbortController } = require('internal/abort_controller');
4344

@@ -417,10 +418,17 @@ function pipe(src, dst, finish, { end }) {
417418
// Compat. Before node v10.12.0 stdio used to throw an error so
418419
// pipe() did/does not end() stdio destinations.
419420
// Now they allow it but "secretly" don't close the underlying fd.
420-
src.once('end', () => {
421+
422+
function endFn() {
421423
ended = true;
422424
dst.end();
423-
});
425+
}
426+
427+
if (isReadableEnded(src)) { // End the destination if the source has already ended.
428+
process.nextTick(endFn);
429+
} else {
430+
src.once('end', endFn);
431+
}
424432
} else {
425433
finish();
426434
}

test/parallel/test-stream-pipeline.js

+25
Original file line numberDiff line numberDiff line change
@@ -1591,3 +1591,28 @@ const tsp = require('timers/promises');
15911591
assert.strictEqual(writable.endCount, 1);
15921592
}));
15931593
}
1594+
1595+
{
1596+
const readable = new Readable({
1597+
read() {}
1598+
});
1599+
readable.on('end', common.mustCall(() => {
1600+
pipeline(readable, new PassThrough(), common.mustSucceed());
1601+
}));
1602+
readable.push(null);
1603+
readable.read();
1604+
}
1605+
1606+
{
1607+
const dup = new Duplex({
1608+
read() {},
1609+
write(chunk, enc, cb) {
1610+
cb();
1611+
}
1612+
});
1613+
dup.on('end', common.mustCall(() => {
1614+
pipeline(dup, new PassThrough(), common.mustSucceed());
1615+
}));
1616+
dup.push(null);
1617+
dup.read();
1618+
}

test/parallel/test-webstreams-pipeline.js

+11-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const common = require('../common');
44
const assert = require('assert');
5-
const { Readable, Writable, Transform, pipeline } = require('stream');
5+
const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream');
66
const { pipeline: pipelinePromise } = require('stream/promises');
77
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
88
const http = require('http');
@@ -410,3 +410,13 @@ const http = require('http');
410410
}
411411
c.close();
412412
}
413+
414+
{
415+
const rs = new ReadableStream({
416+
start(controller) {
417+
controller.close();
418+
}
419+
});
420+
421+
pipeline(rs, new PassThrough(), common.mustSucceed());
422+
}

0 commit comments

Comments
 (0)