Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f00969e

Browse files
author
Stephen Belanger
committedDec 15, 2022
lib: add tracing channel to diagnostics_channel
1 parent b3f5a41 commit f00969e

8 files changed

+1074
-21
lines changed
 

‎doc/api/diagnostics_channel.md

+560
Large diffs are not rendered by default.

‎lib/diagnostics_channel.js

+250-21
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypeAt,
45
ArrayPrototypeIndexOf,
56
ArrayPrototypePush,
67
ArrayPrototypeSplice,
8+
FunctionPrototypeBind,
79
ObjectCreate,
810
ObjectGetPrototypeOf,
911
ObjectSetPrototypeOf,
12+
PromisePrototypeThen,
13+
PromiseReject,
14+
ReflectApply,
15+
SafeMap,
1016
SymbolHasInstance,
1117
} = primordials;
1218

@@ -23,11 +29,44 @@ const { triggerUncaughtException } = internalBinding('errors');
2329

2430
const { WeakReference } = internalBinding('util');
2531

32+
function decRef(channel) {
33+
channel._weak.decRef();
34+
if (channel._weak.getRef() === 0) {
35+
delete channels[channel.name];
36+
}
37+
}
38+
39+
function markActive(channel) {
40+
// eslint-disable-next-line no-use-before-define
41+
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
42+
channel._subscribers = [];
43+
channel._stores = new SafeMap();
44+
}
45+
46+
function maybeMarkInactive(channel) {
47+
// When there are no more active subscribers, restore to fast prototype.
48+
if (!channel._subscribers.length && !channel._stores.size) {
49+
// eslint-disable-next-line no-use-before-define
50+
ObjectSetPrototypeOf(channel, Channel.prototype);
51+
channel._subscribers = undefined;
52+
channel._stores = undefined;
53+
}
54+
}
55+
56+
function defaultTransform(data) {
57+
return data;
58+
}
59+
60+
function wrapStoreRun(store, data, next, transform = defaultTransform) {
61+
return () => store.run(transform(data), next);
62+
}
63+
2664
// TODO(qard): should there be a C++ channel interface?
2765
class ActiveChannel {
2866
subscribe(subscription) {
2967
validateFunction(subscription, 'subscription');
3068
ArrayPrototypePush(this._subscribers, subscription);
69+
this._weak.incRef();
3170
}
3271

3372
unsubscribe(subscription) {
@@ -36,12 +75,28 @@ class ActiveChannel {
3675

3776
ArrayPrototypeSplice(this._subscribers, index, 1);
3877

39-
// When there are no more active subscribers, restore to fast prototype.
40-
if (!this._subscribers.length) {
41-
// eslint-disable-next-line no-use-before-define
42-
ObjectSetPrototypeOf(this, Channel.prototype);
78+
decRef(this);
79+
maybeMarkInactive(this);
80+
81+
return true;
82+
}
83+
84+
bindStore(store, transform) {
85+
const replacing = this._stores.has(store);
86+
if (!replacing) this._weak.incRef();
87+
this._stores.set(store, transform);
88+
}
89+
90+
unbindStore(store) {
91+
if (!this._stores.has(store)) {
92+
return false;
4393
}
4494

95+
this._stores.delete(store);
96+
97+
decRef(this);
98+
maybeMarkInactive(this);
99+
45100
return true;
46101
}
47102

@@ -61,11 +116,28 @@ class ActiveChannel {
61116
}
62117
}
63118
}
119+
120+
runStores(data, fn, thisArg, ...args) {
121+
this.publish(data);
122+
123+
// Bind base fn first due to AsyncLocalStorage.run not having thisArg
124+
fn = FunctionPrototypeBind(fn, thisArg, ...args);
125+
126+
for (const entry of this._stores.entries()) {
127+
const store = entry[0];
128+
const transform = entry[1];
129+
fn = wrapStoreRun(store, data, fn, transform);
130+
}
131+
132+
return fn();
133+
}
64134
}
65135

