Skip to content

Commit 9d52430

Browse files
ronagUlisesGascon
authored andcommitted
stream: readable use bitmap accessors
PR-URL: #50350 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Raz Luvaton <[email protected]>
1 parent 586becb commit 9d52430

File tree

1 file changed

+31
-27
lines changed

1 file changed

+31
-27
lines changed

lib/internal/streams/readable.js

+31-27
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
431431
function readableAddChunkUnshiftValue(stream, state, chunk) {
432432
if ((state[kState] & kEndEmitted) !== 0)
433433
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
434-
else if (state.destroyed || state.errored)
434+
else if ((state[kState] & (kDestroyed | kErrored)) !== 0)
435435
return false;
436436
else
437437
addChunk(stream, state, chunk, true);
@@ -608,7 +608,7 @@ function computeNewHighWaterMark(n) {
608608
// This function is designed to be inlinable, so please take care when making
609609
// changes to the function body.
610610
function howMuchToRead(n, state) {
611-
if (n <= 0 || (state.length === 0 && state.ended))
611+
if (n <= 0 || (state.length === 0 && (state[kState] & kEnded) !== 0))
612612
return 0;
613613
if ((state[kState] & kObjectMode) !== 0)
614614
return 1;
@@ -652,7 +652,7 @@ Readable.prototype.read = function(n) {
652652
state.length >= state.highWaterMark :
653653
state.length > 0) ||
654654
(state[kState] & kEnded) !== 0)) {
655-
debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0);
655+
debug('read: emitReadable');
656656
if (state.length === 0 && (state[kState] & kEnded) !== 0)
657657
endReadable(this);
658658
else
@@ -810,7 +810,7 @@ function emitReadable(stream) {
810810
function emitReadable_(stream) {
811811
const state = stream._readableState;
812812
debug('emitReadable_');
813-
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) {
813+
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) {
814814
stream.emit('readable');
815815
state[kState] &= ~kEmittedReadable;
816816
}
@@ -891,7 +891,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
891891
const state = this._readableState;
892892

893893
if (state.pipes.length === 1) {
894-
if (!state.multiAwaitDrain) {
894+
if ((state[kState] & kMultiAwaitDrain) === 0) {
895895
state[kState] |= kMultiAwaitDrain;
896896
state.awaitDrainWriters = new SafeSet(
897897
state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
@@ -907,7 +907,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
907907
dest !== process.stderr;
908908

909909
const endFn = doEnd ? onend : unpipe;
910-
if (state.endEmitted)
910+
if ((state[kState] & kEndEmitted) !== 0)
911911
process.nextTick(endFn);
912912
else
913913
src.once('end', endFn);
@@ -966,7 +966,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
966966
if (state.pipes.length === 1 && state.pipes[0] === dest) {
967967
debug('false write response, pause', 0);
968968
state.awaitDrainWriters = dest;
969-
state.multiAwaitDrain = false;
969+
state[kState] &= ~kMultiAwaitDrain;
970970
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
971971
debug('false write response, pause', state.awaitDrainWriters.size);
972972
state.awaitDrainWriters.add(dest);
@@ -1038,7 +1038,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10381038

10391039
if (dest.writableNeedDrain === true) {
10401040
pause();
1041-
} else if (!state.flowing) {
1041+
} else if ((state[kState] & kFlowing) === 0) {
10421042
debug('pipe resume');
10431043
src.resume();
10441044
}
@@ -1056,7 +1056,7 @@ function pipeOnDrain(src, dest) {
10561056
if (state.awaitDrainWriters === dest) {
10571057
debug('pipeOnDrain', 1);
10581058
state.awaitDrainWriters = null;
1059-
} else if (state.multiAwaitDrain) {
1059+
} else if ((state[kState] & kMultiAwaitDrain) !== 0) {
10601060
debug('pipeOnDrain', state.awaitDrainWriters.size);
10611061
state.awaitDrainWriters.delete(dest);
10621062
}
@@ -1111,20 +1111,20 @@ Readable.prototype.on = function(ev, fn) {
11111111
if (ev === 'data') {
11121112
// Update readableListening so that resume() may be a no-op
11131113
// a few lines down. This is needed to support once('readable').
1114-
state.readableListening = this.listenerCount('readable') > 0;
1114+
state[kState] |= this.listenerCount('readable') > 0 ? kReadableListening : 0;
11151115

11161116
// Try start flowing on next tick if stream isn't explicitly paused.
1117-
if (state.flowing !== false)
1117+
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
11181118
this.resume();
1119+
}
11191120
} else if (ev === 'readable') {
1120-
if (!state.endEmitted && !state.readableListening) {
1121-
state.readableListening = state.needReadable = true;
1122-
state.flowing = false;
1123-
state.emittedReadable = false;
1124-
debug('on readable', state.length, state.reading);
1121+
if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) {
1122+
state[kState] |= kReadableListening | kNeedReadable | kHasFlowing;
1123+
state[kState] &= ~(kFlowing | kEmittedReadable);
1124+
debug('on readable');
11251125
if (state.length) {
11261126
emitReadable(this);
1127-
} else if (!state.reading) {
1127+
} else if ((state[kState] & kReading) === 0) {
11281128
process.nextTick(nReadingNextTick, this);
11291129
}
11301130
}
@@ -1171,7 +1171,12 @@ Readable.prototype.removeAllListeners = function(ev) {
11711171

11721172
function updateReadableListening(self) {
11731173
const state = self._readableState;
1174-
state.readableListening = self.listenerCount('readable') > 0;
1174+
1175+
if (self.listenerCount('readable') > 0) {
1176+
state[kState] |= kReadableListening;
1177+
} else {
1178+
state[kState] &= ~kReadableListening;
1179+
}
11751180

11761181
if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) {
11771182
// Flowing needs to be set to true now, otherwise
@@ -1201,7 +1206,7 @@ Readable.prototype.resume = function() {
12011206
// for readable, but we still have to call
12021207
// resume().
12031208
state[kState] |= kHasFlowing;
1204-
if (!state.readableListening) {
1209+
if ((state[kState] & kReadableListening) === 0) {
12051210
state[kState] |= kFlowing;
12061211
} else {
12071212
state[kState] &= ~kFlowing;
@@ -1214,8 +1219,8 @@ Readable.prototype.resume = function() {
12141219
};
12151220

12161221
function resume(stream, state) {
1217-
if (!state.resumeScheduled) {
1218-
state.resumeScheduled = true;
1222+
if ((state[kState] & kResumeScheduled) === 0) {
1223+
state[kState] |= kResumeScheduled;
12191224
process.nextTick(resume_, stream, state);
12201225
}
12211226
}
@@ -1236,7 +1241,7 @@ function resume_(stream, state) {
12361241
Readable.prototype.pause = function() {
12371242
const state = this._readableState;
12381243
debug('call pause');
1239-
if (state.flowing !== false) {
1244+
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
12401245
debug('pause');
12411246
state[kState] |= kHasFlowing;
12421247
state[kState] &= ~kFlowing;
@@ -1651,20 +1656,19 @@ function fromList(n, state) {
16511656
function endReadable(stream) {
16521657
const state = stream._readableState;
16531658

1654-
debug('endReadable', (state[kState] & kEndEmitted) !== 0);
1659+
debug('endReadable');
16551660
if ((state[kState] & kEndEmitted) === 0) {
16561661
state[kState] |= kEnded;
16571662
process.nextTick(endReadableNT, state, stream);
16581663
}
16591664
}
16601665

16611666
function endReadableNT(state, stream) {
1662-
debug('endReadableNT', state.endEmitted, state.length);
1667+
debug('endReadableNT');
16631668

16641669
// Check that we didn't get one last unshift.
1665-
if (!state.errored && !state.closeEmitted &&
1666-
!state.endEmitted && state.length === 0) {
1667-
state.endEmitted = true;
1670+
if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) {
1671+
state[kState] |= kEndEmitted;
16681672
stream.emit('end');
16691673

16701674
if (stream.writable && stream.allowHalfOpen === false) {

0 commit comments

Comments
 (0)