Skip to content

Commit 337be58

Browse files
addaleaxtargos
authored andcommitted
worker: implement MessagePort and MessageChannel
Implement `MessagePort` and `MessageChannel` along the lines of the DOM classes of the same names. `MessagePort`s initially support transferring only `ArrayBuffer`s. Thanks to Stephen Belanger for reviewing this change in its original form, to Benjamin Gruenbaum for reviewing the added tests in their original form, and to Olivia Hugger for reviewing the documentation in its original form. Refs: ayojs/ayo#98 PR-URL: #20876 Reviewed-By: Gireesh Punathil <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Shingo Inoue <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Tiancheng "Timothy" Gu <[email protected]> Reviewed-By: John-David Dalton <[email protected]> Reviewed-By: Gus Caplan <[email protected]>
1 parent 9a73413 commit 337be58

23 files changed

+1140
-2
lines changed

doc/api/_toc.md

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* [Utilities](util.html)
5454
* [V8](v8.html)
5555
* [VM](vm.html)
56+
* [Worker](worker.html)
5657
* [ZLIB](zlib.html)
5758

5859
<div class="line"></div>

doc/api/all.md

+1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@
4646
@include util
4747
@include v8
4848
@include vm
49+
@include worker
4950
@include zlib

doc/api/errors.md

+16
Original file line numberDiff line numberDiff line change
@@ -650,12 +650,23 @@ Used when a child process is being forked without specifying an IPC channel.
650650
Used when the main process is trying to read data from the child process's
651651
STDERR / STDOUT, and the data's length is longer than the `maxBuffer` option.
652652

653+
<a id="ERR_CLOSED_MESSAGE_PORT"></a>
654+
### ERR_CLOSED_MESSAGE_PORT
655+
656+
There was an attempt to use a `MessagePort` instance in a closed
657+
state, usually after `.close()` has been called.
658+
653659
<a id="ERR_CONSOLE_WRITABLE_STREAM"></a>
654660
### ERR_CONSOLE_WRITABLE_STREAM
655661

656662
`Console` was instantiated without `stdout` stream, or `Console` has a
657663
non-writable `stdout` or `stderr` stream.
658664

665+
<a id="ERR_CONSTRUCT_CALL_REQUIRED"></a>
666+
### ERR_CONSTRUCT_CALL_REQUIRED
667+
668+
A constructor for a class was called without `new`.
669+
659670
<a id="ERR_CPU_USAGE"></a>
660671
### ERR_CPU_USAGE
661672

