Skip to content

Commit 8ab8d6a

Browse files
addaleaxMylesBorins
authored andcommitted
stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x)
Fix the uncommon situation when a readable stream is piped twice into the same destination stream, and then unpiped once. Previously, the `unpipe` event handlers weren’t able to tell whether they were corresponding to the “right” conceptual pipe that was being removed; this fixes this by adding a counter to the `unpipe` event handler and only removing a single piping destination at most. Fixes: #12718 PR-URL: #12746 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 0ca2dad commit 8ab8d6a

File tree

2 files changed

+87
-5
lines changed

2 files changed

+87
-5
lines changed

lib/_stream_readable.js

+9-5
Original file line numberDiff line numberDiff line change
@@ -499,10 +499,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
499499
src.once('end', endFn);
500500

501501
dest.on('unpipe', onunpipe);
502-
function onunpipe(readable) {
502+
function onunpipe(readable, unpipeInfo) {
503503
debug('onunpipe');
504504
if (readable === src) {
505-
cleanup();
505+
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
506+
unpipeInfo.hasUnpiped = true;
507+
cleanup();
508+
}
506509
}
507510
}
508511

@@ -628,6 +631,7 @@ function pipeOnDrain(src) {
628631

629632
Readable.prototype.unpipe = function(dest) {
630633
var state = this._readableState;
634+
var unpipeInfo = { hasUnpiped: false };
631635

632636
// if we're not piping anywhere, then do nothing.
633637
if (state.pipesCount === 0)
@@ -647,7 +651,7 @@ Readable.prototype.unpipe = function(dest) {
647651
state.pipesCount = 0;
648652
state.flowing = false;
649653
if (dest)
650-
dest.emit('unpipe', this);
654+
dest.emit('unpipe', this, unpipeInfo);
651655
return this;
652656
}
653657

@@ -662,7 +666,7 @@ Readable.prototype.unpipe = function(dest) {
662666
state.flowing = false;
663667

664668
for (var i = 0; i < len; i++)
665-
dests[i].emit('unpipe', this);
669+
dests[i].emit('unpipe', this, unpipeInfo);
666670
return this;
667671
}
668672

@@ -676,7 +680,7 @@ Readable.prototype.unpipe = function(dest) {
676680
if (state.pipesCount === 1)
677681
state.pipes = state.pipes[0];
678682

679-
dest.emit('unpipe', this);
683+
dest.emit('unpipe', this, unpipeInfo);
680684

681685
return this;
682686
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
'use strict';
2+
const common = require('../common');
3+
4+
// Regression test for https://github.com/nodejs/node/issues/12718.
5+
// Tests that piping a source stream twice to the same destination stream
6+
// works, and that a subsequent unpipe() call only removes the pipe *once*.
7+
const assert = require('assert');
8+
const { PassThrough, Writable } = require('stream');
9+
10+
{
11+
const passThrough = new PassThrough();
12+
const dest = new Writable({
13+
write: common.mustCall((chunk, encoding, cb) => {
14+
assert.strictEqual(`${chunk}`, 'foobar');
15+
cb();
16+
})
17+
});
18+
19+
passThrough.pipe(dest);
20+
passThrough.pipe(dest);
21+
22+
assert.strictEqual(passThrough._events.data.length, 2);
23+
assert.strictEqual(passThrough._readableState.pipesCount, 2);
24+
assert.strictEqual(passThrough._readableState.pipes[0], dest);
25+
assert.strictEqual(passThrough._readableState.pipes[1], dest);
26+
27+
passThrough.unpipe(dest);
28+
29+
assert.strictEqual(passThrough._events.data.length, 1);
30+
assert.strictEqual(passThrough._readableState.pipesCount, 1);
31+
assert.strictEqual(passThrough._readableState.pipes, dest);
32+
33+
passThrough.write('foobar');
34+
passThrough.pipe(dest);
35+
}
36+
37+
{
38+
const passThrough = new PassThrough();
39+
const dest = new Writable({
40+
write: common.mustCall((chunk, encoding, cb) => {
41+
assert.strictEqual(`${chunk}`, 'foobar');
42+
cb();
43+
}, 2)
44+
});
45+
46+
passThrough.pipe(dest);
47+
passThrough.pipe(dest);
48+
49+
assert.strictEqual(passThrough._events.data.length, 2);
50+
assert.strictEqual(passThrough._readableState.pipesCount, 2);
51+
assert.strictEqual(passThrough._readableState.pipes[0], dest);
52+
assert.strictEqual(passThrough._readableState.pipes[1], dest);
53+
54+
passThrough.write('foobar');
55+
}
56+
57+
{
58+
const passThrough = new PassThrough();
59+
const dest = new Writable({
60+
write: common.mustNotCall()
61+
});
62+
63+
passThrough.pipe(dest);
64+
passThrough.pipe(dest);
65+
66+
assert.strictEqual(passThrough._events.data.length, 2);
67+
assert.strictEqual(passThrough._readableState.pipesCount, 2);
68+
assert.strictEqual(passThrough._readableState.pipes[0], dest);
69+
assert.strictEqual(passThrough._readableState.pipes[1], dest);
70+
71+
passThrough.unpipe(dest);
72+
passThrough.unpipe(dest);
73+
74+
assert.strictEqual(passThrough._events.data, undefined);
75+
assert.strictEqual(passThrough._readableState.pipesCount, 0);
76+
77+
passThrough.write('foobar');
78+
}

0 commit comments

Comments
 (0)