Skip to content

Commit 4c5e35e

Browse files
committed
stream: use bitmap in readable state
PR-URL: nodejs/node#49745 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Moshe Atlow <[email protected]>
1 parent c9cab9b commit 4c5e35e

File tree

1 file changed

+91
-59
lines changed

1 file changed

+91
-59
lines changed

graal-nodejs/lib/internal/streams/readable.js

+91-59
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,75 @@ const nop = () => {};
8282

8383
const { errorOrDestroy } = destroyImpl;
8484

85+
const kObjectMode = 1 << 0;
86+
const kEnded = 1 << 1;
87+
const kEndEmitted = 1 << 2;
88+
const kReading = 1 << 3;
89+
const kConstructed = 1 << 4;
90+
const kSync = 1 << 5;
91+
const kNeedReadable = 1 << 6;
92+
const kEmittedReadable = 1 << 7;
93+
const kReadableListening = 1 << 8;
94+
const kResumeScheduled = 1 << 9;
95+
const kErrorEmitted = 1 << 10;
96+
const kEmitClose = 1 << 11;
97+
const kAutoDestroy = 1 << 12;
98+
const kDestroyed = 1 << 13;
99+
const kClosed = 1 << 14;
100+
const kCloseEmitted = 1 << 15;
101+
const kMultiAwaitDrain = 1 << 16;
102+
const kReadingMore = 1 << 17;
103+
const kDataEmitted = 1 << 18;
104+
105+
// TODO(benjamingr) it is likely slower to do it this way than with free functions
106+
function makeBitMapDescriptor(bit) {
107+
return {
108+
enumerable: false,
109+
get() { return (this.state & bit) !== 0; },
110+
set(value) {
111+
if (value) this.state |= bit;
112+
else this.state &= ~bit;
113+
},
114+
};
115+
}
116+
ObjectDefineProperties(ReadableState.prototype, {
117+
objectMode: makeBitMapDescriptor(kObjectMode),
118+
ended: makeBitMapDescriptor(kEnded),
119+
endEmitted: makeBitMapDescriptor(kEndEmitted),
120+
reading: makeBitMapDescriptor(kReading),
121+
// Stream is still being constructed and cannot be
122+
// destroyed until construction finished or failed.
123+
// Async construction is opt in, therefore we start as
124+
// constructed.
125+
constructed: makeBitMapDescriptor(kConstructed),
126+
// A flag to be able to tell if the event 'readable'/'data' is emitted
127+
// immediately, or on a later tick. We set this to true at first, because
128+
// any actions that shouldn't happen until "later" should generally also
129+
// not happen before the first read call.
130+
sync: makeBitMapDescriptor(kSync),
131+
// Whenever we return null, then we set a flag to say
132+
// that we're awaiting a 'readable' event emission.
133+
needReadable: makeBitMapDescriptor(kNeedReadable),
134+
emittedReadable: makeBitMapDescriptor(kEmittedReadable),
135+
readableListening: makeBitMapDescriptor(kReadableListening),
136+
resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
137+
// True if the error was already emitted and should not be thrown again.
138+
errorEmitted: makeBitMapDescriptor(kErrorEmitted),
139+
emitClose: makeBitMapDescriptor(kEmitClose),
140+
autoDestroy: makeBitMapDescriptor(kAutoDestroy),
141+
// Has it been destroyed.
142+
destroyed: makeBitMapDescriptor(kDestroyed),
143+
// Indicates whether the stream has finished destroying.
144+
closed: makeBitMapDescriptor(kClosed),
145+
// True if close has been emitted or would have been emitted
146+
// depending on emitClose.
147+
closeEmitted: makeBitMapDescriptor(kCloseEmitted),
148+
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
149+
// If true, a maybeReadMore has been scheduled.
150+
readingMore: makeBitMapDescriptor(kReadingMore),
151+
dataEmitted: makeBitMapDescriptor(kDataEmitted),
152+
});
153+
85154
function ReadableState(options, stream, isDuplex) {
86155
// Duplex streams are both readable and writable, but share
87156
// the same options object.
@@ -91,13 +160,15 @@ function ReadableState(options, stream, isDuplex) {
91160
if (typeof isDuplex !== 'boolean')
92161
isDuplex = stream instanceof Stream.Duplex;
93162

163+
// Bit map field to store ReadableState more effciently with 1 bit per field
164+
// instead of a V8 slot per field.
165+
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
94166
// Object stream flag. Used to make read(n) ignore n and to
95167
// make all the buffer merging and length checks go away.
96-
this.objectMode = !!(options && options.objectMode);
168+
if (options && options.objectMode) this.state |= kObjectMode;
97169

98-
if (isDuplex)
99-
this.objectMode = this.objectMode ||
100-
!!(options && options.readableObjectMode);
170+
if (isDuplex && options && options.readableObjectMode)
171+
this.state |= kObjectMode;
101172

102173
// The point at which it stops calling _read() to fill the buffer
103174
// Note: 0 is a valid value, means "don't call _read preemptively ever"
@@ -112,54 +183,22 @@ function ReadableState(options, stream, isDuplex) {
112183
this.length = 0;
113184
this.pipes = [];
114185
this.flowing = null;
115-
this.ended = false;
116-
this.endEmitted = false;
117-
this.reading = false;
118-
119-
// Stream is still being constructed and cannot be
120-
// destroyed until construction finished or failed.
121-
// Async construction is opt in, therefore we start as
122-
// constructed.
123-
this.constructed = true;
124186

125-
// A flag to be able to tell if the event 'readable'/'data' is emitted
126-
// immediately, or on a later tick. We set this to true at first, because
127-
// any actions that shouldn't happen until "later" should generally also
128-
// not happen before the first read call.
129-
this.sync = true;
130-
131-
// Whenever we return null, then we set a flag to say
132-
// that we're awaiting a 'readable' event emission.
133-
this.needReadable = false;
134-
this.emittedReadable = false;
135-
this.readableListening = false;
136-
this.resumeScheduled = false;
137187
this[kPaused] = null;
138188

139-
// True if the error was already emitted and should not be thrown again.
140-
this.errorEmitted = false;
141-
142189
// Should close be emitted on destroy. Defaults to true.
143-
this.emitClose = !options || options.emitClose !== false;
190+
if (options && options.emitClose === false) this.state &= ~kEmitClose;
144191

145192
// Should .destroy() be called after 'end' (and potentially 'finish').
146-
this.autoDestroy = !options || options.autoDestroy !== false;
193+
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;
147194

148-
// Has it been destroyed.
149-
this.destroyed = false;
150195

151196
// Indicates whether the stream has errored. When true no further
152197
// _read calls, 'data' or 'readable' events should occur. This is needed
153198
// since when autoDestroy is disabled we need a way to tell whether the
154199
// stream has failed.
155200
this.errored = null;
156201

157-
// Indicates whether the stream has finished destroying.
158-
this.closed = false;
159-
160-
// True if close has been emitted or would have been emitted
161-
// depending on emitClose.
162-
this.closeEmitted = false;
163202

164203
// Crypto is kind of old and crusty. Historically, its default string
165204
// encoding is 'binary' so we have to make this configurable.
@@ -169,12 +208,6 @@ function ReadableState(options, stream, isDuplex) {
169208
// Ref the piped dest which we need a drain event on it
170209
// type: null | Writable | Set<Writable>.
171210
this.awaitDrainWriters = null;
172-
this.multiAwaitDrain = false;
173-
174-
// If true, a maybeReadMore has been scheduled.
175-
this.readingMore = false;
176-
177-
this.dataEmitted = false;
178211

179212
this.decoder = null;
180213
this.encoding = null;
@@ -255,7 +288,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
255288
const state = stream._readableState;
256289

257290
let err;
258-
if (!state.objectMode) {
291+
if ((state.state & kObjectMode) === 0) {
259292
if (typeof chunk === 'string') {
260293
encoding = encoding || state.defaultEncoding;
261294
if (state.encoding !== encoding) {
@@ -282,11 +315,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
282315
if (err) {
283316
errorOrDestroy(stream, err);
284317
} else if (chunk === null) {
285-
state.reading = false;
318+
state.state &= ~kReading;
286319
onEofChunk(stream, state);
287-
} else if (state.objectMode || (chunk && chunk.length > 0)) {
320+
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
288321
if (addToFront) {
289-
if (state.endEmitted)
322+
if ((state.state & kEndEmitted) !== 0)
290323
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
291324
else if (state.destroyed || state.errored)
292325
return false;
@@ -297,7 +330,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
297330
} else if (state.destroyed || state.errored) {
298331
return false;
299332
} else {
300-
state.reading = false;
333+
state.state &= ~kReading;
301334
if (state.decoder && !encoding) {
302335
chunk = state.decoder.write(chunk);
303336
if (state.objectMode || chunk.length !== 0)
@@ -309,7 +342,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
309342
}
310343
}
311344
} else if (!addToFront) {
312-
state.reading = false;
345+
state.state &= ~kReading;
313346
maybeReadMore(stream, state);
314347
}
315348

@@ -325,7 +358,7 @@ function addChunk(stream, state, chunk, addToFront) {
325358
stream.listenerCount('data') > 0) {
326359
// Use the guard to avoid creating `Set()` repeatedly
327360
// when we have multiple pipes.
328-
if (state.multiAwaitDrain) {
361+
if ((state.state & kMultiAwaitDrain) !== 0) {
329362
state.awaitDrainWriters.clear();
330363
} else {
331364
state.awaitDrainWriters = null;
@@ -341,7 +374,7 @@ function addChunk(stream, state, chunk, addToFront) {
341374
else
342375
state.buffer.push(chunk);
343376

344-
if (state.needReadable)
377+
if ((state.state & kNeedReadable) !== 0)
345378
emitReadable(stream);
346379
}
347380
maybeReadMore(stream, state);
@@ -396,7 +429,7 @@ function computeNewHighWaterMark(n) {
396429
function howMuchToRead(n, state) {
397430
if (n <= 0 || (state.length === 0 && state.ended))
398431
return 0;
399-
if (state.objectMode)
432+
if ((state.state & kObjectMode) !== 0)
400433
return 1;
401434
if (NumberIsNaN(n)) {
402435
// Only flow one buffer at a time.
@@ -427,7 +460,7 @@ Readable.prototype.read = function(n) {
427460
state.highWaterMark = computeNewHighWaterMark(n);
428461

429462
if (n !== 0)
430-
state.emittedReadable = false;
463+
state.state &= ~kEmittedReadable;
431464

432465
// If we're doing read(0) to trigger a readable event, but we
433466
// already have a bunch of data in the buffer, then just trigger
@@ -478,7 +511,7 @@ Readable.prototype.read = function(n) {
478511
// 3. Actually pull the requested chunks out of the buffer and return.
479512

480513
// if we need a readable event, then we need to do some reading.
481-
let doRead = state.needReadable;
514+
let doRead = (state.state & kNeedReadable) !== 0;
482515
debug('need readable', doRead);
483516

484517
// If we currently have less than the highWaterMark, then also read some.
@@ -496,20 +529,19 @@ Readable.prototype.read = function(n) {
496529
debug('reading, ended or constructing', doRead);
497530
} else if (doRead) {
498531
debug('do read');
499-
state.reading = true;
500-
state.sync = true;
532+
state.state |= kReading | kSync;
501533
// If the length is currently zero, then we *need* a readable event.
502534
if (state.length === 0)
503-
state.needReadable = true;
535+
state.state |= kNeedReadable;
504536

505537
// Call internal read method
506538
try {
507539
this._read(state.highWaterMark);
508540
} catch (err) {
509541
errorOrDestroy(this, err);
510542
}
543+
state.state &= ~kSync;
511544

512-
state.sync = false;
513545
// If _read pushed data synchronously, then `reading` will be false,
514546
// and we need to re-evaluate how much data we can return to the user.
515547
if (!state.reading)

0 commit comments

Comments
 (0)