@@ -1213,6 +1224,11 @@ urlSearchParams.has.call(buf, 'foo');
12131224
// Throws a TypeError with code 'ERR_INVALID_THIS'
12141225
```
12151226

1227+
<a id="ERR_INVALID_TRANSFER_OBJECT"></a>
1228+
### ERR_INVALID_TRANSFER_OBJECT
1229+
1230+
An invalid transfer object was passed to `postMessage()`.
1231+
12161232
<a id="ERR_INVALID_TUPLE"></a>
12171233
### ERR_INVALID_TUPLE
12181234

doc/api/worker.md

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Worker
2+
3+
<!--introduced_in=REPLACEME-->
4+
5+
> Stability: 1 - Experimental
6+
7+
## Class: MessageChannel
8+
<!-- YAML
9+
added: REPLACEME
10+
-->
11+
12+
Instances of the `worker.MessageChannel` class represent an asynchronous,
13+
two-way communications channel.
14+
The `MessageChannel` has no methods of its own. `new MessageChannel()`
15+
yields an object with `port1` and `port2` properties, which refer to linked
16+
[`MessagePort`][] instances.
17+
18+
```js
19+
const { MessageChannel } = require('worker');
20+
21+
const { port1, port2 } = new MessageChannel();
22+
port1.on('message', (message) => console.log('received', message));
23+
port2.postMessage({ foo: 'bar' });
24+
// prints: received { foo: 'bar' }
25+
```
26+
27+
## Class: MessagePort
28+
<!-- YAML
29+
added: REPLACEME
30+
-->
31+
32+
* Extends: {EventEmitter}
33+
34+
Instances of the `worker.MessagePort` class represent one end of an
35+
asynchronous, two-way communications channel. It can be used to transfer
36+
structured data, memory regions and other `MessagePort`s between different
37+
[`Worker`][]s.
38+
39+
With the exception of `MessagePort`s being [`EventEmitter`][]s rather
40+
than `EventTarget`s, this implementation matches [browser `MessagePort`][]s.
41+
42+
### Event: 'close'
43+
<!-- YAML
44+
added: REPLACEME
45+
-->
46+
47+
The `'close'` event is emitted once either side of the channel has been
48+
disconnected.
49+
50+
### Event: 'message'
51+
<!-- YAML
52+
added: REPLACEME
53+
-->
54+
55+
* `value` {any} The transmitted value
56+
57+
The `'message'` event is emitted for any incoming message, containing the cloned
58+
input of [`port.postMessage()`][].
59+
60+
Listeners on this event will receive a clone of the `value` parameter as passed
61+
to `postMessage()` and no further arguments.
62+
63+
### port.close()
64+
<!-- YAML
65+
added: REPLACEME
66+
-->
67+
68+
Disables further sending of messages on either side of the connection.
69+
This method can be called once you know that no further communication
70+
will happen over this `MessagePort`.
71+
72+
### port.postMessage(value[, transferList])
73+
<!-- YAML
74+
added: REPLACEME
75+
-->
76+
77+
* `value` {any}
78+
* `transferList` {Object[]}
79+
80+
Sends a JavaScript value to the receiving side of this channel.
81+
`value` will be transferred in a way which is compatible with
82+
the [HTML structured clone algorithm][]. In particular, it may contain circular
83+
references and objects like typed arrays that the `JSON` API is not able
84+
to stringify.
85+
86+
`transferList` may be a list of `ArrayBuffer` objects.
87+
After transferring, they will not be usable on the sending side of the channel
88+
anymore (even if they are not contained in `value`).
89+
90+
`value` may still contain `ArrayBuffer` instances that are not in
91+
`transferList`; in that case, the underlying memory is copied rather than moved.
92+
93+
For more information on the serialization and deserialization mechanisms
94+
behind this API, see the [serialization API of the `v8` module][v8.serdes].
95+
96+
Because the object cloning uses the structured clone algorithm,
97+
non-enumerable properties, property accessors, and object prototypes are
98+
not preserved. In particular, [`Buffer`][] objects will be read as
99+
plain [`Uint8Array`][]s on the receiving side.
100+
101+
The message object will be cloned immediately, and can be modified after
102+
posting without having side effects.
103+
104+
### port.ref()
105+
<!-- YAML
106+
added: REPLACEME
107+
-->
108+
109+
Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed port will
110+
*not* let the program exit if it's the only active handle left (the default
111+
behavior). If the port is `ref()`ed, calling `ref()` again will have no effect.
112+
113+
If listeners are attached or removed using `.on('message')`, the port will
114+
be `ref()`ed and `unref()`ed automatically depending on whether
115+
listeners for the event exist.
116+
117+
### port.start()
118+
<!-- YAML
119+
added: REPLACEME
120+
-->
121+
122+
Starts receiving messages on this `MessagePort`. When using this port
123+
as an event emitter, this will be called automatically once `'message'`
124+
listeners are attached.
125+
126+
### port.unref()
127+
<!-- YAML
128+
added: REPLACEME
129+
-->
130+
131+
Calling `unref()` on a port will allow the thread to exit if this is the only
132+
active handle in the event system. If the port is already `unref()`ed calling
133+
`unref()` again will have no effect.
134+
135+
If listeners are attached or removed using `.on('message')`, the port will
136+
be `ref()`ed and `unref()`ed automatically depending on whether
137+
listeners for the event exist.
138+
139+
[`Buffer`]: buffer.html
140+
[`EventEmitter`]: events.html
141+
[`MessagePort`]: #worker_class_messageport
142+
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
143+
[v8.serdes]: v8.html#v8_serialization_api
144+
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
145+
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
146+
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm

lib/internal/bootstrap/loaders.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@
194194
};
195195

196196
NativeModule.isInternal = function(id) {
197-
return id.startsWith('internal/');
197+
return id.startsWith('internal/') ||
198+
(id === 'worker' && !process.binding('config').experimentalWorker);
198199
};
199200
}
200201

lib/internal/modules/cjs/helpers.js

+5
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ const builtinLibs = [
105105
'v8', 'vm', 'zlib'
106106
];
107107

108+
if (process.binding('config').experimentalWorker) {
109+
builtinLibs.push('worker');
110+
builtinLibs.sort();
111+
}
112+
108113
if (typeof process.binding('inspector').open === 'function') {
109114
builtinLibs.push('inspector');
110115
builtinLibs.sort();

lib/internal/worker.js

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
'use strict';
2+
3+
const EventEmitter = require('events');
4+
const util = require('util');
5+
6+
const { internalBinding } = require('internal/bootstrap/loaders');
7+
const { MessagePort, MessageChannel } = internalBinding('messaging');
8+
const { handle_onclose } = internalBinding('symbols');
9+
10+
util.inherits(MessagePort, EventEmitter);
11+
12+
const kOnMessageListener = Symbol('kOnMessageListener');
13+
14+
const debug = util.debuglog('worker');
15+
16+
// A MessagePort consists of a handle (that wraps around an
17+
// uv_async_t) which can receive information from other threads and emits
18+
// .onmessage events, and a function used for sending data to a MessagePort
19+
// in some other thread.
20+
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
21+
debug('received message', payload);
22+
// Emit the deserialized object to userland.
23+
this.emit('message', payload);
24+
};
25+
26+
// This is for compatibility with the Web's MessagePort API. It makes sense to
27+
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
28+
// `onmessage`, we'll switch over to the Web API model.
29+
Object.defineProperty(MessagePort.prototype, 'onmessage', {
30+
enumerable: true,
31+
configurable: true,
32+
get() {
33+
return this[kOnMessageListener];
34+
},
35+
set(value) {
36+
this[kOnMessageListener] = value;
37+
if (typeof value === 'function') {
38+
this.ref();
39+
this.start();
40+
} else {
41+
this.unref();
42+
this.stop();
43+
}
44+
}
45+
});
46+
47+
// This is called from inside the `MessagePort` constructor.
48+
function oninit() {
49+
setupPortReferencing(this, this, 'message');
50+
}
51+
52+
Object.defineProperty(MessagePort.prototype, 'oninit', {
53+
enumerable: true,
54+
writable: false,
55+
value: oninit
56+
});
57+
58+
// This is called after the underlying `uv_async_t` has been closed.
59+
function onclose() {
60+
if (typeof this.onclose === 'function') {
61+
// Not part of the Web standard yet, but there aren't many reasonable
62+
// alternatives in a non-EventEmitter usage setting.
63+
// Refs: https://github.com/whatwg/html/issues/1766
64+
this.onclose();
65+
}
66+
this.emit('close');
67+
}
68+
69+
Object.defineProperty(MessagePort.prototype, handle_onclose, {
70+
enumerable: false,
71+
writable: false,
72+
value: onclose
73+
});
74+
75+
const originalClose = MessagePort.prototype.close;
76+
MessagePort.prototype.close = function(cb) {
77+
if (typeof cb === 'function')
78+
this.once('close', cb);
79+
originalClose.call(this);
80+
};
81+
82+
function setupPortReferencing(port, eventEmitter, eventName) {
83+
// Keep track of whether there are any workerMessage listeners:
84+
// If there are some, ref() the channel so it keeps the event loop alive.
85+
// If there are none or all are removed, unref() the channel so the worker
86+
// can shutdown gracefully.
87+
port.unref();
88+
eventEmitter.on('newListener', (name) => {
89+
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
90+
port.ref();
91+
port.start();
92+
}
93+
});
94+
eventEmitter.on('removeListener', (name) => {
95+
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
96+
port.stop();
97+
port.unref();
98+
}
99+
});
100+
}
101+
102+
module.exports = {
103+
MessagePort,
104+
MessageChannel
105+
};

lib/worker.js

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
'use strict';
2+
3+
const { MessagePort, MessageChannel } = require('internal/worker');
4+
5+
module.exports = { MessagePort, MessageChannel };

node.gyp

+4
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
'lib/util.js',
7979
'lib/v8.js',
8080
'lib/vm.js',
81+
'lib/worker.js',
8182
'lib/zlib.js',
8283
'lib/internal/assert.js',
8384
'lib/internal/async_hooks.js',
@@ -155,6 +156,7 @@
155156
'lib/internal/validators.js',
156157
'lib/internal/stream_base_commons.js',
157158
'lib/internal/vm/module.js',
159+
'lib/internal/worker.js',
158160
'lib/internal/streams/lazy_transform.js',
159161
'lib/internal/streams/async_iterator.js',
160162
'lib/internal/streams/buffer_list.js',
@@ -333,6 +335,7 @@
333335
'src/node_file.cc',
334336
'src/node_http2.cc',
335337
'src/node_http_parser.cc',
338+
'src/node_messaging.cc',
336339
'src/node_os.cc',
337340
'src/node_platform.cc',
338341
'src/node_perf.cc',
@@ -390,6 +393,7 @@
390393
'src/node_http2_state.h',
391394
'src/node_internals.h',
392395
'src/node_javascript.h',
396+
'src/node_messaging.h',
393397
'src/node_mutex.h',
394398
'src/node_perf.h',
395399
'src/node_perf_common.h',

src/async_wrap.h

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ namespace node {
4949
V(HTTP2SETTINGS) \
5050
V(HTTPPARSER) \
5151
V(JSSTREAM) \
52+
V(MESSAGEPORT) \
5253
V(PIPECONNECTWRAP) \
5354
V(PIPESERVERWRAP) \
5455
V(PIPEWRAP) \

0 commit comments

Comments
 (0)