From 1cea18d29f09a64f06e9aea70e22ab82e984afd0 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 10:23:58 +0000 Subject: [PATCH 01/10] stream: use bitmap in readable state --- lib/internal/streams/readable.js | 57 ++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 1b40192d9458ba..bc0c68e4295ffd 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -83,6 +83,59 @@ const nop = () => {}; const { errorOrDestroy } = destroyImpl; +const kObjectMode = 1 << 0; +const kEnded = 1 << 1; +const kEndEmitted = 1 << 2; +const kReading = 1 << 3; +const kConstructed = 1 << 4; +const kSync = 1 << 5; +const kNeedReadable = 1 << 6; +const kEmittedReadable = 1 << 7; +const kReadableListening = 1 << 8; +const kResumeScheduled = 1 << 9; +const kErrorEmitted = 1 << 10; +const kEmitClose = 1 << 11; +const kAutoDestroy = 1 << 12; +const kDestroyed = 1 << 13; +const kClosed = 1 << 14; +const kCloseEmitted = 1 << 15; +const kMultiAwaitDrain = 1 << 16; +const kReadingMore = 1 << 17; +const kDataEmitted = 1 << 18; + +// TODO(benjamingr) it is likely slower to do it this way than with free functions +function makeBitMapDescriptor(bit) { + return { + enumerable: true, + get() { return (this.flags & bit) !== 0; }, + set(value) { + if (value) this.flags |= bit; + else this.flags &= ~bit; + }, + }; +} +ObjectDefineProperties(ReadableState.prototype, { + objectMode: makeBitMapDescriptor(kObjectMode), + ended: makeBitMapDescriptor(kEnded), + endEmitted: makeBitMapDescriptor(kEndEmitted), + reading: makeBitMapDescriptor(kReading), + constructed: makeBitMapDescriptor(kConstructed), + sync: makeBitMapDescriptor(kSync), + needReadable: makeBitMapDescriptor(kNeedReadable), + emittedReadable: makeBitMapDescriptor(kEmittedReadable), + readableListening: makeBitMapDescriptor(kReadableListening), + resumeScheduled: makeBitMapDescriptor(kResumeScheduled), + errorEmitted: makeBitMapDescriptor(kErrorEmitted), + emitClose: makeBitMapDescriptor(kEmitClose), + autoDestroy: makeBitMapDescriptor(kAutoDestroy), + destroyed: makeBitMapDescriptor(kDestroyed), + closed: makeBitMapDescriptor(kClosed), + closeEmitted: makeBitMapDescriptor(kCloseEmitted), + multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain), + readingMore: makeBitMapDescriptor(kReadingMore), + dataEmitted: makeBitMapDescriptor(kDataEmitted), +}); + function ReadableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. @@ -92,6 +145,10 @@ function ReadableState(options, stream, isDuplex) { if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Stream.Duplex; + // Bit map field to store ReadableState more effciently with 1 bit per field + // instead of a V8 slot per field. + this.state = 0; + // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. this.objectMode = !!(options && options.objectMode); From e2d09e3921c5ab6a97493474beb46989586a2f78 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 11:03:12 +0000 Subject: [PATCH 02/10] fixup! dont let copilot generate code --- lib/internal/streams/readable.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index bc0c68e4295ffd..6bc282a5509781 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -107,10 +107,10 @@ const kDataEmitted = 1 << 18; function makeBitMapDescriptor(bit) { return { enumerable: true, - get() { return (this.flags & bit) !== 0; }, + get() { return (this.state & bit) !== 0; }, set(value) { - if (value) this.flags |= bit; - else this.flags &= ~bit; + if (value) this.state |= bit; + else this.state &= ~bit; }, }; } @@ -148,10 +148,10 @@ function ReadableState(options, stream, isDuplex) { // Bit map field to store ReadableState more effciently with 1 bit per field // instead of a V8 slot per field. this.state = 0; - // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. this.objectMode = !!(options && options.objectMode); + throw ("get", this.__proto__.objectMode) if (isDuplex) this.objectMode = this.objectMode || From f6a09c74c3c6a914151483b29e520dde949b4f15 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 11:03:38 +0000 Subject: [PATCH 03/10] fixup! dont let copilot generate code --- lib/internal/streams/readable.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 6bc282a5509781..ca380dd131dc79 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -151,7 +151,6 @@ function ReadableState(options, stream, isDuplex) { // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. this.objectMode = !!(options && options.objectMode); - throw ("get", this.__proto__.objectMode) if (isDuplex) this.objectMode = this.objectMode || From e2e1f773e566784cb87c74b687f20af43796ef93 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 12:39:12 +0000 Subject: [PATCH 04/10] a bit more testing --- lib/internal/streams/readable.js | 79 +++++++++++--------------------- 1 file changed, 28 insertions(+), 51 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index ca380dd131dc79..899923396f80a9 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -106,7 +106,7 @@ const kDataEmitted = 1 << 18; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { return { - enumerable: true, + enumerable: false, get() { return (this.state & bit) !== 0; }, set(value) { if (value) this.state |= bit; @@ -119,19 +119,35 @@ ObjectDefineProperties(ReadableState.prototype, { ended: makeBitMapDescriptor(kEnded), endEmitted: makeBitMapDescriptor(kEndEmitted), reading: makeBitMapDescriptor(kReading), + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. constructed: makeBitMapDescriptor(kConstructed), + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. sync: makeBitMapDescriptor(kSync), + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. needReadable: makeBitMapDescriptor(kNeedReadable), emittedReadable: makeBitMapDescriptor(kEmittedReadable), readableListening: makeBitMapDescriptor(kReadableListening), resumeScheduled: makeBitMapDescriptor(kResumeScheduled), + // True if the error was already emitted and should not be thrown again. errorEmitted: makeBitMapDescriptor(kErrorEmitted), emitClose: makeBitMapDescriptor(kEmitClose), autoDestroy: makeBitMapDescriptor(kAutoDestroy), + // Has it been destroyed. destroyed: makeBitMapDescriptor(kDestroyed), + // Indicates whether the stream has finished destroying. closed: makeBitMapDescriptor(kClosed), + // True if close has been emitted or would have been emitted + // depending on emitClose. closeEmitted: makeBitMapDescriptor(kCloseEmitted), multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain), + // If true, a maybeReadMore has been scheduled. readingMore: makeBitMapDescriptor(kReadingMore), dataEmitted: makeBitMapDescriptor(kDataEmitted), }); @@ -147,14 +163,13 @@ function ReadableState(options, stream, isDuplex) { // Bit map field to store ReadableState more effciently with 1 bit per field // instead of a V8 slot per field. - this.state = 0; + this.state = kEmitClose | kAutoDestroy | kConstructed | kSync; // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. - this.objectMode = !!(options && options.objectMode); + if (options && options.objectMode) this.state |= kObjectMode; - if (isDuplex) - this.objectMode = this.objectMode || - !!(options && options.readableObjectMode); + if (isDuplex && options && options.readableObjectMode) + this.state |= options.readableObjectMode; // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" @@ -169,41 +184,15 @@ function ReadableState(options, stream, isDuplex) { this.length = 0; this.pipes = []; this.flowing = null; - this.ended = false; - this.endEmitted = false; - this.reading = false; - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - this.sync = true; - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - this.needReadable = false; - this.emittedReadable = false; - this.readableListening = false; - this.resumeScheduled = false; this[kPaused] = null; - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; + if (options && options.emitClose === false) this.state &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - this.autoDestroy = !options || options.autoDestroy !== false; + if (options || options.autoDestroy === false) this.state &= ~kAutoDestroy; - // Has it been destroyed. - this.destroyed = false; // Indicates whether the stream has errored. When true no further // _read calls, 'data' or 'readable' events should occur. This is needed @@ -211,12 +200,6 @@ function ReadableState(options, stream, isDuplex) { // stream has failed. this.errored = null; - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. @@ -233,12 +216,6 @@ function ReadableState(options, stream, isDuplex) { // Ref the piped dest which we need a drain event on it // type: null | Writable | Set. this.awaitDrainWriters = null; - this.multiAwaitDrain = false; - - // If true, a maybeReadMore has been scheduled. - this.readingMore = false; - - this.dataEmitted = false; this.decoder = null; this.encoding = null; @@ -319,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; let err; - if (!state.objectMode) { + if (state.state & kObjectMode !== 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (state.encoding !== encoding) { @@ -346,11 +323,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (err) { errorOrDestroy(stream, err); } else if (chunk === null) { - state.reading = false; + state.state &= ~kReading; onEofChunk(stream, state); - } else if (state.objectMode || (chunk && chunk.length > 0)) { + } else if (((state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { if (addToFront) { - if (state.endEmitted) + if ((state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else if (state.destroyed || state.errored) return false; @@ -373,7 +350,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } } } else if (!addToFront) { - state.reading = false; + state.state &= ~kReading; maybeReadMore(stream, state); } From b14ba7430bf2f2578598a92a402fd7c17f6716a2 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 12:46:15 +0000 Subject: [PATCH 05/10] fixup! convert more to bitmap --- lib/internal/streams/readable.js | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 899923396f80a9..91c69b68e8b2e3 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -296,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; let err; - if (state.state & kObjectMode !== 0) { + if ((state.state & kObjectMode) !== 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (state.encoding !== encoding) { @@ -325,9 +325,9 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } else if (chunk === null) { state.state &= ~kReading; onEofChunk(stream, state); - } else if (((state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { + } else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { if (addToFront) { - if ((state & kEndEmitted) !== 0) + if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else if (state.destroyed || state.errored) return false; @@ -338,7 +338,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } else if (state.destroyed || state.errored) { return false; } else { - state.reading = false; + state.state &= ~kReading; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); if (state.objectMode || chunk.length !== 0) @@ -366,7 +366,7 @@ function addChunk(stream, state, chunk, addToFront) { stream.listenerCount('data') > 0) { // Use the guard to avoid creating `Set()` repeatedly // when we have multiple pipes. - if (state.multiAwaitDrain) { + if ((state.state & kMultiAwaitDrain) !== 0) { state.awaitDrainWriters.clear(); } else { state.awaitDrainWriters = null; @@ -382,7 +382,7 @@ function addChunk(stream, state, chunk, addToFront) { else state.buffer.push(chunk); - if (state.needReadable) + if ((state.state & kNeedReadable) !== 0) emitReadable(stream); } maybeReadMore(stream, state); @@ -437,7 +437,7 @@ function computeNewHighWaterMark(n) { function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; - if (state.objectMode) + if ((state.state & kObjectMode !== 0)) return 1; if (NumberIsNaN(n)) { // Only flow one buffer at a time. @@ -468,9 +468,9 @@ Readable.prototype.read = function(n) { state.highWaterMark = computeNewHighWaterMark(n); if (n !== 0) - state.emittedReadable = false; - - // If we're doing read(0) to trigger a readable event, but we + state.state &= ~kEmittedReadable; + + // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger // the 'readable' event and move on. if (n === 0 && @@ -519,7 +519,7 @@ Readable.prototype.read = function(n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - let doRead = state.needReadable; + let doRead = (state.state & kNeedReadable) !== 0; debug('need readable', doRead); // If we currently have less than the highWaterMark, then also read some. @@ -537,11 +537,10 @@ Readable.prototype.read = function(n) { debug('reading, ended or constructing', doRead); } else if (doRead) { debug('do read'); - state.reading = true; - state.sync = true; + state.state |= kReading | kSync; // If the length is currently zero, then we *need* a readable event. if (state.length === 0) - state.needReadable = true; + state.state |= kNeedReadable; // Call internal read method try { @@ -549,8 +548,8 @@ Readable.prototype.read = function(n) { } catch (err) { errorOrDestroy(this, err); } + state.state &= ~kSync; - state.sync = false; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. if (!state.reading) From 5013d2500b9f63bb6e1da0d2cfa699d8abcd8220 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 13:57:00 +0000 Subject: [PATCH 06/10] fixup! --- lib/internal/streams/readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 91c69b68e8b2e3..c05cb4f513eab2 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -191,7 +191,7 @@ function ReadableState(options, stream, isDuplex) { if (options && options.emitClose === false) this.state &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - if (options || options.autoDestroy === false) this.state &= ~kAutoDestroy; + if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; // Indicates whether the stream has errored. When true no further From 4fa5f37503129a3dfeb2666919400f91a449429d Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 17:08:52 +0000 Subject: [PATCH 07/10] fixup! inverted f --- lib/internal/streams/readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index c05cb4f513eab2..ac433933ee8963 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -296,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; let err; - if ((state.state & kObjectMode) !== 0) { + if ((state.state & kObjectMode) === 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (state.encoding !== encoding) { From b848c8c1922d42f05905526c4a4600cb56fffd31 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Thu, 21 Sep 2023 18:10:12 +0000 Subject: [PATCH 08/10] fixup! tests pass, run linter --- lib/internal/streams/readable.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index ac433933ee8963..cdb3278ee3714f 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -469,8 +469,8 @@ Readable.prototype.read = function(n) { if (n !== 0) state.state &= ~kEmittedReadable; - - // If we're doing read(0) to trigger a readable event, but we + + // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger // the 'readable' event and move on. if (n === 0 && From 90a48c46ae40f2dffc220c8a529b04827db64e7f Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Fri, 22 Sep 2023 17:00:43 +0300 Subject: [PATCH 09/10] fixup! Update lib/internal/streams/readable.js --- lib/internal/streams/readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index cdb3278ee3714f..52491ec1907f5b 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -169,7 +169,7 @@ function ReadableState(options, stream, isDuplex) { if (options && options.objectMode) this.state |= kObjectMode; if (isDuplex && options && options.readableObjectMode) - this.state |= options.readableObjectMode; + this.state |= kObjectMode; // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" From 3882ebd7c3ee8eb865810713e0fa35fa57c81e49 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sat, 23 Sep 2023 14:24:33 +0300 Subject: [PATCH 10/10] fixup! Update lib/internal/streams/readable.js Co-authored-by: Robert Nagy --- lib/internal/streams/readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 52491ec1907f5b..49df23cba9f4c2 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -437,7 +437,7 @@ function computeNewHighWaterMark(n) { function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; - if ((state.state & kObjectMode !== 0)) + if ((state.state & kObjectMode) !== 0) return 1; if (NumberIsNaN(n)) { // Only flow one buffer at a time.