Skip to content

Commit eb82683

Browse files
mcollinaBethGriggs
authored andcommitted
events: add EventEmitter.on to async iterate over events
Fixes: #27847 PR-URL: #27994 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Gus Caplan <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent b3e0bc2 commit eb82683

File tree

3 files changed

+362
-0
lines changed

3 files changed

+362
-0
lines changed

doc/api/events.md

+35
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,41 @@ Value: `Symbol.for('nodejs.rejection')`
886886

887887
See how to write a custom [rejection handler][rejection].
888888

889+
## events.on(emitter, eventName)
890+
<!-- YAML
891+
added: REPLACEME
892+
-->
893+
894+
* `emitter` {EventEmitter}
895+
* `eventName` {string|symbol} The name of the event being listened for
896+
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
897+
898+
```js
899+
const { on, EventEmitter } = require('events');
900+
901+
(async () => {
902+
const ee = new EventEmitter();
903+
904+
// Emit later on
905+
process.nextTick(() => {
906+
ee.emit('foo', 'bar');
907+
ee.emit('foo', 42);
908+
});
909+
910+
for await (const event of on(ee, 'foo')) {
911+
// The execution of this inner block is synchronous and it
912+
// processes one event at a time (even with await). Do not use
913+
// if concurrent execution is required.
914+
console.log(event); // prints ['bar'] [42]
915+
}
916+
})();
917+
```
918+
919+
Returns an `AsyncIterator` that iterates `eventName` events. It will throw
920+
if the `EventEmitter` emits `'error'`. It removes all listeners when
921+
exiting the loop. The `value` returned by each iteration is an array
922+
composed of the emitted event arguments.
923+
889924
[WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget
890925
[`--trace-warnings`]: cli.html#cli_trace_warnings
891926
[`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners

lib/events.js

+104
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@ const {
3030
ObjectCreate,
3131
ObjectDefineProperty,
3232
ObjectGetPrototypeOf,
33+
ObjectSetPrototypeOf,
3334
ObjectKeys,
3435
Promise,
36+
PromiseReject,
37+
PromiseResolve,
3538
ReflectApply,
3639
ReflectOwnKeys,
3740
Symbol,
3841
SymbolFor,
42+
SymbolAsyncIterator
3943
} = primordials;
4044
const kRejection = SymbolFor('nodejs.rejection');
4145

@@ -63,6 +67,7 @@ function EventEmitter(opts) {
6367
}
6468
module.exports = EventEmitter;
6569
module.exports.once = once;
70+
module.exports.on = on;
6671

6772
// Backwards-compat with node 0.10.x
6873
EventEmitter.EventEmitter = EventEmitter;
@@ -658,3 +663,102 @@ function once(emitter, name) {
658663
emitter.once(name, eventListener);
659664
});
660665
}
666+
667+
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
668+
ObjectGetPrototypeOf(async function* () {}).prototype);
669+
670+
function createIterResult(value, done) {
671+
return { value, done };
672+
}
673+
674+
function on(emitter, event) {
675+
const unconsumedEvents = [];
676+
const unconsumedPromises = [];
677+
let error = null;
678+
let finished = false;
679+
680+
const iterator = ObjectSetPrototypeOf({
681+
next() {
682+
// First, we consume all unread events
683+
const value = unconsumedEvents.shift();
684+
if (value) {
685+
return PromiseResolve(createIterResult(value, false));
686+
}
687+
688+
// Then we error, if an error happened
689+
// This happens one time if at all, because after 'error'
690+
// we stop listening
691+
if (error) {
692+
const p = PromiseReject(error);
693+
// Only the first element errors
694+
error = null;
695+
return p;
696+
}
697+
698+
// If the iterator is finished, resolve to done
699+
if (finished) {
700+
return PromiseResolve(createIterResult(undefined, true));
701+
}
702+
703+
// Wait until an event happens
704+
return new Promise(function(resolve, reject) {
705+
unconsumedPromises.push({ resolve, reject });
706+
});
707+
},
708+
709+
return() {
710+
emitter.removeListener(event, eventHandler);
711+
emitter.removeListener('error', errorHandler);
712+
finished = true;
713+
714+
for (const promise of unconsumedPromises) {
715+
promise.resolve(createIterResult(undefined, true));
716+
}
717+
718+
return PromiseResolve(createIterResult(undefined, true));
719+
},
720+
721+
throw(err) {
722+
if (!err || !(err instanceof Error)) {
723+
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
724+
'Error', err);
725+
}
726+
error = err;
727+
emitter.removeListener(event, eventHandler);
728+
emitter.removeListener('error', errorHandler);
729+
},
730+
731+
[SymbolAsyncIterator]() {
732+
return this;
733+
}
734+
}, AsyncIteratorPrototype);
735+
736+
emitter.on(event, eventHandler);
737+
emitter.on('error', errorHandler);
738+
739+
return iterator;
740+
741+
function eventHandler(...args) {
742+
const promise = unconsumedPromises.shift();
743+
if (promise) {
744+
promise.resolve(createIterResult(args, false));
745+
} else {
746+
unconsumedEvents.push(args);
747+
}
748+
}
749+
750+
function errorHandler(err) {
751+
finished = true;
752+
753+
const toError = unconsumedPromises.shift();
754+
755+
if (toError) {
756+
toError.reject(err);
757+
} else {
758+
// The next time we call next()
759+
error = err;
760+
}
761+
762+
iterator.return();
763+
}
764+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { on, EventEmitter } = require('events');
6+
7+
async function basic() {
8+
const ee = new EventEmitter();
9+
process.nextTick(() => {
10+
ee.emit('foo', 'bar');
11+
// 'bar' is a spurious event, we are testing
12+
// that it does not show up in the iterable
13+
ee.emit('bar', 24);
14+
ee.emit('foo', 42);
15+
});
16+
17+
const iterable = on(ee, 'foo');
18+
19+
const expected = [['bar'], [42]];
20+
21+
for await (const event of iterable) {
22+
const current = expected.shift();
23+
24+
assert.deepStrictEqual(current, event);
25+
26+
if (expected.length === 0) {
27+
break;
28+
}
29+
}
30+
assert.strictEqual(ee.listenerCount('foo'), 0);
31+
assert.strictEqual(ee.listenerCount('error'), 0);
32+
}
33+
34+
async function error() {
35+
const ee = new EventEmitter();
36+
const _err = new Error('kaboom');
37+
process.nextTick(() => {
38+
ee.emit('error', _err);
39+
});
40+
41+
const iterable = on(ee, 'foo');
42+
let looped = false;
43+
let thrown = false;
44+
45+
try {
46+
// eslint-disable-next-line no-unused-vars
47+
for await (const event of iterable) {
48+
looped = true;
49+
}
50+
} catch (err) {
51+
thrown = true;
52+
assert.strictEqual(err, _err);
53+
}
54+
assert.strictEqual(thrown, true);
55+
assert.strictEqual(looped, false);
56+
}
57+
58+
async function errorDelayed() {
59+
const ee = new EventEmitter();
60+
const _err = new Error('kaboom');
61+
process.nextTick(() => {
62+
ee.emit('foo', 42);
63+
ee.emit('error', _err);
64+
});
65+
66+
const iterable = on(ee, 'foo');
67+
const expected = [[42]];
68+
let thrown = false;
69+
70+
try {
71+
for await (const event of iterable) {
72+
const current = expected.shift();
73+
assert.deepStrictEqual(current, event);
74+
}
75+
} catch (err) {
76+
thrown = true;
77+
assert.strictEqual(err, _err);
78+
}
79+
assert.strictEqual(thrown, true);
80+
assert.strictEqual(ee.listenerCount('foo'), 0);
81+
assert.strictEqual(ee.listenerCount('error'), 0);
82+
}
83+
84+
async function throwInLoop() {
85+
const ee = new EventEmitter();
86+
const _err = new Error('kaboom');
87+
88+
process.nextTick(() => {
89+
ee.emit('foo', 42);
90+
});
91+
92+
try {
93+
for await (const event of on(ee, 'foo')) {
94+
assert.deepStrictEqual(event, [42]);
95+
throw _err;
96+
}
97+
} catch (err) {
98+
assert.strictEqual(err, _err);
99+
}
100+
101+
assert.strictEqual(ee.listenerCount('foo'), 0);
102+
assert.strictEqual(ee.listenerCount('error'), 0);
103+
}
104+
105+
async function next() {
106+
const ee = new EventEmitter();
107+
const iterable = on(ee, 'foo');
108+
109+
process.nextTick(function() {
110+
ee.emit('foo', 'bar');
111+
ee.emit('foo', 42);
112+
iterable.return();
113+
});
114+
115+
const results = await Promise.all([
116+
iterable.next(),
117+
iterable.next(),
118+
iterable.next()
119+
]);
120+
121+
assert.deepStrictEqual(results, [{
122+
value: ['bar'],
123+
done: false
124+
}, {
125+
value: [42],
126+
done: false
127+
}, {
128+
value: undefined,
129+
done: true
130+
}]);
131+
132+
assert.deepStrictEqual(await iterable.next(), {
133+
value: undefined,
134+
done: true
135+
});
136+
}
137+
138+
async function nextError() {
139+
const ee = new EventEmitter();
140+
const iterable = on(ee, 'foo');
141+
const _err = new Error('kaboom');
142+
process.nextTick(function() {
143+
ee.emit('error', _err);
144+
});
145+
const results = await Promise.allSettled([
146+
iterable.next(),
147+
iterable.next(),
148+
iterable.next()
149+
]);
150+
assert.deepStrictEqual(results, [{
151+
status: 'rejected',
152+
reason: _err
153+
}, {
154+
status: 'fulfilled',
155+
value: {
156+
value: undefined,
157+
done: true
158+
}
159+
}, {
160+
status: 'fulfilled',
161+
value: {
162+
value: undefined,
163+
done: true
164+
}
165+
}]);
166+
assert.strictEqual(ee.listeners('error').length, 0);
167+
}
168+
169+
async function iterableThrow() {
170+
const ee = new EventEmitter();
171+
const iterable = on(ee, 'foo');
172+
173+
process.nextTick(() => {
174+
ee.emit('foo', 'bar');
175+
ee.emit('foo', 42); // lost in the queue
176+
iterable.throw(_err);
177+
});
178+
179+
const _err = new Error('kaboom');
180+
let thrown = false;
181+
182+
assert.throws(() => {
183+
// No argument
184+
iterable.throw();
185+
}, {
186+
message: 'The "EventEmitter.AsyncIterator" property must be' +
187+
' an instance of Error. Received undefined',
188+
name: 'TypeError'
189+
});
190+
191+
const expected = [['bar'], [42]];
192+
193+
try {
194+
for await (const event of iterable) {
195+
assert.deepStrictEqual(event, expected.shift());
196+
}
197+
} catch (err) {
198+
thrown = true;
199+
assert.strictEqual(err, _err);
200+
}
201+
assert.strictEqual(thrown, true);
202+
assert.strictEqual(expected.length, 0);
203+
assert.strictEqual(ee.listenerCount('foo'), 0);
204+
assert.strictEqual(ee.listenerCount('error'), 0);
205+
}
206+
207+
async function run() {
208+
const funcs = [
209+
basic,
210+
error,
211+
errorDelayed,
212+
throwInLoop,
213+
next,
214+
nextError,
215+
iterableThrow
216+
];
217+
218+
for (const fn of funcs) {
219+
await fn();
220+
}
221+
}
222+
223+
run().then(common.mustCall());

0 commit comments

Comments
 (0)