Skip to content

Commit e2a2a3f

Browse files
ronagtargos
authored andcommitted
stream: use lazy registration for drain for fast destinations
PR-URL: #29095 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 178caa5 commit e2a2a3f

File tree

3 files changed

+36
-15
lines changed

3 files changed

+36
-15
lines changed

lib/_stream_readable.js

+13-8
Original file line numberDiff line numberDiff line change
@@ -681,20 +681,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
681681
dest.end();
682682
}
683683

684-
// When the dest drains, it reduces the awaitDrain counter
685-
// on the source. This would be more elegant with a .once()
686-
// handler in flow(), but adding and removing repeatedly is
687-
// too slow.
688-
const ondrain = pipeOnDrain(src);
689-
dest.on('drain', ondrain);
684+
let ondrain;
690685

691686
var cleanedUp = false;
692687
function cleanup() {
693688
debug('cleanup');
694689
// Cleanup event handlers once the pipe is broken
695690
dest.removeListener('close', onclose);
696691
dest.removeListener('finish', onfinish);
697-
dest.removeListener('drain', ondrain);
692+
if (ondrain) {
693+
dest.removeListener('drain', ondrain);
694+
}
698695
dest.removeListener('error', onerror);
699696
dest.removeListener('unpipe', onunpipe);
700697
src.removeListener('end', onend);
@@ -708,7 +705,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
708705
// flowing again.
709706
// So, if this is awaiting a drain, then we just call it now.
710707
// If we don't know, then assume that we are waiting for one.
711-
if (state.awaitDrain &&
708+
if (ondrain && state.awaitDrain &&
712709
(!dest._writableState || dest._writableState.needDrain))
713710
ondrain();
714711
}
@@ -729,6 +726,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
729726
debug('false write response, pause', state.awaitDrain);
730727
state.awaitDrain++;
731728
}
729+
if (!ondrain) {
730+
// When the dest drains, it reduces the awaitDrain counter
731+
// on the source. This would be more elegant with a .once()
732+
// handler in flow(), but adding and removing repeatedly is
733+
// too slow.
734+
ondrain = pipeOnDrain(src);
735+
dest.on('drain', ondrain);
736+
}
732737
src.pause();
733738
}
734739
}

test/parallel/test-stream-pipe-flow.js

+23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22
const common = require('../common');
3+
const assert = require('assert');
34
const { Readable, Writable, PassThrough } = require('stream');
45

56
{
@@ -65,3 +66,25 @@ const { Readable, Writable, PassThrough } = require('stream');
6566
wrapper.resume();
6667
wrapper.on('end', common.mustCall());
6768
}
69+
70+
{
71+
// Only register drain if there is backpressure.
72+
const rs = new Readable({ read() {} });
73+
74+
const pt = rs
75+
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
76+
assert.strictEqual(pt.listenerCount('drain'), 0);
77+
pt.on('finish', () => {
78+
assert.strictEqual(pt.listenerCount('drain'), 0);
79+
});
80+
81+
rs.push('asd');
82+
assert.strictEqual(pt.listenerCount('drain'), 0);
83+
84+
process.nextTick(() => {
85+
rs.push('asd');
86+
assert.strictEqual(pt.listenerCount('drain'), 0);
87+
rs.push(null);
88+
assert.strictEqual(pt.listenerCount('drain'), 0);
89+
});
90+
}

test/parallel/test-stream2-readable-legacy-drain.js

-7
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,4 @@ function drain() {
5252

5353
w.end = common.mustCall();
5454

55-
// Just for kicks, let's mess with the drain count.
56-
// This verifies that even if it gets negative in the
57-
// pipe() cleanup function, we'll still function properly.
58-
r.on('readable', function() {
59-
w.emit('drain');
60-
});
61-
6255
r.pipe(w);

0 commit comments

Comments
 (0)