Skip to content

Commit 6993eb0

Browse files
committed
stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x)
Fix the uncommon situation when a readable stream is piped twice into the same destination stream, and then unpiped once. Previously, the `unpipe` event handlers weren’t able to tell whether they were corresponding to the “right” conceptual pipe that was being removed; this fixes this by adding a counter to the `unpipe` event handler and only removing a single piping destination at most. Fixes: #12718 PR-URL: #12746 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 55c95b1 commit 6993eb0

File tree

2 files changed

+87
-5
lines changed

2 files changed

+87
-5
lines changed

lib/_stream_readable.js

+9-5
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
518518
src.once('end', endFn);
519519

520520
dest.on('unpipe', onunpipe);
521-
function onunpipe(readable) {
521+
function onunpipe(readable, unpipeInfo) {
522522
debug('onunpipe');
523523
if (readable === src) {
524-
cleanup();
524+
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
525+
unpipeInfo.hasUnpiped = true;
526+
cleanup();
527+
}
525528
}
526529
}
527530

@@ -647,6 +650,7 @@ function pipeOnDrain(src) {
647650

648651
Readable.prototype.unpipe = function(dest) {
649652
var state = this._readableState;
653+
var unpipeInfo = { hasUnpiped: false };
650654

651655
// if we're not piping anywhere, then do nothing.
652656
if (state.pipesCount === 0)
@@ -666,7 +670,7 @@ Readable.prototype.unpipe = function(dest) {
666670
state.pipesCount = 0;
667671
state.flowing = false;
668672
if (dest)
669-
dest.emit('unpipe', this);
673+
dest.emit('unpipe', this, unpipeInfo);
670674
return this;
671675
}
672676

@@ -681,7 +685,7 @@ Readable.prototype.unpipe = function(dest) {
681685
state.flowing = false;
682686

683687
for (var i = 0; i < len; i++)
684-
dests[i].emit('unpipe', this);
688+
dests[i].emit('unpipe', this, unpipeInfo);
685689
return this;
686690
}
687691

@@ -695,7 +699,7 @@ Readable.prototype.unpipe = function(dest) {
695699
if (state.pipesCount === 1)
696700
state.pipes = state.pipes[0];
697701

698-
dest.emit('unpipe', this);
702+
dest.emit('unpipe', this, unpipeInfo);
699703

700704
return this;
701705
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
'use strict';
2+
const common = require('../common');
3+
4+
// Regression test for https://github.com/nodejs/node/issues/12718.
5+
// Tests that piping a source stream twice to the same destination stream
6+
// works, and that a subsequent unpipe() call only removes the pipe *once*.
7+
const assert = require('assert');
8+
const { PassThrough, Writable } = require('stream');
9+
10+
{
11+
const passThrough = new PassThrough();
12+
const dest = new Writable({
13+
write: common.mustCall((chunk, encoding, cb) => {
14+
assert.strictEqual(`${chunk}`, 'foobar');
15+
cb();
16+
})
17+
});
18+
19+
passThrough.pipe(dest);
20+
passThrough.pipe(dest);
21+
22+
assert.strictEqual(passThrough._events.data.length, 2);
23+
assert.strictEqual(passThrough._readableState.pipesCount, 2);
24+
assert.strictEqual(passThrough._readableState.pipes[0], dest);
25+
assert.strictEqual(passThrough._readableState.pipes[1], dest);
26+
27+
passThrough.unpipe(dest);
28+
29+
assert.strictEqual(passThrough._events.data.length, 1);
30+
assert.strictEqual(passThrough._readableState.pipesCount, 1);
31+
assert.strictEqual(passThrough._readableState.pipes, dest);
32+
33+
passThrough.write('foobar');
34+
passThrough.pipe(dest);
35+
}
36+
37+
{
38+
const passThrough = new PassThrough();
39+
const dest = new Writable({
40+
write: common.mustCall((chunk, encoding, cb) => {
41+
assert.strictEqual(`${chunk}`, 'foobar');
42+
cb();
43+
}, 2)
44+
});
45+
46+
passThrough.pipe(dest);
47+
passThrough.pipe(dest);
48+
49+
assert.strictEqual(passThrough._events.data.length, 2);
50+
assert.strictEqual(passThrough._readableState.pipesCount, 2);
51+
assert.strictEqual(passThrough._readableState.pipes[0], dest);
52+
assert.strictEqual(passThrough._readableState.pipes[1], dest);
53+
54+
passThrough.write('foobar');
55+
}
56+
57+
{
58+
const passThrough = new PassThrough();
59+
const dest = new Writable({
60+
write: common.mustNotCall()
61+
});
62+
63+
passThrough.pipe(dest);
64+
passThrough.pipe(dest);
65+
66+
assert.strictEqual(passThrough._events.data.length, 2);
67+
assert.strictEqual(passThrough._readableState.pipesCount, 2);
68+
assert.strictEqual(passThrough._readableState.pipes[0], dest);
69+
assert.strictEqual(passThrough._readableState.pipes[1], dest);
70+
71+
passThrough.unpipe(dest);
72+
passThrough.unpipe(dest);
73+
74+
assert.strictEqual(passThrough._events.data, undefined);
75+
assert.strictEqual(passThrough._readableState.pipesCount, 0);
76+
77+
passThrough.write('foobar');
78+
}

0 commit comments

Comments
 (0)