66136
class Channel {
67137
constructor(name) {
68138
this._subscribers = undefined;
139+
this._stores = undefined;
140+
this._weak = undefined;
69141
this.name = name;
70142
}
71143

@@ -76,20 +148,32 @@ class Channel {
76148
}
77149

78150
subscribe(subscription) {
79-
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
80-
this._subscribers = [];
151+
markActive(this);
81152
this.subscribe(subscription);
82153
}
83154

84155
unsubscribe() {
85156
return false;
86157
}
87158

159+
bindStore(store, transform) {
160+
markActive(this);
161+
this.bindStore(store, transform);
162+
}
163+
164+
unbindStore() {
165+
return false;
166+
}
167+
88168
get hasSubscribers() {
89169
return false;
90170
}
91171

92172
publish() {}
173+
174+
runStores(data, fn, thisArg, ...args) {
175+
return ReflectApply(fn, thisArg, args);
176+
}
93177
}
94178

95179
const channels = ObjectCreate(null);
@@ -105,27 +189,17 @@ function channel(name) {
105189
}
106190

107191
channel = new Channel(name);
108-
channels[name] = new WeakReference(channel);
192+
channel._weak = new WeakReference(channel);
193+
channels[name] = channel._weak;
109194
return channel;
110195
}
111196

112197
function subscribe(name, subscription) {
113-
const chan = channel(name);
114-
channels[name].incRef();
115-
chan.subscribe(subscription);
198+
return channel(name).subscribe(subscription);
116199
}
117200

118201
function unsubscribe(name, subscription) {
119-
const chan = channel(name);
120-
if (!chan.unsubscribe(subscription)) {
121-
return false;
122-
}
123-
124-
channels[name].decRef();
125-
if (channels[name].getRef() === 0) {
126-
delete channels[name];
127-
}
128-
return true;
202+
return channel(name).unsubscribe(subscription);
129203
}
130204

