4
4
ArrayPrototypeIndexOf,
5
5
ArrayPrototypePush,
6
6
ArrayPrototypeSplice,
7
+ FunctionPrototypeBind,
7
8
ObjectCreate,
8
9
ObjectGetPrototypeOf,
9
10
ObjectSetPrototypeOf,
11
+ PromisePrototypeThen,
12
+ PromiseReject,
13
+ ReflectApply,
14
+ SafeMap,
10
15
SymbolHasInstance,
11
16
} = primordials ;
12
17
@@ -23,11 +28,40 @@ const { triggerUncaughtException } = internalBinding('errors');
23
28
24
29
const { WeakReference } = internalBinding ( 'util' ) ;
25
30
31
+ function decRef ( channel ) {
32
+ channel . _weak . decRef ( ) ;
33
+ if ( channel . _weak . getRef ( ) === 0 ) {
34
+ delete channels [ channel . name ] ;
35
+ }
36
+ }
37
+
38
+ function markActive ( channel ) {
39
+ // eslint-disable-next-line no-use-before-define
40
+ ObjectSetPrototypeOf ( channel , ActiveChannel . prototype ) ;
41
+ channel . _subscribers = [ ] ;
42
+ channel . _stores = new SafeMap ( ) ;
43
+ }
44
+
45
+ function maybeMarkInactive ( channel ) {
46
+ // When there are no more active subscribers, restore to fast prototype.
47
+ if ( ! channel . _subscribers . length && ! channel . _stores . size ) {
48
+ // eslint-disable-next-line no-use-before-define
49
+ ObjectSetPrototypeOf ( channel , Channel . prototype ) ;
50
+ channel . _subscribers = undefined ;
51
+ channel . _stores = undefined ;
52
+ }
53
+ }
54
+
55
+ function wrapStoreRun ( store , data , next , transform = ( v ) => v ) {
56
+ return ( ) => store . run ( transform ( data ) , next ) ;
57
+ }
58
+
26
59
// TODO(qard): should there be a C++ channel interface?
27
60
class ActiveChannel {
28
61
subscribe ( subscription ) {
29
62
validateFunction ( subscription , 'subscription' ) ;
30
63
ArrayPrototypePush ( this . _subscribers , subscription ) ;
64
+ this . _weak . incRef ( ) ;
31
65
}
32
66
33
67
unsubscribe ( subscription ) {
@@ -36,12 +70,28 @@ class ActiveChannel {
36
70
37
71
ArrayPrototypeSplice ( this . _subscribers , index , 1 ) ;
38
72
39
- // When there are no more active subscribers, restore to fast prototype.
40
- if ( ! this . _subscribers . length ) {
41
- // eslint-disable-next-line no-use-before-define
42
- ObjectSetPrototypeOf ( this , Channel . prototype ) ;
73
+ decRef ( this ) ;
74
+ maybeMarkInactive ( this ) ;
75
+
76
+ return true ;
77
+ }
78
+
79
+ bindStore ( store , transform ) {
80
+ const replacing = this . _stores . has ( store ) ;
81
+ if ( ! replacing ) this . _weak . incRef ( ) ;
82
+ this . _stores . set ( store , transform ) ;
83
+ }
84
+
85
+ unbindStore ( store ) {
86
+ if ( ! this . _stores . has ( store ) ) {
87
+ return false ;
43
88
}
44
89
90
+ this . _stores . delete ( store ) ;
91
+
92
+ decRef ( this ) ;
93
+ maybeMarkInactive ( this ) ;
94
+
45
95
return true ;
46
96
}
47
97
@@ -61,11 +111,28 @@ class ActiveChannel {
61
111
}
62
112
}
63
113
}
114
+
115
+ runStores ( data , fn , thisArg , ...args ) {
116
+ this . publish ( data ) ;
117
+
118
+ // Bind base fn first due to AsyncLocalStorage.run not having thisArg
119
+ fn = FunctionPrototypeBind ( fn , thisArg , ...args ) ;
120
+
121
+ for ( const entry of this . _stores . entries ( ) ) {
122
+ const store = entry [ 0 ] ;
123
+ const transform = entry [ 1 ] ;
124
+ fn = wrapStoreRun ( store , data , fn , transform ) ;
125
+ }
126
+
127
+ return fn ( ) ;
128
+ }
64
129
}
65
130
66
131
class Channel {
67
132
constructor ( name ) {
68
133
this . _subscribers = undefined ;
134
+ this . _stores = undefined ;
135
+ this . _weak = undefined ;
69
136
this . name = name ;
70
137
}
71
138
@@ -76,20 +143,32 @@ class Channel {
76
143
}
77
144
78
145
subscribe ( subscription ) {
79
- ObjectSetPrototypeOf ( this , ActiveChannel . prototype ) ;
80
- this . _subscribers = [ ] ;
146
+ markActive ( this ) ;
81
147
this . subscribe ( subscription ) ;
82
148
}
83
149
84
150
unsubscribe ( ) {
85
151
return false ;
86
152
}
87
153
154
+ bindStore ( store , transform = ( v ) => v ) {
155
+ markActive ( this ) ;
156
+ this . bindStore ( store , transform ) ;
157
+ }
158
+
159
+ unbindStore ( ) {
160
+ return false ;
161
+ }
162
+
88
163
get hasSubscribers ( ) {
89
164
return false ;
90
165
}
91
166
92
167
publish ( ) { }
168
+
169
+ runStores ( data , fn , thisArg , ...args ) {
170
+ return ReflectApply ( fn , thisArg , args ) ;
171
+ }
93
172
}
94
173
95
174
const channels = ObjectCreate ( null ) ;
@@ -105,27 +184,17 @@ function channel(name) {
105
184
}
106
185
107
186
channel = new Channel ( name ) ;
108
- channels [ name ] = new WeakReference ( channel ) ;
187
+ channel . _weak = new WeakReference ( channel ) ;
188
+ channels [ name ] = channel . _weak ;
109
189
return channel ;
110
190
}
111
191
112
192
function subscribe ( name , subscription ) {
113
- const chan = channel ( name ) ;
114
- channels [ name ] . incRef ( ) ;
115
- chan . subscribe ( subscription ) ;
193
+ return channel ( name ) . subscribe ( subscription ) ;
116
194
}
117
195
118
196
function unsubscribe ( name , subscription ) {
119
- const chan = channel ( name ) ;
120
- if ( ! chan . unsubscribe ( subscription ) ) {
121
- return false ;
122
- }
123
-
124
- channels [ name ] . decRef ( ) ;
125
- if ( channels [ name ] . getRef ( ) === 0 ) {
126
- delete channels [ name ] ;
127
- }
128
- return true ;
197
+ return channel ( name ) . unsubscribe ( subscription ) ;
129
198
}
130
199
131
200
function hasSubscribers ( name ) {
@@ -139,10 +208,168 @@ function hasSubscribers(name) {
139
208
return channel . hasSubscribers ;
140
209
}
141
210
211
+ const traceEvents = [
212
+ 'start' ,
213
+ 'end' ,
214
+ 'asyncStart' ,
215
+ 'asyncEnd' ,
216
+ 'error' ,
217
+ ] ;
218
+
219
+ function assertChannel ( value , name ) {
220
+ if ( ! ( value instanceof Channel ) ) {
221
+ throw new ERR_INVALID_ARG_TYPE ( name , [ 'Channel' ] , value ) ;
222
+ }
223
+ }
224
+
225
+ class TracingChannel {
226
+ constructor ( nameOrChannels ) {
227
+ if ( typeof nameOrChannels === 'string' ) {
228
+ this . start = channel ( `tracing:${ nameOrChannels } :start` ) ;
229
+ this . end = channel ( `tracing:${ nameOrChannels } :end` ) ;
230
+ this . asyncStart = channel ( `tracing:${ nameOrChannels } :asyncStart` ) ;
231
+ this . asyncEnd = channel ( `tracing:${ nameOrChannels } :asyncEnd` ) ;
232
+ this . error = channel ( `tracing:${ nameOrChannels } :error` ) ;
233
+ } else if ( typeof nameOrChannels === 'object' ) {
234
+ const { start, end, asyncStart, asyncEnd, error } = nameOrChannels ;
235
+
236
+ assertChannel ( start , 'nameOrChannels.start' ) ;
237
+ assertChannel ( end , 'nameOrChannels.end' ) ;
238
+ assertChannel ( asyncStart , 'nameOrChannels.asyncStart' ) ;
239
+ assertChannel ( asyncEnd , 'nameOrChannels.asyncEnd' ) ;
240
+ assertChannel ( error , 'nameOrChannels.error' ) ;
241
+
242
+ this . start = start ;
243
+ this . end = end ;
244
+ this . asyncStart = asyncStart ;
245
+ this . asyncEnd = asyncEnd ;
246
+ this . error = error ;
247
+ } else {
248
+ throw new ERR_INVALID_ARG_TYPE ( 'nameOrChannels' ,
249
+ [ 'string' , 'object' , 'Channel' ] ,
250
+ nameOrChannels ) ;
251
+ }
252
+ }
253
+
254
+ subscribe ( handlers ) {
255
+ for ( const name of traceEvents ) {
256
+ if ( ! handlers [ name ] ) continue ;
257
+
258
+ this [ name ] ?. subscribe ( handlers [ name ] ) ;
259
+ }
260
+ }
261
+
262
+ unsubscribe ( handlers ) {
263
+ let done = true ;
264
+
265
+ for ( const name of traceEvents ) {
266
+ if ( ! handlers [ name ] ) continue ;
267
+
268
+ if ( ! this [ name ] ?. unsubscribe ( handlers [ name ] ) ) {
269
+ done = false ;
270
+ }
271
+ }
272
+
273
+ return done ;
274
+ }
275
+
276
+ traceSync ( fn , ctx = { } , thisArg , ...args ) {
277
+ const { start, end, error } = this ;
278
+
279
+ try {
280
+ const result = start . runStores ( ctx , fn , thisArg , ...args ) ;
281
+ ctx . result = result ;
282
+ return result ;
283
+ } catch ( err ) {
284
+ ctx . error = err ;
285
+ error . publish ( ctx ) ;
286
+ throw err ;
287
+ } finally {
288
+ end . publish ( ctx ) ;
289
+ }
290
+ }
291
+
292
+ tracePromise ( fn , ctx = { } , thisArg , ...args ) {
293
+ const { start, end, asyncStart, asyncEnd, error } = this ;
294
+
295
+ function reject ( err ) {
296
+ ctx . error = err ;
297
+ error . publish ( ctx ) ;
298
+ asyncStart . publish ( ctx ) ;
299
+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
300
+ asyncEnd . publish ( ctx ) ;
301
+ return PromiseReject ( err ) ;
302
+ }
303
+
304
+ function resolve ( result ) {
305
+ ctx . result = result ;
306
+ asyncStart . publish ( ctx ) ;
307
+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
308
+ asyncEnd . publish ( ctx ) ;
309
+ return result ;
310
+ }
311
+
312
+ try {
313
+ const promise = start . runStores ( ctx , fn , thisArg , ...args ) ;
314
+ return PromisePrototypeThen ( promise , resolve , reject ) ;
315
+ } catch ( err ) {
316
+ ctx . error = err ;
317
+ error . publish ( ctx ) ;
318
+ throw err ;
319
+ } finally {
320
+ end . publish ( ctx ) ;
321
+ }
322
+ }
323
+
324
+ traceCallback ( fn , position = 0 , ctx = { } , thisArg , ...args ) {
325
+ const { start, end, asyncStart, asyncEnd, error } = this ;
326
+
327
+ function wrap ( fn ) {
328
+ return function wrappedCallback ( err , res ) {
329
+ if ( err ) {
330
+ ctx . error = err ;
331
+ error . publish ( ctx ) ;
332
+ } else {
333
+ ctx . result = res ;
334
+ }
335
+
336
+ asyncStart . publish ( ctx ) ;
337
+ try {
338
+ if ( fn ) {
339
+ return ReflectApply ( fn , this , arguments ) ;
340
+ }
341
+ } finally {
342
+ asyncEnd . publish ( ctx ) ;
343
+ }
344
+ } ;
345
+ }
346
+
347
+ if ( position >= 0 ) {
348
+ args . splice ( position , 1 , wrap ( args . at ( position ) ) ) ;
349
+ }
350
+
351
+ try {
352
+ return start . runStores ( ctx , fn , thisArg , ...args ) ;
353
+ } catch ( err ) {
354
+ ctx . error = err ;
355
+ error . publish ( ctx ) ;
356
+ throw err ;
357
+ } finally {
358
+ end . publish ( ctx ) ;
359
+ }
360
+ }
361
+ }
362
+
363
+ function tracingChannel ( nameOrChannels ) {
364
+ return new TracingChannel ( nameOrChannels ) ;
365
+ }
366
+
142
367
module . exports = {
143
368
channel,
144
369
hasSubscribers,
145
370
subscribe,
371
+ tracingChannel,
146
372
unsubscribe,
147
- Channel
373
+ Channel,
374
+ TracingChannel
148
375
} ;
0 commit comments