Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: performance improvement on readline async iterator #41276

Merged
merged 25 commits into from
Oct 24, 2022
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c467ffe
lib: performance improvement on readline async iterator
Dec 20, 2021
05db8ff
lib: undoing accidental changes
Farenheith Dec 22, 2021
3ecee3c
lib: changing once to on in waitNext
Farenheith Dec 23, 2021
83919d2
lib: adding benchmarks for readline iterable
Farenheith Dec 29, 2021
03ac26d
lib: adding different lines multiplier
Farenheith Dec 29, 2021
870b5cf
lib: fixing lint on benchmark
Farenheith Dec 29, 2021
ae3325c
lib: adding test to validate slow stream
Farenheith Dec 29, 2021
bb6f6ab
lib: adding return and throw implementation to eventsToAsyncIteratorF…
Farenheith Dec 30, 2021
221e239
lib: performance improvement on readline async iterator
Dec 20, 2021
b536144
lib: undoing accidental changes
Farenheith Dec 22, 2021
31efb2d
lib: changing once to on in waitNext
Farenheith Dec 23, 2021
0e04a18
lib: fixing lint on benchmark
Farenheith Dec 29, 2021
7a8c3fa
lib: adding test to validate slow stream
Farenheith Dec 29, 2021
156837b
lib: adding return and throw implementation to eventsToAsyncIteratorF…
Farenheith Dec 30, 2021
87da1a2
lib: keep ASCII order on primordials import
Farenheith Feb 2, 2022
e7f358a
lib: changing standard for internal constants names
Farenheith Feb 2, 2022
d5917c9
lib: unifying on and eventsToAsyncIteratorFactory functions
Farenheith Feb 4, 2022
0143cdf
lib: making firstEventParam interna only
Farenheith Feb 4, 2022
917c51b
lib: refactoring function on
Farenheith Feb 5, 2022
4e8be80
lint: fixing linting problems after merge
Farenheith Sep 14, 2022
610a275
refactor: Apply suggestions from code review
Farenheith Sep 14, 2022
e1dd9d3
refactor: readding wrongly removed SymAsyncIterator definition
Farenheith Sep 14, 2022
26d066b
refactor: using ArrayPrototypePush instead of push
Farenheith Sep 14, 2022
835bb5f
lib: fixing FixedQueue.shift calls mistakenly using ArrayPrototypeShift
Farenheith Sep 15, 2022
e83baea
lib: fixing faulting PromiseResolve call
Farenheith Sep 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lib: unifying on and eventsToAsyncIteratorFactory functions
Farenheith committed Oct 18, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit d5917c96ac4525b887dd682be920cd02c084b225
124 changes: 91 additions & 33 deletions lib/events.js
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@