131205
function hasSubscribers(name) {
@@ -139,10 +213,165 @@ function hasSubscribers(name) {
139213
return channel.hasSubscribers;
140214
}
141215

216+
const traceEvents = [
217+
'start',
218+
'end',
219+
'asyncStart',
220+
'asyncEnd',
221+
'error',
222+
];
223+
224+
function assertChannel(value, name) {
225+
if (!(value instanceof Channel)) {
226+
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
227+
}
228+
}
229+
230+
class TracingChannel {
231+
constructor(nameOrChannels) {
232+
if (typeof nameOrChannels === 'string') {
233+
this.start = channel(`tracing:${nameOrChannels}:start`);
234+
this.end = channel(`tracing:${nameOrChannels}:end`);
235+
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
236+
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
237+
this.error = channel(`tracing:${nameOrChannels}:error`);
238+
} else if (typeof nameOrChannels === 'object') {
239+
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
240+
241+
assertChannel(start, 'nameOrChannels.start');
242+
assertChannel(end, 'nameOrChannels.end');
243+
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
244+
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
245+
assertChannel(error, 'nameOrChannels.error');
246+
247+
this.start = start;
248+
this.end = end;
249+
this.asyncStart = asyncStart;
250+
this.asyncEnd = asyncEnd;
251+
this.error = error;
252+
} else {
253+
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
254+
['string', 'object', 'Channel'],
255+
nameOrChannels);
256+
}
257+
}
258+
259+
subscribe(handlers) {
260+
for (const name of traceEvents) {
261+
if (!handlers[name]) continue;
262+
263+
this[name]?.subscribe(handlers[name]);
264+
}
265+
}
266+
267+
unsubscribe(handlers) {
268+
let done = true;
269+
270+
for (const name of traceEvents) {
271+
if (!handlers[name]) continue;
272+
273+
if (!this[name]?.unsubscribe(handlers[name])) {
274+
done = false;
275+
}
276+
}
277+
278+
return done;
279+
}
280+
281+
traceSync(fn, ctx = {}, thisArg, ...args) {
282+
const { start, end, error } = this;
283+
284+
try {
285+
const result = start.runStores(ctx, fn, thisArg, ...args);
286+
ctx.result = result;
287+
return result;
288+
} catch (err) {
289+
ctx.error = err;
290+
error.publish(ctx);
291+
throw err;
292+
} finally {
293+
end.publish(ctx);
294+
}
295+
}
296+
297+
tracePromise(fn, ctx = {}, thisArg, ...args) {
298+
const { start, end, asyncStart, asyncEnd, error } = this;
299+
300+
function reject(err) {
301+
ctx.error = err;
302+
error.publish(ctx);
303+
asyncStart.publish(ctx);
304+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
305+
asyncEnd.publish(ctx);
306+
return PromiseReject(err);
307+
}
308+
309+
function resolve(result) {
310+
ctx.result = result;
311+
asyncStart.publish(ctx);
312+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
313+
asyncEnd.publish(ctx);
314+
return result;
315+
}
316+
317+
try {
318+
const promise = start.runStores(ctx, fn, thisArg, ...args);
319+
return PromisePrototypeThen(promise, resolve, reject);
320+
} catch (err) {
321+
ctx.error = err;
322+
error.publish(ctx);
323+
throw err;
324+
} finally {
325+
end.publish(ctx);
326+
}
327+
}
328+
329+
traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
330+
const { start, end, asyncStart, asyncEnd, error } = this;
331+
332+
function wrappedCallback(err, res) {
333+
if (err) {
334+
ctx.error = err;
335+
error.publish(ctx);
336+
} else {
337+
ctx.result = res;
338+
}
339+
340+
asyncStart.publish(ctx);
341+
try {
342+
if (callback) {
343+
return ReflectApply(callback, this, arguments);
344+
}
345+
} finally {
346+
asyncEnd.publish(ctx);
347+
}
348+
}
349+
350+
const callback = ArrayPrototypeAt(args, position);
351+
ArrayPrototypeSplice(args, position, 1, wrappedCallback);
352+
353+
try {
354+
return start.runStores(ctx, fn, thisArg, ...args);
355+
} catch (err) {
356+
ctx.error = err;
357+
error.publish(ctx);
358+
throw err;
359+
} finally {
360+
end.publish(ctx);
361+
}
362+
}
363+
}
364+
365+
function tracingChannel(nameOrChannels) {
366+
return new TracingChannel(nameOrChannels);
367+
}
368+
142369
module.exports = {
143370
channel,
144371
hasSubscribers,
145372
subscribe,
373+
tracingChannel,
146374
unsubscribe,
147-
Channel
375+
Channel,
376+
TracingChannel
148377
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const dc = require('diagnostics_channel');
6+
const { AsyncLocalStorage } = require('async_hooks');
7+
8+
let n = 0;
9+
const thisArg = new Date();
10+
const inputs = [
11+
{ foo: 'bar' },
12+
{ baz: 'buz' },
13+
];
14+
15+
const channel = dc.channel('test');
16+
17+
// Bind a storage directly to published data
18+
const store1 = new AsyncLocalStorage();
19+
channel.bindStore(store1);
20+
21+
// Bind a store with transformation of published data
22+
const store2 = new AsyncLocalStorage();
23+
channel.bindStore(store2, common.mustCall((data) => {
24+
assert.deepStrictEqual(data, inputs[n]);
25+
return { data };
26+
}, 3));
27+
28+
// Regular subscribers should see publishes from runStores calls
29+
channel.subscribe(common.mustCall((data) => {
30+
assert.deepStrictEqual(data, inputs[n]);
31+
}, 3));
32+
33+
// Verify stores are empty before run
34+
assert.strictEqual(store1.getStore(), undefined);
35+
assert.strictEqual(store2.getStore(), undefined);
36+
37+
channel.runStores(inputs[n], common.mustCall(function(a, b) {
38+
// Verify this and argument forwarding
39+
assert.deepStrictEqual(this, thisArg);
40+
assert.strictEqual(a, 1);
41+
assert.strictEqual(b, 2);
42+
43+
// Verify store 1 state matches input
44+
assert.deepStrictEqual(store1.getStore(), inputs[n]);
45+
46+
// Verify store 2 state has expected transformation
47+
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
48+
49+
// Should support nested contexts
50+
n++;
51+
channel.runStores(inputs[n], common.mustCall(function() {
52+
// Verify this and argument forwarding
53+
assert.strictEqual(this, undefined);
54+
55+
// Verify store 1 state matches input
56+
assert.deepStrictEqual(store1.getStore(), inputs[n]);
57+
58+
// Verify store 2 state has expected transformation
59+
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
60+
}));
61+
n--;
62+
63+
// Verify store 1 state matches input
64+
assert.deepStrictEqual(store1.getStore(), inputs[n]);
65+
66+
// Verify store 2 state has expected transformation
67+
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
68+
}), thisArg, 1, 2);
69+
70+
// Verify stores are empty after run
71+
assert.strictEqual(store1.getStore(), undefined);
72+
assert.strictEqual(store2.getStore(), undefined);
73+
74+
// Verify unbinding works
75+
assert.ok(channel.unbindStore(store1));
76+
77+
// Verify unbinding a store that is not bound returns false
78+
assert.ok(!channel.unbindStore(store1));
79+
80+
n++;
81+
channel.runStores(inputs[n], common.mustCall(() => {
82+
// Verify after unbinding store 1 will remain undefined
83+
assert.strictEqual(store1.getStore(), undefined);
84+
85+
// Verify still bound store 2 receives expected data
86+
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
87+
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
7+
const channel = dc.tracingChannel('test');
8+
9+
const expectedError = new Error('test');
10+
const input = { foo: 'bar' };
11+
const thisArg = { baz: 'buz' };
12+
13+
function check(found) {
14+
assert.deepStrictEqual(found, input);
15+
}
16+
17+
const handlers = {
18+
start: common.mustCall(check, 2),
19+
end: common.mustCall(check, 2),
20+
asyncStart: common.mustCall(check, 2),
21+
asyncEnd: common.mustCall(check, 2),
22+
error: common.mustCall((found) => {
23+
check(found);
24+
assert.deepStrictEqual(found.error, expectedError);
25+
}, 2)
26+
};
27+
28+
channel.subscribe(handlers);
29+
30+
channel.traceCallback(function(cb, err) {
31+
assert.deepStrictEqual(this, thisArg);
32+
setImmediate(cb, err);
33+
}, 0, input, thisArg, common.mustCall((err, res) => {
34+
assert.strictEqual(err, expectedError);
35+
assert.strictEqual(res, undefined);
36+
}), expectedError);
37+
38+
channel.tracePromise(function(value) {
39+
assert.deepStrictEqual(this, thisArg);
40+
return Promise.reject(value);
41+
}, input, thisArg, expectedError).then(
42+
common.mustNotCall(),
43+
common.mustCall((value) => {
44+
assert.deepStrictEqual(value, expectedError);
45+
})
46+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
7+
const channel = dc.tracingChannel('test');
8+
9+
const expectedResult = { foo: 'bar' };
10+
const input = { foo: 'bar' };
11+
const thisArg = { baz: 'buz' };
12+
13+
function check(found) {
14+
assert.deepStrictEqual(found, input);
15+
}
16+
17+
const handlers = {
18+
start: common.mustCall(check, 2),
19+
end: common.mustCall(check, 2),
20+
asyncStart: common.mustCall((found) => {
21+
check(found);
22+
assert.strictEqual(found.error, undefined);
23+
assert.deepStrictEqual(found.result, expectedResult);
24+
}, 2),
25+
asyncEnd: common.mustCall((found) => {
26+
check(found);
27+
assert.strictEqual(found.error, undefined);
28+
assert.deepStrictEqual(found.result, expectedResult);
29+
}, 2),
30+
error: common.mustNotCall()
31+
};
32+
33+
channel.subscribe(handlers);
34+
35+
channel.traceCallback(function(cb, err, res) {
36+
assert.deepStrictEqual(this, thisArg);
37+
setImmediate(cb, err, res);
38+
}, 0, input, thisArg, common.mustCall((err, res) => {
39+
assert.strictEqual(err, null);
40+
assert.deepStrictEqual(res, expectedResult);
41+
}), null, expectedResult);
42+
43+
channel.tracePromise(function(value) {
44+
assert.deepStrictEqual(this, thisArg);
45+
return Promise.resolve(value);
46+
}, input, thisArg, expectedResult).then(
47+
common.mustCall((value) => {
48+
assert.deepStrictEqual(value, expectedResult);
49+
}),
50+
common.mustNotCall()
51+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
7+
const channel = dc.tracingChannel('test');
8+
9+
const expectedError = new Error('test');
10+
const input = { foo: 'bar' };
11+
const thisArg = { baz: 'buz' };
12+
13+
function check(found) {
14+
assert.deepStrictEqual(found, input);
15+
}
16+
17+
const handlers = {
18+
start: common.mustCall(check),
19+
end: common.mustCall(check),
20+
asyncStart: common.mustNotCall(),
21+
asyncEnd: common.mustNotCall(),
22+
error: common.mustCall((found) => {
23+
check(found);
24+
assert.deepStrictEqual(found.error, expectedError);
25+
})
26+
};
27+
28+
channel.subscribe(handlers);
29+
try {
30+
channel.traceSync(function(err) {
31+
assert.deepStrictEqual(this, thisArg);
32+
assert.strictEqual(err, expectedError);
33+
throw err;
34+
}, input, thisArg, expectedError);
35+
36+
throw new Error('It should not reach this error');
37+
} catch (error) {
38+
assert.deepStrictEqual(error, expectedError);
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
7+
const channel = dc.tracingChannel('test');
8+
9+
const expectedResult = { foo: 'bar' };
10+
const input = { foo: 'bar' };
11+
12+
function check(found) {
13+
assert.deepStrictEqual(found, input);
14+
}
15+
16+
const handlers = {
17+
start: common.mustCall(check),
18+
end: common.mustCall((found) => {
19+
check(found);
20+
assert.deepStrictEqual(found.result, expectedResult);
21+
}),
22+
asyncStart: common.mustNotCall(),
23+
asyncEnd: common.mustNotCall(),
24+
error: common.mustNotCall()
25+
};
26+
27+
assert.strictEqual(channel.start.hasSubscribers, false);
28+
channel.subscribe(handlers);
29+
assert.strictEqual(channel.start.hasSubscribers, true);
30+
channel.traceSync(() => {
31+
return expectedResult;
32+
}, input);
33+
34+
channel.unsubscribe(handlers);
35+
assert.strictEqual(channel.start.hasSubscribers, false);
36+
channel.traceSync(() => {
37+
return expectedResult;
38+
}, input);

‎tools/doc/type-parser.mjs

+3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ const customTypesMap = {
5757
'Module Namespace Object':
5858
'https://tc39.github.io/ecma262/#sec-module-namespace-exotic-objects',
5959

60+
'AsyncLocalStorage': 'async_context.html#class-asynclocalstorage',
61+
6062
'AsyncHook': 'async_hooks.html#async_hookscreatehookcallbacks',
6163
'AsyncResource': 'async_hooks.html#class-asyncresource',
6264

@@ -108,6 +110,7 @@ const customTypesMap = {
108110
'dgram.Socket': 'dgram.html#class-dgramsocket',
109111

110112
'Channel': 'diagnostics_channel.html#class-channel',
113+
'TracingChannel': 'diagnostics_channel.html#class-tracingchannel',
111114

112115
'Domain': 'domain.html#class-domain',
113116

0 commit comments

Comments
 (0)
Please sign in to comment.