Skip to content

Commit 54275fd

Browse files
committed
events: add EventEmitterAsyncResource to core
Signd-off-by: James M Snell <[email protected]>
1 parent 0132db0 commit 54275fd

File tree

3 files changed

+343
-0
lines changed

3 files changed

+343
-0
lines changed

doc/api/events.md

+85
Original file line numberDiff line numberDiff line change
@@ -1166,6 +1166,89 @@ const emitter = new EventEmitter();
11661166
setMaxListeners(5, target, emitter);
11671167
```
11681168

1169+
## Class: `events.EventEmitterAsyncResource extends EventEmitter`
1170+
1171+
<!-- YAML
1172+
added: REPLACEME
1173+
-->
1174+
1175+
Integrates `EventEmitter` with {AsyncResource} for `EventEmitter`s that
1176+
require manual async tracking. Specifically, all events emitted by instances
1177+
of `events.EventEmitterAsyncResource` will run within its [async context][].
1178+
1179+
```js
1180+
const { EventEmitterAsyncResource } = require('events');
1181+
const { notStrictEqual, strictEqual } = require('assert');
1182+
const { executionAsyncId } = require('async_hooks');
1183+
1184+
// Async tracking tooling will identify this as 'Q'.
1185+
const ee1 = new EventEmitterAsyncResource({ name: 'Q' });
1186+
1187+
// 'foo' listeners will run in the EventEmitters async context.
1188+
ee1.on('foo', () => {
1189+
strictEqual(executionAsyncId(), ee1.asyncId());
1190+
strictEqual(triggerAsyncId(), ee1.triggerAsyncId());
1191+
});
1192+
1193+
const ee2 = new EventEmitter();
1194+
1195+
// 'foo' listeners on ordinary EventEmitters that do not track async
1196+
// context, however, run in the same async context as the emit().
1197+
ee1.on('foo', () => {
1198+
notStrictEqual(executionAsyncId(), ee2.asyncId());
1199+
notStrictEqual(triggerAsyncId(), ee2.triggerAsyncId());
1200+
});
1201+
1202+
Promise.resolve().then(() => {
1203+
ee1.emit('foo');
1204+
ee2.emit('foo');
1205+
});
1206+
```
1207+
1208+
The `EventEmitterAsyncResource` class has the same methods and takes the
1209+
same options as `EventEmitter` and `AsyncResource` themselves.
1210+
1211+
### `new events.EventEmitterAsyncResource(options)`
1212+
1213+
* `options` {Object}
1214+
* `captureRejections` {boolean} It enables
1215+
[automatic capturing of promise rejection][capturerejections].
1216+
**Default:** `false`.
1217+
* `name` {string} The type of async event. **Default::**
1218+
[`new.target.name`][].
1219+
* `triggerAsyncId` {number} The ID of the execution context that created this
1220+
async event. **Default:** `executionAsyncId()`.
1221+
* `requireManualDestroy` {boolean} If set to `true`, disables `emitDestroy`
1222+
when the object is garbage collected. This usually does not need to be set
1223+
(even if `emitDestroy` is called manually), unless the resource's `asyncId`
1224+
is retrieved and the sensitive API's `emitDestroy` is called with it.
1225+
When set to `false`, the `emitDestroy` call on garbage collection
1226+
will only take place if there is at least one active `destroy` hook.
1227+
**Default:** `false`.
1228+
1229+
### `eventemitterasyncresource.asyncId()`
1230+
1231+
* Returns: {number} The unique `asyncId` assigned to the resource.
1232+
1233+
### `eventemitterasyncresource.asyncResource`
1234+
1235+
* Type: The underlying {AsyncResource}.
1236+
1237+
The returned `AsyncResource` object has an additional `eventEmitter` property
1238+
that provides a reference to this `EventEmitterAsyncResource`.
1239+
1240+
### `eventemitterasyncresource.emitDestroy()`
1241+
1242+
Call all `destroy` hooks. This should only ever be called once. An error will
1243+
be thrown if it is called more than once. This **must** be manually called. If
1244+
the resource is left to be collected by the GC then the `destroy` hooks will
1245+
never be called.
1246+
1247+
### `eventemitterasyncresource.triggerAsyncId()`
1248+
1249+
* Returns: {number} The same `triggerAsyncId` that is passed to the
1250+
`AsyncResource` constructor.
1251+
11691252
<a id="event-target-and-event-api"></a>
11701253

11711254
## `EventTarget` and `Event` API
@@ -1706,7 +1789,9 @@ to the `EventTarget`.
17061789
[`events.defaultMaxListeners`]: #eventsdefaultmaxlisteners
17071790
[`fs.ReadStream`]: fs.md#class-fsreadstream
17081791
[`net.Server`]: net.md#class-netserver
1792+
[`new.target.name`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/new.target
17091793
[`process.on('warning')`]: process.md#event-warning
1794+
[async context]: async_context.md
17101795
[capturerejections]: #capture-rejections-of-promises
17111796
[error]: #error-events
17121797
[rejection]: #emittersymbolfornodejsrejectionerr-eventname-args

lib/events.js

+126
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
ArrayPrototypeShift,
2828
ArrayPrototypeSlice,
2929
ArrayPrototypeSplice,
30+
ArrayPrototypeUnshift,
3031
Boolean,
3132
Error,
3233
ErrorCaptureStackTrace,
@@ -42,6 +43,7 @@ const {
4243
Promise,
4344
PromiseReject,
4445
PromiseResolve,
46+
ReflectApply,
4547
ReflectOwnKeys,
4648
String,
4749
StringPrototypeSplit,
@@ -59,6 +61,7 @@ const {
5961
kEnhanceStackBeforeInspector,
6062
codes: {
6163
ERR_INVALID_ARG_TYPE,
64+
ERR_INVALID_THIS,
6265
ERR_OUT_OF_RANGE,
6366
ERR_UNHANDLED_ERROR
6467
},
@@ -76,6 +79,122 @@ const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
7679
const kMaxEventTargetListenersWarned =
7780
Symbol('events.maxEventTargetListenersWarned');
7881

82+
let EventEmitterAsyncResource;
83+
// The EventEmitterAsyncResource has to be initialized lazily because event.js
84+
// is loaded so early in the bootstrap process, before async_hooks is available.
85+
//
86+
// This implementation was adapted straight from addaleax's
87+
// eventemitter-asyncresource MIT-licensed userland module.
88+
// https://github.com/addaleax/eventemitter-asyncresource
89+
function lazyEventEmitterAsyncResource() {
90+
if (EventEmitterAsyncResource === undefined) {
91+
const {
92+
AsyncResource
93+
} = require('async_hooks');
94+
95+
const kEventEmitter = Symbol('kEventEmitter');
96+
const kAsyncResource = Symbol('kAsyncResource');
97+
class EventEmitterReferencingAsyncResource extends AsyncResource {
98+
/**
99+
* @param {EventEmitter} ee
100+
* @param {string} [type]
101+
* @param {{
102+
* triggerAsyncId?: number,
103+
* requireManualDestroy?: boolean,
104+
* }} [options]
105+
*/
106+
constructor(ee, type, options) {
107+
super(type, options);
108+
this[kEventEmitter] = ee;
109+
}
110+
111+
/**
112+
* @type {EventEmitter}
113+
*/
114+
get eventEmitter() {
115+
if (this[kEventEmitter] === undefined)
116+
throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource');
117+
return this[kEventEmitter];
118+
}
119+
}
120+
121+
EventEmitterAsyncResource =
122+
class EventEmitterAsyncResource extends EventEmitter {
123+
/**
124+
* @param {{
125+
* name?: string,
126+
* triggerAsyncId?: number,
127+
* requireManualDestroy?: boolean,
128+
* }} [options]
129+
*/
130+
constructor(options = undefined) {
131+
let name;
132+
if (typeof options === 'string') {
133+
name = options;
134+
options = undefined;
135+
} else {
136+
name = options?.name || new.target.name;
137+
}
138+
super(options);
139+
140+
this[kAsyncResource] =
141+
new EventEmitterReferencingAsyncResource(this, name, options);
142+
}
143+
144+
/**
145+
* @param {symbol,string} event
146+
* @param {...any} args
147+
* @returns {boolean}
148+
*/
149+
emit(event, ...args) {
150+
if (this[kAsyncResource] === undefined)
151+
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
152+
const { asyncResource } = this;
153+
ArrayPrototypeUnshift(args, super.emit, this, event);
154+
return ReflectApply(asyncResource.runInAsyncScope, asyncResource,
155+
args);
156+
}
157+
158+
/**
159+
* @returns {void}
160+
*/
161+
emitDestroy() {
162+
if (this[kAsyncResource] === undefined)
163+
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
164+
this.asyncResource.emitDestroy();
165+
}
166+
167+
/**
168+
* @returns {number}
169+
*/
170+
asyncId() {
171+
if (this[kAsyncResource] === undefined)
172+
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
173+
return this.asyncResource.asyncId();
174+
}
175+
176+
/**
177+
* @returns {number}
178+
*/
179+
triggerAsyncId() {
180+
if (this[kAsyncResource] === undefined)
181+
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
182+
return this.asyncResource.triggerAsyncId();
183+
}
184+
185+
/**
186+
* @returns {EventEmitterReferencingAsyncResource}
187+
*/
188+
get asyncResource() {
189+
if (this[kAsyncResource] === undefined)
190+
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
191+
return this[kAsyncResource];
192+
}
193+
};
194+
}
195+
return EventEmitterAsyncResource;
196+
}
197+
79198
/**
80199
* Creates a new `EventEmitter` instance.
81200
* @param {{ captureRejections?: boolean; }} [opts]
@@ -106,6 +225,13 @@ ObjectDefineProperty(EventEmitter, 'captureRejections', {
106225
enumerable: true
107226
});
108227

228+
ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', {
229+
enumerable: true,
230+
get: lazyEventEmitterAsyncResource,
231+
set: undefined,
232+
configurable: true,
233+
});
234+
109235
EventEmitter.errorMonitor = kErrorMonitor;
110236

111237
// The default for captureRejections is false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { EventEmitterAsyncResource } = require('events');
5+
const {
6+
createHook,
7+
executionAsyncId,
8+
} = require('async_hooks');
9+
10+
const {
11+
deepStrictEqual,
12+
strictEqual,
13+
} = require('assert');
14+
15+
const {
16+
setImmediate: tick,
17+
} = require('timers/promises');
18+
19+
function makeHook(trackedTypes) {
20+
const eventMap = new Map();
21+
22+
function log(asyncId, name) {
23+
const entry = eventMap.get(asyncId);
24+
if (entry !== undefined) entry.push({ name });
25+
}
26+
27+
const hook = createHook({
28+
init(asyncId, type, triggerAsyncId, resource) {
29+
if (trackedTypes.includes(type)) {
30+
eventMap.set(asyncId, [
31+
{
32+
name: 'init',
33+
type,
34+
triggerAsyncId,
35+
resource,
36+
},
37+
]);
38+
}
39+
},
40+
41+
before(asyncId) { log(asyncId, 'before'); },
42+
after(asyncId) { log(asyncId, 'after'); },
43+
destroy(asyncId) { log(asyncId, 'destroy'); }
44+
}).enable();
45+
46+
return {
47+
done() {
48+
hook.disable();
49+
return new Set(eventMap.values());
50+
},
51+
ids() {
52+
return new Set(eventMap.keys());
53+
}
54+
};
55+
}
56+
57+
// Tracks emit() calls correctly using async_hooks
58+
(async () => {
59+
const tracer = makeHook(['Foo']);
60+
61+
class Foo extends EventEmitterAsyncResource {}
62+
63+
const origExecutionAsyncId = executionAsyncId();
64+
const foo = new Foo();
65+
66+
foo.on('someEvent', common.mustCall());
67+
foo.emit('someEvent');
68+
69+
deepStrictEqual([foo.asyncId()], [...tracer.ids()]);
70+
strictEqual(foo.triggerAsyncId(), origExecutionAsyncId);
71+
strictEqual(foo.asyncResource.eventEmitter, foo);
72+
73+
foo.emitDestroy();
74+
75+
await tick();
76+
77+
deepStrictEqual(tracer.done(), new Set([
78+
[
79+
{
80+
name: 'init',
81+
type: 'Foo',
82+
triggerAsyncId: origExecutionAsyncId,
83+
resource: foo.asyncResource,
84+
},
85+
{ name: 'before' },
86+
{ name: 'after' },
87+
{ name: 'destroy' },
88+
],
89+
]));
90+
})().then(common.mustCall());
91+
92+
// Can explicitly specify name as positional arg
93+
(async () => {
94+
const tracer = makeHook(['ResourceName']);
95+
96+
const origExecutionAsyncId = executionAsyncId();
97+
class Foo extends EventEmitterAsyncResource {}
98+
99+
const foo = new Foo('ResourceName');
100+
101+
deepStrictEqual(tracer.done(), new Set([
102+
[
103+
{
104+
name: 'init',
105+
type: 'ResourceName',
106+
triggerAsyncId: origExecutionAsyncId,
107+
resource: foo.asyncResource,
108+
},
109+
],
110+
]));
111+
})().then(common.mustCall());
112+
113+
// Can explicitly specify name as option
114+
(async () => {
115+
const tracer = makeHook(['ResourceName']);
116+
117+
const origExecutionAsyncId = executionAsyncId();
118+
class Foo extends EventEmitterAsyncResource {}
119+
120+
const foo = new Foo({ name: 'ResourceName' });
121+
122+
deepStrictEqual(tracer.done(), new Set([
123+
[
124+
{
125+
name: 'init',
126+
type: 'ResourceName',
127+
triggerAsyncId: origExecutionAsyncId,
128+
resource: foo.asyncResource,
129+
},
130+
],
131+
]));
132+
})().then(common.mustCall());

0 commit comments

Comments
 (0)