Skip to content

Commit 4392b4a

Browse files
addaleaxMyles Borins
authored and
Myles Borins
committed
stream: Fix readableState.awaitDrain mechanism
In 6899094 (#2325), the conditions for increasing `readableState.awaitDrain` when writing to a piping destination returns false were changed so that they could not actually be met, effectively leaving `readableState.awaitDrain` with a constant value of 0. This patch changes the conditions to testing whether the stream for which `.write()` returned false is still a piping destination, which was likely the intention of the original patch. Fixes: #5820 Fixes: #5257 PR-URL: #6023 Reviewed-By: Brian White <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent b26fea1 commit 4392b4a

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

lib/_stream_readable.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -538,9 +538,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
538538
// If the user unpiped during `dest.write()`, it is possible
539539
// to get stuck in a permanently paused state if that write
540540
// also returned false.
541-
if (state.pipesCount === 1 &&
542-
state.pipes[0] === dest &&
543-
src.listenerCount('data') === 1 &&
541+
// => Check whether `dest` is still a piping destination.
542+
if (((state.pipesCount === 1 && state.pipes === dest) ||
543+
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
544544
!cleanedUp) {
545545
debug('false write response, pause', src._readableState.awaitDrain);
546546
src._readableState.awaitDrain++;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict';
2+
const common = require('../common');
3+
const stream = require('stream');
4+
5+
// This is very similar to test-stream-pipe-cleanup-pause.js.
6+
7+
const reader = new stream.Readable();
8+
const writer1 = new stream.Writable();
9+
const writer2 = new stream.Writable();
10+
11+
// 560000 is chosen here because it is larger than the (default) highWaterMark
12+
// and will cause `.write()` to return false
13+
// See: https://github.com/nodejs/node/issues/5820
14+
const buffer = Buffer.allocUnsafe(560000);
15+
16+
reader._read = function(n) {};
17+
18+
writer1._write = common.mustCall(function(chunk, encoding, cb) {
19+
this.emit('chunk-received');
20+
cb();
21+
}, 1);
22+
writer1.once('chunk-received', function() {
23+
setImmediate(function() {
24+
// This one should *not* get through to writer1 because writer2 is not
25+
// "done" processing.
26+
reader.push(buffer);
27+
});
28+
});
29+
30+
// A "slow" consumer:
31+
writer2._write = common.mustCall(function(chunk, encoding, cb) {
32+
// Not calling cb here to "simulate" slow stream.
33+
34+
// This should be called exactly once, since the first .write() call
35+
// will return false.
36+
}, 1);
37+
38+
reader.pipe(writer1);
39+
reader.pipe(writer2);
40+
reader.push(buffer);

0 commit comments

Comments
 (0)