Skip to content

Commit 1215d64

Browse files
author
Stephen Belanger
committed
lib: add tracing channel to diagnostics_channel
1 parent 22c645d commit 1215d64

8 files changed

+1074
-21
lines changed

doc/api/diagnostics_channel.md

+560
Large diffs are not rendered by default.

lib/diagnostics_channel.js

+250-21
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypeAt,
45
ArrayPrototypeIndexOf,
56
ArrayPrototypePush,
67
ArrayPrototypeSplice,
8+
FunctionPrototypeBind,
79
ObjectCreate,
810
ObjectGetPrototypeOf,
911
ObjectSetPrototypeOf,
12+
PromisePrototypeThen,
13+
PromiseReject,
14+
ReflectApply,
15+
SafeMap,
1016
SymbolHasInstance,
1117
} = primordials;
1218

@@ -23,11 +29,44 @@ const { triggerUncaughtException } = internalBinding('errors');
2329

2430
const { WeakReference } = internalBinding('util');
2531

32+
function decRef(channel) {
33+
channel._weak.decRef();
34+
if (channel._weak.getRef() === 0) {
35+
delete channels[channel.name];
36+
}
37+
}
38+
39+
function markActive(channel) {
40+
// eslint-disable-next-line no-use-before-define
41+
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
42+
channel._subscribers = [];
43+
channel._stores = new SafeMap();
44+
}
45+
46+
function maybeMarkInactive(channel) {
47+
// When there are no more active subscribers, restore to fast prototype.
48+
if (!channel._subscribers.length && !channel._stores.size) {
49+
// eslint-disable-next-line no-use-before-define
50+
ObjectSetPrototypeOf(channel, Channel.prototype);
51+
channel._subscribers = undefined;
52+
channel._stores = undefined;
53+
}
54+
}
55+
56+
function defaultTransform(data) {
57+
return data
58+
}
59+
60+
function wrapStoreRun(store, data, next, transform = defaultTransform) {
61+
return () => store.run(transform(data), next);
62+
}
63+
2664
// TODO(qard): should there be a C++ channel interface?
2765
class ActiveChannel {
2866
subscribe(subscription) {
2967
validateFunction(subscription, 'subscription');
3068
ArrayPrototypePush(this._subscribers, subscription);
69+
this._weak.incRef();
3170
}
3271

3372
unsubscribe(subscription) {
@@ -36,12 +75,28 @@ class ActiveChannel {
3675

3776
ArrayPrototypeSplice(this._subscribers, index, 1);
3877

39-
// When there are no more active subscribers, restore to fast prototype.
40-
if (!this._subscribers.length) {
41-
// eslint-disable-next-line no-use-before-define
42-
ObjectSetPrototypeOf(this, Channel.prototype);
78+
decRef(this);
79+
maybeMarkInactive(this);
80+
81+
return true;
82+
}
83+
84+
bindStore(store, transform) {
85+
const replacing = this._stores.has(store);
86+
if (!replacing) this._weak.incRef();
87+
this._stores.set(store, transform);
88+
}
89+
90+
unbindStore(store) {
91+
if (!this._stores.has(store)) {
92+
return false;
4393
}
4494

95+
this._stores.delete(store);
96+
97+
decRef(this);
98+
maybeMarkInactive(this);
99+
45100
return true;
46101
}
47102

@@ -61,11 +116,28 @@ class ActiveChannel {
61116
}
62117
}
63118
}
119+
120+
runStores(data, fn, thisArg, ...args) {
121+
this.publish(data);
122+
123+
// Bind base fn first due to AsyncLocalStorage.run not having thisArg
124+
fn = FunctionPrototypeBind(fn, thisArg, ...args);
125+
126+
for (const entry of this._stores.entries()) {
127+
const store = entry[0];
128+
const transform = entry[1];
129+
fn = wrapStoreRun(store, data, fn, transform);
130+
}
131+
132+
return fn();
133+
}
64134
}
65135

66136
class Channel {
67137
constructor(name) {
68138
this._subscribers = undefined;
139+
this._stores = undefined;
140+
this._weak = undefined;
69141
this.name = name;
70142
}
71143

@@ -76,20 +148,32 @@ class Channel {
76148
}
77149

78150
subscribe(subscription) {
79-
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
80-
this._subscribers = [];
151+
markActive(this);
81152
this.subscribe(subscription);
82153
}
83154

84155
unsubscribe() {
85156
return false;
86157
}
87158

159+
bindStore(store, transform) {
160+
markActive(this);
161+
this.bindStore(store, transform);
162+
}
163+
164+
unbindStore() {
165+
return false;
166+
}
167+
88168
get hasSubscribers() {
89169
return false;
90170
}
91171

92172
publish() {}
173+
174+
runStores(data, fn, thisArg, ...args) {
175+
return ReflectApply(fn, thisArg, args);
176+
}
93177
}
94178

95179
const channels = ObjectCreate(null);
@@ -105,27 +189,17 @@ function channel(name) {
105189
}
106190

107191
channel = new Channel(name);
108-
channels[name] = new WeakReference(channel);
192+
channel._weak = new WeakReference(channel);
193+
channels[name] = channel._weak;
109194
return channel;
110195
}
111196

