Skip to content

Commit b4819db

Browse files
committed
worker: add option to track unmanaged file descriptors
Add a public option for Workers which adds tracking for raw file descriptors, as currently, those resources are not cleaned up, unlike e.g. `FileHandle`s. PR-URL: #34303 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Ben Noordhuis <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 3e21dd9 commit b4819db

File tree

5 files changed

+89
-3
lines changed

5 files changed

+89
-3
lines changed

doc/api/worker_threads.md

+12
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,10 @@ if (isMainThread) {
561561
<!-- YAML
562562
added: v10.5.0
563563
changes:
564+
- version:
565+
- REPLACEME
566+
pr-url: https://github.com/nodejs/node/pull/34303
567+
description: The `trackUnmanagedFds` option was introduced.
564568
- version: v12.17.0
565569
pr-url: https://github.com/nodejs/node/pull/32278
566570
description: The `transferList` option was introduced.
@@ -612,6 +616,12 @@ changes:
612616
occur as described in the [HTML structured clone algorithm][], and an error
613617
will be thrown if the object cannot be cloned (e.g. because it contains
614618
`function`s).
619+
* `trackUnmanagedFds` {boolean} If this is set to `true`, then the Worker will
620+
track raw file descriptors managed through [`fs.open()`][] and
621+
[`fs.close()`][], and close them when the Worker exits, similar to other
622+
resources like network sockets or file descriptors managed through
623+
the [`FileHandle`][] API. This option is automatically inherited by all
624+
nested `Worker`s. **Default**: `false`.
615625
* `transferList` {Object[]} If one or more `MessagePort`-like objects
616626
are passed in `workerData`, a `transferList` is required for those
617627
items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] will be thrown.
@@ -816,6 +826,8 @@ active handle in the event system. If the worker is already `unref()`ed calling
816826
[`WebAssembly.Module`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Module
817827
[`Worker`]: #worker_threads_class_worker
818828
[`cluster` module]: cluster.html
829+
[`fs.open()`]: fs.html#fs_fs_open_path_flags_mode_callback
830+
[`fs.close()`]: fs.html#fs_fs_close_fd_callback
819831
[`port.on('message')`]: #worker_threads_event_message
820832
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
821833
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist

lib/internal/worker.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ class Worker extends EventEmitter {
149149
this[kHandle] = new WorkerImpl(url,
150150
env === process.env ? null : env,
151151
options.execArgv,
152-
parseResourceLimits(options.resourceLimits));
152+
parseResourceLimits(options.resourceLimits),
153+
!!options.trackUnmanagedFds);
153154
if (this[kHandle].invalidExecArgv) {
154155
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
155156
}

src/node_worker.cc

+5-2
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ void Worker::Run() {
313313
context,
314314
std::move(argv_),
315315
std::move(exec_argv_),
316-
EnvironmentFlags::kNoFlags,
316+
static_cast<EnvironmentFlags::Flags>(environment_flags_),
317317
thread_id_,
318318
std::move(inspector_parent_handle_)));
319319
if (is_stopped()) return;
@@ -460,7 +460,6 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
460460

461461
std::vector<std::string> exec_argv_out;
462462

463-
CHECK_EQ(args.Length(), 4);
464463
// Argument might be a string or URL
465464
if (!args[0]->IsNullOrUndefined()) {
466465
Utf8Value value(
@@ -586,6 +585,10 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
586585
CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
587586
limit_info->CopyContents(worker->resource_limits_,
588587
sizeof(worker->resource_limits_));
588+
589+
CHECK(args[4]->IsBoolean());
590+
if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
591+
worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
589592
}
590593

591594
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {

src/node_worker.h

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class Worker : public AsyncWrap {
116116
bool stopped_ = true;
117117

118118
bool has_ref_ = true;
119+
uint64_t environment_flags_ = EnvironmentFlags::kNoFlags;
119120

120121
// The real Environment of the worker object. It has a lesser
121122
// lifespan than the worker object itself - comes to life
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { Worker } = require('worker_threads');
5+
const { once } = require('events');
6+
const fs = require('fs');
7+
8+
// All the tests here are run sequentially, to avoid accidentally opening an fd
9+
// which another part of the test expects to be closed.
10+
11+
const preamble = `
12+
const fs = require("fs");
13+
const { parentPort } = require('worker_threads');
14+
const __filename = ${JSON.stringify(__filename)};
15+
process.on('warning', (warning) => parentPort.postMessage({ warning }));
16+
`;
17+
18+
(async () => {
19+
// Consistency check: Without trackUnmanagedFds, FDs are *not* closed.
20+
{
21+
const w = new Worker(`${preamble}
22+
parentPort.postMessage(fs.openSync(__filename));
23+
`, { eval: true, trackUnmanagedFds: false });
24+
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
25+
assert(fd > 2);
26+
fs.fstatSync(fd); // Does not throw.
27+
fs.closeSync(fd);
28+
}
29+
30+
// With trackUnmanagedFds, FDs are closed automatically.
31+
{
32+
const w = new Worker(`${preamble}
33+
parentPort.postMessage(fs.openSync(__filename));
34+
`, { eval: true, trackUnmanagedFds: true });
35+
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
36+
assert(fd > 2);
37+
assert.throws(() => fs.fstatSync(fd), { code: 'EBADF' });
38+
}
39+
40+
// There is a warning when an fd is unexpectedly opened twice.
41+
{
42+
const w = new Worker(`${preamble}
43+
parentPort.postMessage(fs.openSync(__filename));
44+
parentPort.once('message', () => {
45+
const reopened = fs.openSync(__filename);
46+
fs.closeSync(reopened);
47+
});
48+
`, { eval: true, trackUnmanagedFds: true });
49+
const [ fd ] = await once(w, 'message');
50+
fs.closeSync(fd);
51+
w.postMessage('');
52+
const [ { warning } ] = await once(w, 'message');
53+
assert.match(warning.message,
54+
/File descriptor \d+ opened in unmanaged mode twice/);
55+
}
56+
57+
// There is a warning when an fd is unexpectedly closed.
58+
{
59+
const w = new Worker(`${preamble}
60+
parentPort.once('message', (fd) => {
61+
fs.closeSync(fd);
62+
});
63+
`, { eval: true, trackUnmanagedFds: true });
64+
w.postMessage(fs.openSync(__filename));
65+
const [ { warning } ] = await once(w, 'message');
66+
assert.match(warning.message,
67+
/File descriptor \d+ closed but not opened in unmanaged mode/);
68+
}
69+
})().then(common.mustCall());

0 commit comments

Comments
 (0)