Skip to content

Commit fa60eb8

Browse files
mcollinacodebytere
authored andcommitted
stream: correctly pause and resume after once('readable')
Fixes: #24281 PR-URL: #24366 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Franziska Hinkelmann <[email protected]>
1 parent f0602f8 commit fa60eb8

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

lib/_stream_readable.js

+12-3
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ function ReadableState(options, stream, isDuplex) {
113113
this.emittedReadable = false;
114114
this.readableListening = false;
115115
this.resumeScheduled = false;
116+
this.paused = true;
116117

117118
// Should close be emitted on destroy. Defaults to true.
118119
this.emitClose = options.emitClose !== false;
@@ -858,10 +859,16 @@ Readable.prototype.removeAllListeners = function(ev) {
858859
};
859860

860861
function updateReadableListening(self) {
861-
self._readableState.readableListening = self.listenerCount('readable') > 0;
862+
const state = self._readableState;
863+
state.readableListening = self.listenerCount('readable') > 0;
862864

863-
// crude way to check if we should resume
864-
if (self.listenerCount('data') > 0) {
865+
if (state.resumeScheduled && !state.paused) {
866+
// flowing needs to be set to true now, otherwise
867+
// the upcoming resume will not flow.
868+
state.flowing = true;
869+
870+
// crude way to check if we should resume
871+
} else if (self.listenerCount('data') > 0) {
865872
self.resume();
866873
}
867874
}
@@ -883,6 +890,7 @@ Readable.prototype.resume = function() {
883890
state.flowing = !state.readableListening;
884891
resume(this, state);
885892
}
893+
state.paused = false;
886894
return this;
887895
};
888896

@@ -913,6 +921,7 @@ Readable.prototype.pause = function() {
913921
this._readableState.flowing = false;
914922
this.emit('pause');
915923
}
924+
this._readableState.paused = true;
916925
return this;
917926
};
918927

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable } = require('stream');
5+
6+
// This test verifies that a stream could be resumed after
7+
// removing the readable event in the same tick
8+
9+
check(new Readable({
10+
objectMode: true,
11+
highWaterMark: 1,
12+
read() {
13+
if (!this.first) {
14+
this.push('hello');
15+
this.first = true;
16+
return;
17+
}
18+
19+
this.push(null);
20+
}
21+
}));
22+
23+
function check(s) {
24+
const readableListener = common.mustNotCall();
25+
s.on('readable', readableListener);
26+
s.on('end', common.mustCall());
27+
s.removeListener('readable', readableListener);
28+
s.resume();
29+
}

0 commit comments

Comments
 (0)