112197
function subscribe(name, subscription) {
113-
const chan = channel(name);
114-
channels[name].incRef();
115-
chan.subscribe(subscription);
198+
return channel(name).subscribe(subscription);
116199
}
117200

118201
function unsubscribe(name, subscription) {
119-
const chan = channel(name);
120-
if (!chan.unsubscribe(subscription)) {
121-
return false;
122-
}
123-
124-
channels[name].decRef();
125-
if (channels[name].getRef() === 0) {
126-
delete channels[name];
127-
}
128-
return true;
202+
return channel(name).unsubscribe(subscription);
129203
}
130204

131205
function hasSubscribers(name) {
@@ -139,10 +213,165 @@ function hasSubscribers(name) {
139213
return channel.hasSubscribers;
140214
}
141215

216+
const traceEvents = [
217+
'start',
218+
'end',
219+
'asyncStart',
220+
'asyncEnd',
221+
'error',
222+
];
223+
224+
function assertChannel(value, name) {
225+
if (!(value instanceof Channel)) {
226+
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
227+
}
228+
}
229+
230+
class TracingChannel {
231+
constructor(nameOrChannels) {
232+
if (typeof nameOrChannels === 'string') {
233+
this.start = channel(`tracing:${nameOrChannels}:start`);
234+
this.end = channel(`tracing:${nameOrChannels}:end`);
235+
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
236+
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
237+
this.error = channel(`tracing:${nameOrChannels}:error`);
238+
} else if (typeof nameOrChannels === 'object') {
239+
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
240+
241+
assertChannel(start, 'nameOrChannels.start');
242+
assertChannel(end, 'nameOrChannels.end');
243+
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
244+
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
245+
assertChannel(error, 'nameOrChannels.error');
246+
247+
this.start = start;
248+
this.end = end;
249+
this.asyncStart = asyncStart;
250+
this.asyncEnd = asyncEnd;
251+
this.error = error;
252+
} else {
253+
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
254+
['string', 'object', 'Channel'],
255+
nameOrChannels);
256+
}
257+
}
258+
259+
subscribe(handlers) {
260+
for (const name of traceEvents) {
261+
if (!handlers[name]) continue;
262+
263+
this[name]?.subscribe(handlers[name]);
264+
}
265+
}
266+
267+
unsubscribe(handlers) {
268+
let done = true;
269+
270+
for (const name of traceEvents) {
271+
if (!handlers[name]) continue;
272+
273+
if (!this[name]?.unsubscribe(handlers[name])) {
274+
done = false;
275+
}
276+
}
277+
278+
return done;
279+
}
280+
281+
traceSync(fn, ctx = {}, thisArg, ...args) {
282+
const { start, end, error } = this;
283+
284+
try {
285+
const result = start.runStores(ctx, fn, thisArg, ...args);
286+
ctx.result = result;
287+
return result;
288+
} catch (err) {
289+
ctx.error = err;
290+
error.publish(ctx);
291+
throw err;
292+
} finally {
293+
end.publish(ctx);
294+
}
295+
}
296+
297+
tracePromise(fn, ctx = {}, thisArg, ...args) {
298+
const { start, end, asyncStart, asyncEnd, error } = this;
299+
300+
function reject(err) {
301+
ctx.error = err;
302+
error.publish(ctx);
303+
asyncStart.publish(ctx);
304+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
305+
asyncEnd.publish(ctx);
306+
return PromiseReject(err);
307+
}
308+
309+
function resolve(result) {
310+
ctx.result = result;
311+
asyncStart.publish(ctx);
312+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
313+
asyncEnd.publish(ctx);
314+
return result;
315+
}
316+
317+
try {
318+
const promise = start.runStores(ctx, fn, thisArg, ...args);
319+
return PromisePrototypeThen(promise, resolve, reject);
320+
} catch (err) {
321+
ctx.error = err;
322+
error.publish(ctx);
323+
throw err;
324+
} finally {
325+
end.publish(ctx);
326+
}
327+
}
328+
329+
traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
330+
const { start, end, asyncStart, asyncEnd, error } = this;
331+
332+
function wrappedCallback(err, res) {
333+
if (err) {
334+
ctx.error = err;
335+
error.publish(ctx);
336+
} else {
337+
ctx.result = res;
338+
}
339+
340+
asyncStart.publish(ctx);
341+
try {
342+
if (callback) {
343+
return ReflectApply(callback, this, arguments);
344+
}
345+
} finally {
346+
asyncEnd.publish(ctx);
347+
}
348+
}
349+
350+
const callback = ArrayPrototypeAt(args, position);
351+
ArrayPrototypeSplice(args, position, 1, wrappedCallback);
352+
353+
try {
354+
return start.runStores(ctx, fn, thisArg, ...args);
355+
} catch (err) {
356+
ctx.error = err;
357+
error.publish(ctx);
358+
throw err;
359+
} finally {
360+
end.publish(ctx);
361+
}
362+
}
363+
}
364+
365+
function tracingChannel(nameOrChannels) {
366+
return new TracingChannel(nameOrChannels);
367+
}
368+
142369
module.exports = {
143370
channel,
144371
hasSubscribers,
145372
subscribe,
373+
tracingChannel,
146374
unsubscribe,
147-
Channel
375+
Channel,
376+
TracingChannel
148377
};

0 commit comments

Comments
 (0)