Skip to content

Commit 3f33b5a

Browse files
jasnelltargos
authored andcommitted
events: allow use of AbortController with on
Signed-off-by: James M Snell <[email protected]> PR-URL: #34912 Backport-PR-URL: #38386 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Denys Otrishko <[email protected]>
1 parent 1fefb5c commit 3f33b5a

File tree

3 files changed

+177
-4
lines changed

3 files changed

+177
-4
lines changed

doc/api/events.md

+31-1
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ Value: `Symbol.for('nodejs.rejection')`
10041004

10051005
See how to write a custom [rejection handler][rejection].
10061006

1007-
## `events.on(emitter, eventName)`
1007+
## `events.on(emitter, eventName[, options])`
10081008
<!-- YAML
10091009
added:
10101010
- v13.6.0
@@ -1013,6 +1013,9 @@ added:
10131013

10141014
* `emitter` {EventEmitter}
10151015
* `eventName` {string|symbol} The name of the event being listened for
1016+
* `options` {Object}
1017+
* `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting
1018+
events.
10161019
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
10171020

10181021
```js
@@ -1042,6 +1045,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when
10421045
exiting the loop. The `value` returned by each iteration is an array
10431046
composed of the emitted event arguments.
10441047

1048+
An {AbortSignal} may be used to cancel waiting on events:
1049+
1050+
```js
1051+
const { on, EventEmitter } = require('events');
1052+
const ac = new AbortController();
1053+
1054+
(async () => {
1055+
const ee = new EventEmitter();
1056+
1057+
// Emit later on
1058+
process.nextTick(() => {
1059+
ee.emit('foo', 'bar');
1060+
ee.emit('foo', 42);
1061+
});
1062+
1063+
for await (const event of on(ee, 'foo', { signal: ac.signal })) {
1064+
// The execution of this inner block is synchronous and it
1065+
// processes one event at a time (even with await). Do not use
1066+
// if concurrent execution is required.
1067+
console.log(event); // prints ['bar'] [42]
1068+
}
1069+
// Unreachable here
1070+
})();
1071+
1072+
process.nextTick(() => ac.abort());
1073+
```
1074+
10451075
## `EventTarget` and `Event` API
10461076
<!-- YAML
10471077
added: v14.5.0

lib/events.js

+27-1
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
731731
}
732732
}
733733

