Skip to content

Commit c47419e

Browse files
committed
worker: allow retrieving elu from parent
Usually, when extracting the ELU from a specific JS thread is better to do it from a different thread as the event loop we're observing might already be blocked. The `Worker.performance.eventLoopUtilization()` method allows us to do this for worker threads, but there's not a way to do this for the main thread. This new API, which allows us to retrieve the ELU of the parent thread from a specific worker, is going to enable this. For the moment, I have defined this new API in ``` require('worker_threads').parent.performance.eventLoopUtilization() ``` though I haven't added documentation yet as a) I want to know first whether this approach is acceptable, and in case it is, b) I'm not really sure whether that's the place the API should live in. Would love receiving feedback on this.
1 parent 28bf031 commit c47419e

7 files changed

+125
-3
lines changed

lib/internal/main/worker_thread.js

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const {
2929
messageTypes: {
3030
// Messages that may be received by workers
3131
LOAD_SCRIPT,
32+
PARENT_LOOP_START,
3233
// Messages that may be posted from workers
3334
UP_AND_RUNNING,
3435
ERROR_MESSAGE,
@@ -159,6 +160,8 @@ port.on('message', (message) => {
159160
const CJSLoader = require('internal/modules/cjs/loader');
160161
CJSLoader.Module.runMain(filename);
161162
}
163+
} else if (message.type === PARENT_LOOP_START) {
164+
require('internal/worker').setParentEventLoopStartTime(message.value);
162165
} else if (message.type === STDIO_PAYLOAD) {
163166
const { stream, chunks } = message;
164167
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {

lib/internal/worker.js

+45-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
const EventEmitter = require('events');
2828
const assert = require('internal/assert');
2929
const path = require('path');
30+
const { setImmediate } = require('timers');
3031
const {
3132
internalEventLoopUtilization
3233
} = require('internal/perf/event_loop_utilization');
@@ -60,6 +61,13 @@ const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url');
6061
const { kEmptyObject } = require('internal/util');
6162
const { validateArray } = require('internal/validators');
6263

64+
const {
65+
constants: {
66+
NODE_PERFORMANCE_MILESTONE_LOOP_START,
67+
},
68+
milestones,
69+
} = internalBinding('performance');
70+
6371
const {
6472
ownsProcessState,
6573
isMainThread,
@@ -70,7 +78,8 @@ const {
7078
kMaxOldGenerationSizeMb,
7179
kCodeRangeSizeMb,
7280
kStackSizeMb,
73-
kTotalResourceLimitCount
81+
kTotalResourceLimitCount,
82+
parentLoopIdleTime,
7483
} = internalBinding('worker');
7584

7685
const kHandle = Symbol('kHandle');
@@ -83,6 +92,7 @@ const kOnErrorMessage = Symbol('kOnErrorMessage');
8392
const kParentSideStdio = Symbol('kParentSideStdio');
8493
const kLoopStartTime = Symbol('kLoopStartTime');
8594
const kIsOnline = Symbol('kIsOnline');
95+
const kSendLoopStart = Symbol('kSendLoopStart');
8696

8797
const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
8898
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
@@ -265,6 +275,13 @@ class Worker extends EventEmitter {
265275
this[kHandle].startThread();
266276

267277
process.nextTick(() => process.emit('worker', this));
278+
// Send current thread loopStart to the worker. In case the loop has not yet
279+
// started, send it after the poll phase of the loop has completed.
280+
if (milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] === -1) {
281+
setImmediate(() => this[kSendLoopStart]());
282+
} else {
283+
this[kSendLoopStart]();
284+
}
268285
if (workerThreadsChannel.hasSubscribers) {
269286
workerThreadsChannel.publish({
270287
worker: this,
@@ -346,6 +363,13 @@ class Worker extends EventEmitter {
346363
}
347364
}
348365

366+
[kSendLoopStart]() {
367+
this[kPort]?.postMessage({
368+
type: messageTypes.PARENT_LOOP_START,
369+
value: milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e6
370+
});
371+
}
372+
349373
postMessage(...args) {
350374
if (this[kPublicPort] === null) return;
351375

@@ -490,6 +514,24 @@ function eventLoopUtilization(util1, util2) {
490514
);
491515
}
492516

517+
let parentEventLoopStartTime = -1;
518+
function setParentEventLoopStartTime(time) {
519+
parentEventLoopStartTime = time;
520+
}
521+
522+
function parentEventLoopUtilization(util1, util2) {
523+
if (parentEventLoopStartTime === -1) {
524+
return { idle: 0, active: 0, utilization: 0 };
525+
}
526+
527+
return internalEventLoopUtilization(
528+
parentEventLoopStartTime,
529+
parentLoopIdleTime(),
530+
util1,
531+
util2
532+
);
533+
}
534+
493535
module.exports = {
494536
ownsProcessState,
495537
isMainThread,
@@ -501,4 +543,6 @@ module.exports = {
501543
assignEnvironmentData,
502544
threadId,
503545
Worker,
546+
setParentEventLoopStartTime,
547+
parentEventLoopUtilization,
504548
};

lib/internal/worker/io.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ const messageTypes = {
8989
ERROR_MESSAGE: 'errorMessage',
9090
STDIO_PAYLOAD: 'stdioPayload',
9191
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
92-
LOAD_SCRIPT: 'loadScript'
92+
LOAD_SCRIPT: 'loadScript',
93+
PARENT_LOOP_START: 'parentLoopStart',
9394
};
9495

9596
// We have to mess with the MessagePort prototype a bit, so that a) we can make

lib/worker_threads.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ const {
77
setEnvironmentData,
88
getEnvironmentData,
99
threadId,
10-
Worker
10+
Worker,
11+
parentEventLoopUtilization,
1112
} = require('internal/worker');
1213

1314
const {
@@ -38,4 +39,9 @@ module.exports = {
3839
BroadcastChannel,
3940
setEnvironmentData,
4041
getEnvironmentData,
42+
parent: isMainThread ? null : {
43+
performance: {
44+
eventLoopUtilization: parentEventLoopUtilization,
45+
}
46+
},
4147
};

src/node_worker.cc

+11
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ Worker::Worker(Environment* env,
5656
per_isolate_opts_(per_isolate_opts),
5757
exec_argv_(exec_argv),
5858
platform_(env->isolate_data()->platform()),
59+
parent_loop_(env->event_loop()),
5960
thread_id_(AllocateEnvironmentThreadId()),
6061
env_vars_(env_vars),
6162
snapshot_data_(snapshot_data) {
@@ -865,6 +866,14 @@ void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
865866
args.GetReturnValue().Set(loop_start_time / 1e6);
866867
}
867868

869+
void Worker::ParentLoopIdleTime(const FunctionCallbackInfo<Value>& args) {
870+
Environment* env = Environment::GetCurrent(args);
871+
CHECK(!env->is_main_thread());
872+
Worker* w = env->worker_context();
873+
uint64_t idle_time = uv_metrics_idle_time(w->parent_loop_);
874+
args.GetReturnValue().Set(1.0 * idle_time / 1e6);
875+
}
876+
868877
namespace {
869878

870879
// Return the MessagePort that is global for this Environment and communicates
@@ -921,6 +930,7 @@ void InitWorker(Local<Object> target,
921930
}
922931

923932
SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
933+
SetMethod(context, target, "parentLoopIdleTime", Worker::ParentLoopIdleTime);
924934

925935
target
926936
->Set(env->context(),
@@ -967,6 +977,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
967977
registry->Register(Worker::TakeHeapSnapshot);
968978
registry->Register(Worker::LoopIdleTime);
969979
registry->Register(Worker::LoopStartTime);
980+
registry->Register(Worker::ParentLoopIdleTime);
970981
}
971982

972983
} // anonymous namespace

src/node_worker.h

+3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class Worker : public AsyncWrap {
7575
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
7676
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
7777
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
78+
static void ParentLoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>&);
7879

7980
private:
8081
bool CreateEnvMessagePort(Environment* env);
@@ -91,6 +92,8 @@ class Worker : public AsyncWrap {
9192

9293
std::unique_ptr<InspectorParentHandle> inspector_parent_handle_;
9394

95+
uv_loop_t* parent_loop_;
96+
9497
// This mutex protects access to all variables listed below it.
9598
mutable Mutex mutex_;
9699

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
3+
const { mustCall } = require('../common');
4+
5+
const TIMEOUT = 10;
6+
const SPIN_DUR = 50;
7+
8+
const assert = require('assert');
9+
const { Worker, parent, workerData } = require('worker_threads');
10+
11+
// Do not use isMainThread directly, otherwise the test would time out in case
12+
// it's started inside of another worker thread.
13+
if (!process.env.HAS_STARTED_WORKER) {
14+
process.env.HAS_STARTED_WORKER = '1';
15+
const i32arr = new Int32Array(new SharedArrayBuffer(4));
16+
const w = new Worker(__filename, { workerData: i32arr });
17+
w.on('online', mustCall(() => {
18+
Atomics.wait(i32arr, 0, 0);
19+
20+
const t = Date.now();
21+
while (Date.now() - t < SPIN_DUR);
22+
23+
Atomics.store(i32arr, 0, 0);
24+
Atomics.notify(i32arr, 0);
25+
Atomics.wait(i32arr, 0, 0);
26+
}));
27+
} else {
28+
setTimeout(() => {
29+
const { eventLoopUtilization } = parent.performance;
30+
const i32arr = workerData;
31+
const elu1 = eventLoopUtilization();
32+
33+
Atomics.store(i32arr, 0, 1);
34+
Atomics.notify(i32arr, 0);
35+
Atomics.wait(i32arr, 0, 1);
36+
37+
const elu2 = eventLoopUtilization(elu1);
38+
const elu3 = eventLoopUtilization();
39+
const elu4 = eventLoopUtilization(elu3, elu1);
40+
41+
assert.strictEqual(elu2.idle, 0);
42+
assert.strictEqual(elu4.idle, 0);
43+
assert.strictEqual(elu2.utilization, 1);
44+
assert.strictEqual(elu4.utilization, 1);
45+
assert.strictEqual(elu3.active - elu1.active, elu4.active);
46+
assert.ok(elu2.active > SPIN_DUR - 10, `${elu2.active} <= ${SPIN_DUR - 10}`);
47+
assert.ok(elu2.active < elu4.active, `${elu2.active} >= ${elu4.active}`);
48+
assert.ok(elu3.active > elu2.active, `${elu3.active} <= ${elu2.active}`);
49+
assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`);
50+
51+
Atomics.store(i32arr, 0, 1);
52+
Atomics.notify(i32arr, 0);
53+
}, TIMEOUT);
54+
}

0 commit comments

Comments
 (0)