diff --git a/benchmark/readline/readline-iterable.js b/benchmark/readline/readline-iterable.js new file mode 100644 index 00000000000000..6dc73b7669962a --- /dev/null +++ b/benchmark/readline/readline-iterable.js @@ -0,0 +1,47 @@ +'use strict'; +const common = require('../common.js'); +const readline = require('readline'); +const { Readable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [1e1, 1e2, 1e3, 1e4, 1e5, 1e6], +}); + +const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. +Dui accumsan sit amet nulla facilisi morbi tempus iaculis urna. +Eget dolor morbi non arcu risus quis varius quam quisque. +Lacus viverra vitae congue eu consequat ac felis donec. +Amet porttitor eget dolor morbi non arcu. +Velit ut tortor pretium viverra suspendisse. +Mauris nunc congue nisi vitae suscipit tellus. +Amet nisl suscipit adipiscing bibendum est ultricies integer. +Sit amet dictum sit amet justo donec enim diam. +Condimentum mattis pellentesque id nibh tortor id aliquet lectus proin. +Diam in arcu cursus euismod quis viverra nibh. +Rest of line`; + +function getLoremIpsumStream(repetitions) { + const readable = Readable({ + objectMode: true, + }); + let i = 0; + readable._read = () => readable.push( + i++ >= repetitions ? null : loremIpsum + ); + return readable; +} + +async function main({ n }) { + bench.start(); + let lineCount = 0; + + const iterable = readline.createInterface({ + input: getLoremIpsumStream(n), + }); + + // eslint-disable-next-line no-unused-vars + for await (const _ of iterable) { + lineCount++; + } + bench.end(lineCount); +} diff --git a/lib/events.js b/lib/events.js index f722b17aecae0d..db1aec8964eb6d 100644 --- a/lib/events.js +++ b/lib/events.js @@ -24,7 +24,6 @@ const { ArrayPrototypeIndexOf, ArrayPrototypeJoin, - ArrayPrototypeShift, ArrayPrototypeSlice, ArrayPrototypeSplice, ArrayPrototypeUnshift, @@ -39,20 +38,18 @@ const { ObjectDefineProperty, ObjectDefineProperties, ObjectGetPrototypeOf, - ObjectSetPrototypeOf, Promise, PromiseReject, - PromiseResolve, ReflectApply, ReflectOwnKeys, String, StringPrototypeSplit, Symbol, SymbolFor, - SymbolAsyncIterator, } = primordials; const kRejection = SymbolFor('nodejs.rejection'); const { inspect } = require('internal/util/inspect'); +const FixedQueue = require('internal/fixed_queue'); let spliceOne; @@ -976,13 +973,6 @@ async function once(emitter, name, options = {}) { }); } -const AsyncIteratorPrototype = ObjectGetPrototypeOf( - ObjectGetPrototypeOf(async function* () {}).prototype); - -function createIterResult(value, done) { - return { value, done }; -} - function eventTargetAgnosticRemoveListener(emitter, name, listener, flags) { if (typeof emitter.removeListener === 'function') { emitter.removeListener(name, listener); @@ -1017,80 +1007,16 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) { * @returns {AsyncIterator} */ function on(emitter, event, options) { + const queue = new FixedQueue(); + let resume = null; + let error = null; + let errored = false; + const signal = options?.signal; validateAbortSignal(signal, 'options.signal'); - if (signal?.aborted) + if (signal?.aborted) { throw new AbortError(undefined, { cause: signal?.reason }); - - const unconsumedEvents = []; - const unconsumedPromises = []; - let error = null; - let finished = false; - - const iterator = ObjectSetPrototypeOf({ - next() { - // First, we consume all unread events - const value = unconsumedEvents.shift(); - if (value) { - return PromiseResolve(createIterResult(value, false)); - } - - // Then we error, if an error happened - // This happens one time if at all, because after 'error' - // we stop listening - if (error) { - const p = PromiseReject(error); - // Only the first element errors - error = null; - return p; - } - - // If the iterator is finished, resolve to done - if (finished) { - return PromiseResolve(createIterResult(undefined, true)); - } - - // Wait until an event happens - return new Promise(function(resolve, reject) { - unconsumedPromises.push({ resolve, reject }); - }); - }, - - return() { - eventTargetAgnosticRemoveListener(emitter, event, eventHandler); - eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); - - if (signal) { - eventTargetAgnosticRemoveListener( - signal, - 'abort', - abortListener, - { once: true }); - } - - finished = true; - - for (const promise of unconsumedPromises) { - promise.resolve(createIterResult(undefined, true)); - } - - return PromiseResolve(createIterResult(undefined, true)); - }, - - throw(err) { - if (!err || !(err instanceof Error)) { - throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', - 'Error', err); - } - error = err; - eventTargetAgnosticRemoveListener(emitter, event, eventHandler); - eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); - }, - - [SymbolAsyncIterator]() { - return this; - } - }, AsyncIteratorPrototype); + } eventTargetAgnosticAddListener(emitter, event, eventHandler); if (event !== 'error' && typeof emitter.on === 'function') { @@ -1105,33 +1031,51 @@ function on(emitter, event, options) { { once: true }); } - return iterator; + function errorHandler(err) { + error = err; + errored = true; + + resume?.(PromiseReject(err)); + resume = null; + } function abortListener() { errorHandler(new AbortError(undefined, { cause: signal?.reason })); } function eventHandler(...args) { - const promise = ArrayPrototypeShift(unconsumedPromises); - if (promise) { - promise.resolve(createIterResult(args, false)); - } else { - unconsumedEvents.push(args); - } + queue.push(args); + + resume?.(); + resume = null; } - function errorHandler(err) { - finished = true; + return async function * () { + try { + while (true) { + if (errored) { + throw error; + } - const toError = ArrayPrototypeShift(unconsumedPromises); + if (!queue.isEmpty()) { + yield queue.shift(); + } else { + await new Promise((resolve) => { + resume = resolve; + }); + } + } + } finally { + eventTargetAgnosticRemoveListener(emitter, event, eventHandler); + eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); - if (toError) { - toError.reject(err); - } else { - // The next time we call next() - error = err; + if (signal) { + eventTargetAgnosticRemoveListener( + signal, + 'abort', + abortListener, + { once: true }); + } } - - iterator.return(); - } + }(); } diff --git a/test/parallel/test-events-on-async-iterator.js b/test/parallel/test-events-on-async-iterator.js index dbd27a8a44693e..e7d5c416eaeeab 100644 --- a/test/parallel/test-events-on-async-iterator.js +++ b/test/parallel/test-events-on-async-iterator.js @@ -25,7 +25,7 @@ async function basic() { for await (const event of iterable) { const current = expected.shift(); - assert.deepStrictEqual(current, event); + assert.deepStrictEqual(event, current); if (expected.length === 0) { break; @@ -113,39 +113,6 @@ async function throwInLoop() { assert.strictEqual(ee.listenerCount('error'), 0); } -async function next() { - const ee = new EventEmitter(); - const iterable = on(ee, 'foo'); - - process.nextTick(function() { - ee.emit('foo', 'bar'); - ee.emit('foo', 42); - iterable.return(); - }); - - const results = await Promise.all([ - iterable.next(), - iterable.next(), - iterable.next(), - ]); - - assert.deepStrictEqual(results, [{ - value: ['bar'], - done: false - }, { - value: [42], - done: false - }, { - value: undefined, - done: true - }]); - - assert.deepStrictEqual(await iterable.next(), { - value: undefined, - done: true - }); -} - async function nextError() { const ee = new EventEmitter(); const iterable = on(ee, 'foo'); @@ -177,44 +144,6 @@ async function nextError() { assert.strictEqual(ee.listeners('error').length, 0); } -async function iterableThrow() { - const ee = new EventEmitter(); - const iterable = on(ee, 'foo'); - - process.nextTick(() => { - ee.emit('foo', 'bar'); - ee.emit('foo', 42); // lost in the queue - iterable.throw(_err); - }); - - const _err = new Error('kaboom'); - let thrown = false; - - assert.throws(() => { - // No argument - iterable.throw(); - }, { - message: 'The "EventEmitter.AsyncIterator" property must be' + - ' an instance of Error. Received undefined', - name: 'TypeError' - }); - - const expected = [['bar'], [42]]; - - try { - for await (const event of iterable) { - assert.deepStrictEqual(event, expected.shift()); - } - } catch (err) { - thrown = true; - assert.strictEqual(err, _err); - } - assert.strictEqual(thrown, true); - assert.strictEqual(expected.length, 0); - assert.strictEqual(ee.listenerCount('foo'), 0); - assert.strictEqual(ee.listenerCount('error'), 0); -} - async function eventTarget() { const et = new EventTarget(); const tick = () => et.dispatchEvent(new Event('tick')); @@ -370,9 +299,7 @@ async function run() { error, errorDelayed, throwInLoop, - next, nextError, - iterableThrow, eventTarget, errorListenerCount, nodeEventTarget,