734-
function on(emitter, event) {
734+
function on(emitter, event, options) {
735+
const { signal } = { ...options };
736+
validateAbortSignal(signal, 'options.signal');
737+
if (signal && signal.aborted) {
738+
throw lazyDOMException('The operation was aborted', 'AbortError');
739+
}
740+
735741
const unconsumedEvents = [];
736742
const unconsumedPromises = [];
737743
let error = null;
@@ -769,6 +775,15 @@ function on(emitter, event) {
769775
return() {
770776
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
771777
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
778+
779+
if (signal) {
780+
eventTargetAgnosticRemoveListener(
781+
signal,
782+
'abort',
783+
abortListener,
784+
{ once: true });
785+
}
786+
772787
finished = true;
773788

774789
for (const promise of unconsumedPromises) {
@@ -798,9 +813,20 @@ function on(emitter, event) {
798813
addErrorHandlerIfEventEmitter(emitter, errorHandler);
799814
}
800815

816+
if (signal) {
817+
eventTargetAgnosticAddListener(
818+
signal,
819+
'abort',
820+
abortListener,
821+
{ once: true });
822+
}
801823

802824
return iterator;
803825

826+
function abortListener() {
827+
errorHandler(lazyDOMException('The operation was aborted', 'AbortError'));
828+
}
829+
804830
function eventHandler(...args) {
805831
const promise = unconsumedPromises.shift();
806832
if (promise) {

test/parallel/test-event-on-async-iterator.js

+119-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Flags: --expose-internals
1+
// Flags: --expose-internals --no-warnings --experimental-abortcontroller
22
'use strict';
33

44
const common = require('../common');
@@ -248,6 +248,117 @@ async function nodeEventTarget() {
248248
clearInterval(interval);
249249
}
250250

251+
async function abortableOnBefore() {
252+
const ee = new EventEmitter();
253+
const ac = new AbortController();
254+
ac.abort();
255+
[1, {}, null, false, 'hi'].forEach((signal) => {
256+
assert.throws(() => on(ee, 'foo', { signal }), {
257+
code: 'ERR_INVALID_ARG_TYPE'
258+
});
259+
});
260+
assert.throws(() => on(ee, 'foo', { signal: ac.signal }), {
261+
name: 'AbortError'
262+
});
263+
}
264+
265+
async function eventTargetAbortableOnBefore() {
266+
const et = new EventTarget();
267+
const ac = new AbortController();
268+
ac.abort();
269+
[1, {}, null, false, 'hi'].forEach((signal) => {
270+
assert.throws(() => on(et, 'foo', { signal }), {
271+
code: 'ERR_INVALID_ARG_TYPE'
272+
});
273+
});
274+
assert.throws(() => on(et, 'foo', { signal: ac.signal }), {
275+
name: 'AbortError'
276+
});
277+
}
278+
279+
async function abortableOnAfter() {
280+
const ee = new EventEmitter();
281+
const ac = new AbortController();
282+
283+
const i = setInterval(() => ee.emit('foo', 'foo'), 10);
284+
285+
async function foo() {
286+
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
287+
assert.strictEqual(f, 'foo');
288+
}
289+
}
290+
291+
foo().catch(common.mustCall((error) => {
292+
assert.strictEqual(error.name, 'AbortError');
293+
})).finally(() => {
294+
clearInterval(i);
295+
});
296+
297+
process.nextTick(() => ac.abort());
298+
}
299+
300+
async function eventTargetAbortableOnAfter() {
301+
const et = new EventTarget();
302+
const ac = new AbortController();
303+
304+
const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);
305+
306+
async function foo() {
307+
for await (const f of on(et, 'foo', { signal: ac.signal })) {
308+
assert(f);
309+
}
310+
}
311+
312+
foo().catch(common.mustCall((error) => {
313+
assert.strictEqual(error.name, 'AbortError');
314+
})).finally(() => {
315+
clearInterval(i);
316+
});
317+
318+
process.nextTick(() => ac.abort());
319+
}
320+
321+
async function eventTargetAbortableOnAfter2() {
322+
const et = new EventTarget();
323+
const ac = new AbortController();
324+
325+
const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);
326+
327+
async function foo() {
328+
for await (const f of on(et, 'foo', { signal: ac.signal })) {
329+
assert(f);
330+
// Cancel after a single event has been triggered.
331+
ac.abort();
332+
}
333+
}
334+
335+
foo().catch(common.mustCall((error) => {
336+
assert.strictEqual(error.name, 'AbortError');
337+
})).finally(() => {
338+
clearInterval(i);
339+
});
340+
}
341+
342+
async function abortableOnAfterDone() {
343+
const ee = new EventEmitter();
344+
const ac = new AbortController();
345+
346+
const i = setInterval(() => ee.emit('foo', 'foo'), 1);
347+
let count = 0;
348+
349+
async function foo() {
350+
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
351+
assert.strictEqual(f[0], 'foo');
352+
if (++count === 5)
353+
break;
354+
}
355+
ac.abort(); // No error will occur
356+
}
357+
358+
foo().finally(() => {
359+
clearInterval(i);
360+
});
361+
}
251362

252363
async function run() {
253364
const funcs = [
@@ -260,7 +371,13 @@ async function run() {
260371
iterableThrow,
261372
eventTarget,
262373
errorListenerCount,
263-
nodeEventTarget
374+
nodeEventTarget,
375+
abortableOnBefore,
376+
abortableOnAfter,
377+
eventTargetAbortableOnBefore,
378+
eventTargetAbortableOnAfter,
379+
eventTargetAbortableOnAfter2,
380+
abortableOnAfterDone
264381
];
265382

266383
for (const fn of funcs) {

0 commit comments

Comments
 (0)