const {
ArrayPrototypeJoin,
ArrayPrototypeShift,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ArrayPrototypeUnshift,
@@ -33,6 +32,7 @@ const {
FunctionPrototypeBind,
FunctionPrototypeCall,
NumberIsNaN,
NumberPOSITIVE_INFINITY,
ObjectCreate,
ObjectDefineProperty,
ObjectDefineProperties,
@@ -84,7 +84,7 @@ const kErrorMonitor = Symbol('events.errorMonitor');
const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
const kMaxEventTargetListenersWarned =
Symbol('events.maxEventTargetListenersWarned');

let FixedQueue;
let EventEmitterAsyncResource;
// The EventEmitterAsyncResource has to be initialized lazily because event.js
// is loaded so early in the bootstrap process, before async_hooks is available.
@@ -999,7 +999,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
* Returns an `AsyncIterator` that iterates `event` events.
* @param {EventEmitter} emitter
* @param {string | symbol} event
* @param {{ signal: AbortSignal; }} [options]
* @param {{
* signal: AbortSignal;
* close?: string[];
* pauseThreshold?: number,
* resumeThreshold?: number,
* firstEventParam: boolean
* }} [options]
* @returns {AsyncIterator}
*/
function on(emitter, event, options) {
@@ -1008,16 +1014,44 @@ function on(emitter, event, options) {
if (signal?.aborted)
throw new AbortError(undefined, { cause: signal?.reason });

const unconsumedEvents = [];
const unconsumedPromises = [];
if (!FixedQueue) FixedQueue = require('internal/fixed_queue');

const unconsumedEvents = new FixedQueue();
const unconsumedPromises = new FixedQueue();
let error = null;
let finished = false;
const close = options?.close;
const firstEventParam = options?.firstEventParam;
let pauseThreshold = options?.pauseThreshold;
let resumeThreshold = options?.resumeThreshold;
let paused = false;
let size = 0;
if (pauseThreshold === undefined) {
pauseThreshold = NumberPOSITIVE_INFINITY;
}
if (resumeThreshold === undefined) {
resumeThreshold = 1;
}

const iterator = ObjectSetPrototypeOf({
get highWaterMark() {
return pauseThreshold;
},
get isPaused() {
return paused;
},
get queueSize() {
return size;
},
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value) {
if (size) {
const value = unconsumedEvents.shift();
size--;
if (paused && size < resumeThreshold) {
emitter.resume();
paused = false;
}
return PromiseResolve(createIterResult(value, false));
}

@@ -1033,6 +1067,7 @@ function on(emitter, event, options) {

// If the iterator is finished, resolve to done
if (finished) {
removeListeners();
return PromiseResolve(createIterResult(undefined, true));
}

@@ -1043,24 +1078,7 @@ function on(emitter, event, options) {
},

return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);

if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}

finished = true;

for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}

return PromiseResolve(createIterResult(undefined, true));
return closeHandler();
},

throw(err) {
@@ -1069,19 +1087,23 @@ function on(emitter, event, options) {
'Error', err);
}
error = err;
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
removeListeners();
},

[SymbolAsyncIterator]() {
return this;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part wasn't needed because this method is inherited from the parent prototype

}, AsyncIteratorPrototype);

eventTargetAgnosticAddListener(emitter, event, eventHandler);
eventTargetAgnosticAddListener(emitter, event, firstEventParam ? eventHandlerFirstParam : eventHandler);
if (event !== 'error' && typeof emitter.on === 'function') {
emitter.on('error', errorHandler);
}
if (close && close.length) {
for (let i = 0; i <= close.length; i++) {
eventTargetAgnosticAddListener(emitter, close[i], closeHandler);
}
}

if (signal) {
eventTargetAgnosticAddListener(
@@ -1093,23 +1115,59 @@ function on(emitter, event, options) {

return iterator;

function closeHandler() {
removeListeners();
finished = true;

while (!unconsumedPromises.isEmpty()) {
unconsumedPromises.shift().resolve(createIterResult(undefined, true));
}

return PromiseResolve(createIterResult(undefined, true));
}

function removeListeners() {
eventTargetAgnosticRemoveListener(emitter, event, firstEventParam ? eventHandlerFirstParam : eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
if (close && close.length) {
for (let i = 0; i <= close.length; i++) {
eventTargetAgnosticRemoveListener(emitter, close[i], closeHandler);
}
}
if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}
}

function abortListener() {
errorHandler(new AbortError(undefined, { cause: signal?.reason }));
}

function eventHandler(...args) {
const promise = ArrayPrototypeShift(unconsumedPromises);
if (promise) {
promise.resolve(createIterResult(args, false));
return eventHandlerFirstParam(args);
}

function eventHandlerFirstParam(arg) {
if (unconsumedPromises.isEmpty()) {
size++;
if (size > pauseThreshold) {
paused = true;
emitter.pause();
}
unconsumedEvents.push(arg);
} else {
unconsumedEvents.push(args);
unconsumedPromises.shift().resolve(createIterResult(arg, false));
}
}

function errorHandler(err) {
finished = true;

const toError = ArrayPrototypeShift(unconsumedPromises);
const toError = unconsumedPromises.shift();

if (toError) {
toError.reject(err);
133 changes: 0 additions & 133 deletions lib/internal/readline/eventsToAsyncIteratorFactory.js

This file was deleted.

11 changes: 5 additions & 6 deletions lib/internal/readline/interface.js
Original file line number Diff line number Diff line change
@@ -1343,12 +1343,11 @@ class Interface extends InterfaceConstructor {
*/
[SymbolAsyncIterator]() {
if (!this[kLineObjectStream]) {
this[kLineObjectStream] = require(
'internal/readline/eventsToAsyncIteratorFactory'
)(
this, {
itemEvents: ['line'],
closeEvents: ['close']
this[kLineObjectStream] = EventEmitter.on(
this, 'line', {
close: ['close'],
pauseThreshold: 1024,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pauseThreshold -> highWaterMark?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, Could it be pauseThreshold -> highWaterMark and resumeThreshold -> lowWaterMark? I understood that I got the best result in my testing controlling the moment to pause and to resume with two different thresholds

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronag what do you think (about naming)?

(to tl;dr; this PR improves the performance of readline's async iterator and events.on by making the implementation faster using a FixedQueue and exposing machinery to deal with backpressure in on that isn't exposed now but will be in a future PR)

(ideally readline would just be a stream but that probably can't work)

firstEventParam: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these params won't be exposed to the public can you please use symbols for them? e.g.

const kFirstEventParam = Symbol('nodejs.kFirstEventParam');

And then pass that instead:

  [kFirstEventParam]: true

Since these aren't exposed to users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that.
But maybe it's a good thing to expose it to users, it can be useful in other situations too.
Regardless, it can be discussed in a further moment, so I'll change it to a internal symbol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjamingr the problem here is that I'm using the firstEventParam on the interface.js, so, even if I create an internal symbol into events.js, I would need to export it. Is there some special place you folks use to put internal symbols shared between files?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Farenheith the thing is we are touching a public API here (on) without a lot of feedback and since you've already been working on this PR for a while I don't want to endanger delaying it further so I would rather not expose any public API changes to on here and do it in a follow up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjamingr okay, I created a new file to keep this symbol inside the internal folder. I think it's enough to attend that.
Please take a look if it solves the issue and if there's anything else to fix, just let me know!

});
}
return this[kLineObjectStream];