Skip to content

Commit a83c976

Browse files
ronagtargos
authored andcommitted
stream: reset flowing state if no 'readable' or 'data' listeners
If we don't have any 'readable' or 'data' listeners and we are not about to resume. Then reset flowing state to initial null state. PR-URL: #31036 Fixes: #24474 Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent a0f5207 commit a83c976

File tree

3 files changed

+58
-5
lines changed

3 files changed

+58
-5
lines changed

lib/_stream_readable.js

+21-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const {
2828
ObjectDefineProperty,
2929
ObjectSetPrototypeOf,
3030
SymbolAsyncIterator,
31+
Symbol
3132
} = primordials;
3233

3334
module.exports = Readable;
@@ -51,6 +52,8 @@ const {
5152
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
5253
} = require('internal/errors').codes;
5354

55+
const kPaused = Symbol('kPaused');
56+
5457
// Lazy loaded to improve the startup performance.
5558
let StringDecoder;
5659
let createReadableStreamAsyncIterator;
@@ -127,7 +130,7 @@ function ReadableState(options, stream, isDuplex) {
127130
this.emittedReadable = false;
128131
this.readableListening = false;
129132
this.resumeScheduled = false;
130-
this.paused = true;
133+
this[kPaused] = null;
131134

132135
// Should close be emitted on destroy. Defaults to true.
133136
this.emitClose = !options || options.emitClose !== false;
@@ -159,6 +162,16 @@ function ReadableState(options, stream, isDuplex) {
159162
}
160163
}
161164

165+
// Legacy property for `paused`
166+
ObjectDefineProperty(ReadableState.prototype, 'paused', {
167+
get() {
168+
return this[kPaused] !== false;
169+
},
170+
set(value) {
171+
this[kPaused] = !!value;
172+
}
173+
});
174+
162175
function Readable(options) {
163176
if (!(this instanceof Readable))
164177
return new Readable(options);
@@ -348,7 +361,8 @@ function chunkInvalid(state, chunk) {
348361

349362

350363
Readable.prototype.isPaused = function() {
351-
return this._readableState.flowing === false;
364+
const state = this._readableState;
365+
return state[kPaused] === true || state.flowing === false;
352366
};
353367

354368
// Backwards compatibility.
@@ -947,14 +961,16 @@ function updateReadableListening(self) {
947961
const state = self._readableState;
948962
state.readableListening = self.listenerCount('readable') > 0;
949963

950-
if (state.resumeScheduled && !state.paused) {
964+
if (state.resumeScheduled && state[kPaused] === false) {
951965
// Flowing needs to be set to true now, otherwise
952966
// the upcoming resume will not flow.
953967
state.flowing = true;
954968

955969
// Crude way to check if we should resume
956970
} else if (self.listenerCount('data') > 0) {
957971
self.resume();
972+
} else if (!state.readableListening) {
973+
state.flowing = null;
958974
}
959975
}
960976

@@ -975,7 +991,7 @@ Readable.prototype.resume = function() {
975991
state.flowing = !state.readableListening;
976992
resume(this, state);
977993
}
978-
state.paused = false;
994+
state[kPaused] = false;
979995
return this;
980996
};
981997

@@ -1006,7 +1022,7 @@ Readable.prototype.pause = function() {
10061022
this._readableState.flowing = false;
10071023
this.emit('pause');
10081024
}
1009-
this._readableState.paused = true;
1025+
this._readableState[kPaused] = true;
10101026
return this;
10111027
};
10121028

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'use strict';
2+
const common = require('../common');
3+
4+
const { Readable } = require('stream');
5+
6+
const readable = new Readable({
7+
read() {}
8+
});
9+
10+
function read() {}
11+
12+
readable.setEncoding('utf8');
13+
readable.on('readable', read);
14+
readable.removeListener('readable', read);
15+
16+
process.nextTick(function() {
17+
readable.on('data', common.mustCall());
18+
readable.push('hello');
19+
});

test/parallel/test-stream-readable-pause-and-resume.js

+18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const common = require('../common');
4+
const assert = require('assert');
45
const { Readable } = require('stream');
56

67
let ticks = 18;
@@ -38,3 +39,20 @@ function readAndPause() {
3839

3940
rs.on('data', ondata);
4041
}
42+
43+
{
44+
const readable = new Readable({
45+
read() {}
46+
});
47+
48+
function read() {}
49+
50+
readable.setEncoding('utf8');
51+
readable.on('readable', read);
52+
readable.removeListener('readable', read);
53+
readable.pause();
54+
55+
process.nextTick(function() {
56+
assert(readable.isPaused());
57+
});
58+
}

0 commit comments

Comments
 (0)