Skip to content

Commit 3f33d0b

Browse files
ronagtargos
authored andcommitted
stream: fix pipe deadlock when starting with needDrain
Fixes: #36544 PR-URL: #36563 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent 95c80f5 commit 3f33d0b

File tree

2 files changed

+35
-28
lines changed

2 files changed

+35
-28
lines changed

lib/internal/streams/readable.js

+28-24
Original file line numberDiff line numberDiff line change
@@ -716,35 +716,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
716716
ondrain();
717717
}
718718

719+
function pause() {
720+
// If the user unpiped during `dest.write()`, it is possible
721+
// to get stuck in a permanently paused state if that write
722+
// also returned false.
723+
// => Check whether `dest` is still a piping destination.
724+
if (!cleanedUp) {
725+
if (state.pipes.length === 1 && state.pipes[0] === dest) {
726+
debug('false write response, pause', 0);
727+
state.awaitDrainWriters = dest;
728+
state.multiAwaitDrain = false;
729+
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
730+
debug('false write response, pause', state.awaitDrainWriters.size);
731+
state.awaitDrainWriters.add(dest);
732+
}
733+
src.pause();
734+
}
735+
if (!ondrain) {
736+
// When the dest drains, it reduces the awaitDrain counter
737+
// on the source. This would be more elegant with a .once()
738+
// handler in flow(), but adding and removing repeatedly is
739+
// too slow.
740+
ondrain = pipeOnDrain(src, dest);
741+
dest.on('drain', ondrain);
742+
}
743+
}
744+
719745
src.on('data', ondata);
720746
function ondata(chunk) {
721747
debug('ondata');
722748
const ret = dest.write(chunk);
723749
debug('dest.write', ret);
724750
if (ret === false) {
725-
// If the user unpiped during `dest.write()`, it is possible
726-
// to get stuck in a permanently paused state if that write
727-
// also returned false.
728-
// => Check whether `dest` is still a piping destination.
729-
if (!cleanedUp) {
730-
if (state.pipes.length === 1 && state.pipes[0] === dest) {
731-
debug('false write response, pause', 0);
732-
state.awaitDrainWriters = dest;
733-
state.multiAwaitDrain = false;
734-
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
735-
debug('false write response, pause', state.awaitDrainWriters.size);
736-
state.awaitDrainWriters.add(dest);
737-
}
738-
src.pause();
739-
}
740-
if (!ondrain) {
741-
// When the dest drains, it reduces the awaitDrain counter
742-
// on the source. This would be more elegant with a .once()
743-
// handler in flow(), but adding and removing repeatedly is
744-
// too slow.
745-
ondrain = pipeOnDrain(src, dest);
746-
dest.on('drain', ondrain);
747-
}
751+
pause();
748752
}
749753
}
750754

@@ -793,7 +797,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
793797

794798
if (dest.writableNeedDrain === true) {
795799
if (state.flowing) {
796-
src.pause();
800+
pause();
797801
}
798802
} else if (!state.flowing) {
799803
debug('pipe resume');

test/parallel/test-stream-pipe-needDrain.js

+7-4
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ const assert = require('assert');
55
const Readable = require('_stream_readable');
66
const Writable = require('_stream_writable');
77

8-
// Pipe should not continue writing if writable needs drain.
8+
// Pipe should pause temporarily if writable needs drain.
99
{
1010
const w = new Writable({
1111
write(buf, encoding, callback) {
12-
13-
}
12+
process.nextTick(callback);
13+
},
14+
highWaterMark: 1
1415
});
1516

1617
while (w.write('asd'));
@@ -20,10 +21,12 @@ const Writable = require('_stream_writable');
2021
const r = new Readable({
2122
read() {
2223
this.push('asd');
24+
this.push(null);
2325
}
2426
});
2527

26-
w.write = common.mustNotCall();
28+
r.on('pause', common.mustCall(2));
29+
r.on('end', common.mustCall());
2730

2831
r.pipe(w);
2932
}

0 commit comments

Comments
 (0)