Skip to content

Commit afecc97

Browse files
mcollinaBridgeAR
authored andcommitted
events: add EventEmitter.on to async iterate over events
Fixes: #27847 PR-URL: #27994 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Gus Caplan <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 07e82db commit afecc97

File tree

3 files changed

+362
-0
lines changed

3 files changed

+362
-0
lines changed

doc/api/events.md

+35
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,41 @@ Value: `Symbol.for('nodejs.rejection')`
886886

887887
See how to write a custom [rejection handler][rejection].
888888

889+
## events.on(emitter, eventName)
890+
<!-- YAML
891+
added: REPLACEME
892+
-->
893+
894+
* `emitter` {EventEmitter}
895+
* `eventName` {string|symbol} The name of the event being listened for
896+
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
897+
898+
```js
899+
const { on, EventEmitter } = require('events');
900+
901+
(async () => {
902+
const ee = new EventEmitter();
903+
904+
// Emit later on
905+
process.nextTick(() => {
906+
ee.emit('foo', 'bar');
907+
ee.emit('foo', 42);
908+
});
909+
910+
for await (const event of on(ee, 'foo')) {
911+
// The execution of this inner block is synchronous and it
912+
// processes one event at a time (even with await). Do not use
913+
// if concurrent execution is required.
914+
console.log(event); // prints ['bar'] [42]
915+
}
916+
})();
917+
```
918+
919+
Returns an `AsyncIterator` that iterates `eventName` events. It will throw
920+
if the `EventEmitter` emits `'error'`. It removes all listeners when
921+
exiting the loop. The `value` returned by each iteration is an array
922+
composed of the emitted event arguments.
923+
889924
[WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget
890925
[`--trace-warnings`]: cli.html#cli_trace_warnings
891926
[`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners

lib/events.js

+104
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@ const {
2929
ObjectCreate,
3030
ObjectDefineProperty,
3131
ObjectGetPrototypeOf,
32+
ObjectSetPrototypeOf,
3233
ObjectKeys,
3334
Promise,
35+
PromiseReject,
36+
PromiseResolve,
3437
ReflectApply,
3538
ReflectOwnKeys,
3639
Symbol,
3740
SymbolFor,
41+
SymbolAsyncIterator
3842
} = primordials;
3943
const kRejection = SymbolFor('nodejs.rejection');
4044

@@ -62,6 +66,7 @@ function EventEmitter(opts) {
6266
}
6367
module.exports = EventEmitter;
6468
module.exports.once = once;
69+
module.exports.on = on;
6570

6671
// Backwards-compat with node 0.10.x
6772
EventEmitter.EventEmitter = EventEmitter;
@@ -657,3 +662,102 @@ function once(emitter, name) {
657662
emitter.once(name, eventListener);
658663
});
659664
}
665+
666+
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
667+
ObjectGetPrototypeOf(async function* () {}).prototype);
668+
669+
function createIterResult(value, done) {
670+
return { value, done };
671+
}
672+
673+
function on(emitter, event) {
674+
const unconsumedEvents = [];
675+
const unconsumedPromises = [];
676+
let error = null;
677+
let finished = false;
678+
679+
const iterator = ObjectSetPrototypeOf({
680+
next() {
681+
// First, we consume all unread events
682+
const value = unconsumedEvents.shift();
683+
if (value) {
684+
return PromiseResolve(createIterResult(value, false));
685+
}
686+
687+
// Then we error, if an error happened
688+
// This happens one time if at all, because after 'error'
689+
// we stop listening
690+
if (error) {
691+
const p = PromiseReject(error);
692+
// Only the first element errors
693+
error = null;
694+
return p;
695+
}
696+
697+
// If the iterator is finished, resolve to done
698+
if (finished) {
699+
return PromiseResolve(createIterResult(undefined, true));
700+
}
701+
702+
// Wait until an event happens
703+
return new Promise(function(resolve, reject) {
704+
unconsumedPromises.push({ resolve, reject });
705+
});
706+
},
707+
708+
return() {
709+
emitter.removeListener(event, eventHandler);
710+
emitter.removeListener('error', errorHandler);
711+
finished = true;
712+
713+
for (const promise of unconsumedPromises) {
714+
promise.resolve(createIterResult(undefined, true));
715+
}
716+
717+
return PromiseResolve(createIterResult(undefined, true));
718+
},
719+
720+
throw(err) {
721+
if (!err || !(err instanceof Error)) {
722+
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
723+
'Error', err);
724+
}
725+
error = err;
726+
emitter.removeListener(event, eventHandler);
727+
emitter.removeListener('error', errorHandler);
728+
},
729+
730+
[SymbolAsyncIterator]() {
731+
return this;
732+
}
733+
}, AsyncIteratorPrototype);
734+
735+
emitter.on(event, eventHandler);
736+
emitter.on('error', errorHandler);
737+
738+
return iterator;
739+
740+
function eventHandler(...args) {
741+
const promise = unconsumedPromises.shift();
742+
if (promise) {
743+
promise.resolve(createIterResult(args, false));
744+
} else {
745+
unconsumedEvents.push(args);
746+
}
747+
}
748+
749+
function errorHandler(err) {
750+
finished = true;
751+
752+
const toError = unconsumedPromises.shift();
753+
754+
if (toError) {
755+
toError.reject(err);
756+
} else {
757+
// The next time we call next()
758+
error = err;
759+
}
760+
761+
iterator.return();
762+
}
763+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { on, EventEmitter } = require('events');
6+
7+
async function basic() {
8+
const ee = new EventEmitter();
9+
process.nextTick(() => {
10+
ee.emit('foo', 'bar');
11+
// 'bar' is a spurious event, we are testing
12+
// that it does not show up in the iterable
13+
ee.emit('bar', 24);
14+
ee.emit('foo', 42);
15+
});
16+
17+
const iterable = on(ee, 'foo');
18+
19+
const expected = [['bar'], [42]];
20+
21+
for await (const event of iterable) {
22+
const current = expected.shift();
23+
24+
assert.deepStrictEqual(current, event);
25+
26+
if (expected.length === 0) {
27+
break;
28+
}
29+
}
30+
assert.strictEqual(ee.listenerCount('foo'), 0);
31+
assert.strictEqual(ee.listenerCount('error'), 0);
32+
}
33+
34+
async function error() {
35+
const ee = new EventEmitter();
36+
const _err = new Error('kaboom');
37+
process.nextTick(() => {
38+
ee.emit('error', _err);
39+
});
40+
41+
const iterable = on(ee, 'foo');
42+
let looped = false;
43+
let thrown = false;
44+
45+
try {
46+
// eslint-disable-next-line no-unused-vars
47+
for await (const event of iterable) {
48+
looped = true;
49+
}
50+
} catch (err) {
51+
thrown = true;
52+
assert.strictEqual(err, _err);
53+
}
54+
assert.strictEqual(thrown, true);
55+
assert.strictEqual(looped, false);
56+
}
57+
58+
async function errorDelayed() {
59+
const ee = new EventEmitter();
60+
const _err = new Error('kaboom');
61+
process.nextTick(() => {
62+
ee.emit('foo', 42);
63+
ee.emit('error', _err);
64+
});
65+
66+
const iterable = on(ee, 'foo');
67+
const expected = [[42]];
68+
let thrown = false;
69+
70+
try {
71+
for await (const event of iterable) {
72+
const current = expected.shift();
73+
assert.deepStrictEqual(current, event);
74+
}
75+
} catch (err) {
76+
thrown = true;
77+
assert.strictEqual(err, _err);
78+
}
79+
assert.strictEqual(thrown, true);
80+
assert.strictEqual(ee.listenerCount('foo'), 0);
81+
assert.strictEqual(ee.listenerCount('error'), 0);
82+
}
83+
84+
async function throwInLoop() {
85+
const ee = new EventEmitter();
86+
const _err = new Error('kaboom');
87+
88+
process.nextTick(() => {
89+
ee.emit('foo', 42);
90+
});
91+
92+
try {
93+
for await (const event of on(ee, 'foo')) {
94+
assert.deepStrictEqual(event, [42]);
95+
throw _err;
96+
}
97+
} catch (err) {
98+
assert.strictEqual(err, _err);
99+
}
100+
101+
assert.strictEqual(ee.listenerCount('foo'), 0);
102+
assert.strictEqual(ee.listenerCount('error'), 0);
103+
}
104+
105+
async function next() {
106+
const ee = new EventEmitter();
107+
const iterable = on(ee, 'foo');
108+
109+
process.nextTick(function() {
110+
ee.emit('foo', 'bar');
111+
ee.emit('foo', 42);
112+
iterable.return();
113+
});
114+
115+
const results = await Promise.all([
116+
iterable.next(),
117+
iterable.next(),
118+
iterable.next()
119+
]);
120+
121+
assert.deepStrictEqual(results, [{
122+
value: ['bar'],
123+
done: false
124+
}, {
125+
value: [42],
126+
done: false
127+
}, {
128+
value: undefined,
129+
done: true
130+
}]);
131+
132+
assert.deepStrictEqual(await iterable.next(), {
133+
value: undefined,
134+
done: true
135+
});
136+
}
137+
138+
async function nextError() {
139+
const ee = new EventEmitter();
140+
const iterable = on(ee, 'foo');
141+
const _err = new Error('kaboom');
142+
process.nextTick(function() {
143+
ee.emit('error', _err);
144+
});
145+
const results = await Promise.allSettled([
146+
iterable.next(),
147+
iterable.next(),
148+
iterable.next()
149+
]);
150+
assert.deepStrictEqual(results, [{
151+
status: 'rejected',
152+
reason: _err
153+
}, {
154+
status: 'fulfilled',
155+
value: {
156+
value: undefined,
157+
done: true
158+
}
159+
}, {
160+
status: 'fulfilled',
161+
value: {
162+
value: undefined,
163+
done: true
164+
}
165+
}]);
166+
assert.strictEqual(ee.listeners('error').length, 0);
167+
}
168+
169+
async function iterableThrow() {
170+
const ee = new EventEmitter();
171+
const iterable = on(ee, 'foo');
172+
173+
process.nextTick(() => {
174+
ee.emit('foo', 'bar');
175+
ee.emit('foo', 42); // lost in the queue
176+
iterable.throw(_err);
177+
});
178+
179+
const _err = new Error('kaboom');
180+
let thrown = false;
181+
182+
assert.throws(() => {
183+
// No argument
184+
iterable.throw();
185+
}, {
186+
message: 'The "EventEmitter.AsyncIterator" property must be' +
187+
' an instance of Error. Received undefined',
188+
name: 'TypeError'
189+
});
190+
191+
const expected = [['bar'], [42]];
192+
193+
try {
194+
for await (const event of iterable) {
195+
assert.deepStrictEqual(event, expected.shift());
196+
}
197+
} catch (err) {
198+
thrown = true;
199+
assert.strictEqual(err, _err);
200+
}
201+
assert.strictEqual(thrown, true);
202+
assert.strictEqual(expected.length, 0);
203+
assert.strictEqual(ee.listenerCount('foo'), 0);
204+
assert.strictEqual(ee.listenerCount('error'), 0);
205+
}
206+
207+
async function run() {
208+
const funcs = [
209+
basic,
210+
error,
211+
errorDelayed,
212+
throwInLoop,
213+
next,
214+
nextError,
215+
iterableThrow
216+
];
217+
218+
for (const fn of funcs) {
219+
await fn();
220+
}
221+
}
222+
223+
run().then(common.mustCall());

0 commit comments

Comments
 (0)