Skip to content

Commit 0aa3809

Browse files
addaleaxruyadorno
authored andcommitted
worker: make MessagePort inherit from EventTarget
Use `NodeEventTarget` to provide a mixed `EventEmitter`/`EventTarget` API interface. PR-URL: #34057 Refs: https://twitter.com/addaleax/status/1276289101671608320 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: David Carlier <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent c93a898 commit 0aa3809

9 files changed

+126
-69
lines changed

benchmark/worker/messageport.js

+15-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const common = require('../common.js');
44
const { MessageChannel } = require('worker_threads');
55
const bench = common.createBenchmark(main, {
66
payload: ['string', 'object'],
7+
style: ['eventtarget', 'eventemitter'],
78
n: [1e6]
89
});
910

@@ -25,14 +26,26 @@ function main(conf) {
2526
const { port1, port2 } = new MessageChannel();
2627

2728
let messages = 0;
28-
port2.onmessage = () => {
29+
function listener() {
2930
if (messages++ === n) {
3031
bench.end(n);
3132
port1.close();
3233
} else {
3334
write();
3435
}
35-
};
36+
}
37+
38+
switch (conf.style) {
39+
case 'eventtarget':
40+
port2.onmessage = listener;
41+
break;
42+
case 'eventemitter':
43+
port2.on('message', listener);
44+
break;
45+
default:
46+
throw new Error('Unsupported listener type');
47+
}
48+
3649
bench.start();
3750
write();
3851

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,31 @@
11
'use strict';
2+
const {
3+
SymbolFor,
4+
} = primordials;
5+
26
class MessageEvent {
3-
constructor(data, target) {
7+
constructor(data, target, type) {
48
this.data = data;
59
this.target = target;
10+
this.type = type;
611
}
712
}
813

9-
exports.emitMessage = function(data) {
10-
if (typeof this.onmessage === 'function')
11-
this.onmessage(new MessageEvent(data, this));
14+
const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
15+
16+
exports.emitMessage = function(data, type) {
17+
if (typeof this[kHybridDispatch] === 'function') {
18+
this[kHybridDispatch](data, type, undefined);
19+
return;
20+
}
21+
22+
const event = new MessageEvent(data, this, type);
23+
if (type === 'message') {
24+
if (typeof this.onmessage === 'function')
25+
this.onmessage(event);
26+
} else {
27+
// eslint-disable-next-line no-lonely-if
28+
if (typeof this.onmessageerror === 'function')
29+
this.onmessageerror(event);
30+
}
1231
};

lib/internal/worker/io.js

+65-46
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,23 @@ const {
2424
stopMessagePort
2525
} = internalBinding('messaging');
2626
const {
27-
threadId,
2827
getEnvMessagePort
2928
} = internalBinding('worker');
3029

3130
const { Readable, Writable } = require('stream');
32-
const EventEmitter = require('events');
31+
const {
32+
Event,
33+
NodeEventTarget,
34+
defineEventHandler,
35+
initNodeEventTarget,
36+
kCreateEvent,
37+
kNewListener,
38+
kRemoveListener,
39+
} = require('internal/event_target');
3340
const { inspect } = require('internal/util/inspect');
34-
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
35-
debug = fn;
36-
});
3741

3842
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
3943
const kName = Symbol('kName');
40-
const kOnMessageListener = Symbol('kOnMessageListener');
4144
const kPort = Symbol('kPort');
4245
const kWaitingStreams = Symbol('kWaitingStreams');
4346
const kWritableCallbacks = Symbol('kWritableCallbacks');
@@ -54,55 +57,47 @@ const messageTypes = {
5457
};
5558

5659
// We have to mess with the MessagePort prototype a bit, so that a) we can make
57-
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
60+
// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
5861
// not provide methods that are not present in the Browser and not documented
5962
// on our side (e.g. hasRef).
6063
// Save a copy of the original set of methods as a shallow clone.
6164
const MessagePortPrototype = ObjectCreate(
6265
ObjectGetPrototypeOf(MessagePort.prototype),
6366
ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
6467
// Set up the new inheritance chain.
65-
ObjectSetPrototypeOf(MessagePort, EventEmitter);
66-
ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
68+
ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
69+
ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
6770
// Copy methods that are inherited from HandleWrap, because
6871
// changing the prototype of MessagePort.prototype implicitly removed them.
6972
MessagePort.prototype.ref = MessagePortPrototype.ref;
7073
MessagePort.prototype.unref = MessagePortPrototype.unref;
7174

72-
// A communication channel consisting of a handle (that wraps around an
73-
// uv_async_t) which can receive information from other threads and emits
74-
// .onmessage events, and a function used for sending data to a MessagePort
75-
// in some other thread.
76-
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
77-
if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
78-
debug(`[${threadId}] received message`, event);
79-
// Emit the deserialized object to userland.
80-
this.emit('message', event.data);
81-
};
82-
83-
// This is for compatibility with the Web's MessagePort API. It makes sense to
84-
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
85-
// `onmessage`, we'll switch over to the Web API model.
86-
ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
87-
enumerable: true,
88-
configurable: true,
89-
get() {
90-
return this[kOnMessageListener];
91-
},
92-
set(value) {
93-
this[kOnMessageListener] = value;
94-
if (typeof value === 'function') {
95-
this.ref();
96-
MessagePortPrototype.start.call(this);
97-
} else {
98-
this.unref();
99-
stopMessagePort(this);
100-
}
75+
class MessageEvent extends Event {
76+
constructor(data, target, type) {
77+
super(type);
78+
this.data = data;
10179
}
102-
});
80+
}
81+
82+
ObjectDefineProperty(
83+
MessagePort.prototype,
84+
kCreateEvent,
85+
{
86+
value: function(data, type) {
87+
return new MessageEvent(data, this, type);
88+
},
89+
configurable: false,
90+
writable: false,
91+
enumerable: false,
92+
});
10393

10494
// This is called from inside the `MessagePort` constructor.
10595
function oninit() {
96+
initNodeEventTarget(this);
97+
// TODO(addaleax): This should be on MessagePort.prototype, but
98+
// defineEventHandler() does not support that.
99+
defineEventHandler(this, 'message');
100+
defineEventHandler(this, 'messageerror');
106101
setupPortReferencing(this, this, 'message');
107102
}
108103

@@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
112107
value: oninit
113108
});
114109

110+
class MessagePortCloseEvent extends Event {
111+
constructor() {
112+
super('close');
113+
}
114+
}
115+
115116
// This is called after the underlying `uv_async_t` has been closed.
116117
function onclose() {
117-
this.emit('close');
118+
this.dispatchEvent(new MessagePortCloseEvent());
118119
}
119120

120121
ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
@@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
156157
// If there are none or all are removed, unref() the channel so the worker
157158
// can shutdown gracefully.
158159
port.unref();
159-
eventEmitter.on('newListener', (name) => {
160-
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
160+
eventEmitter.on('newListener', function(name) {
161+
if (name === eventName) newListener(eventEmitter.listenerCount(name));
162+
});
163+
eventEmitter.on('removeListener', function(name) {
164+
if (name === eventName) removeListener(eventEmitter.listenerCount(name));
165+
});
166+
const origNewListener = eventEmitter[kNewListener];
167+
eventEmitter[kNewListener] = function(size, type, ...args) {
168+
if (type === eventName) newListener(size - 1);
169+
return origNewListener.call(this, size, type, ...args);
170+
};
171+
const origRemoveListener = eventEmitter[kRemoveListener];
172+
eventEmitter[kRemoveListener] = function(size, type, ...args) {
173+
if (type === eventName) removeListener(size);
174+
return origRemoveListener.call(this, size, type, ...args);
175+
};
176+
177+
function newListener(size) {
178+
if (size === 0) {
161179
port.ref();
162180
MessagePortPrototype.start.call(port);
163181
}
164-
});
165-
eventEmitter.on('removeListener', (name) => {
166-
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
182+
}
183+
184+
function removeListener(size) {
185+
if (size === 0) {
167186
stopMessagePort(port);
168187
port.unref();
169188
}
170-
});
189+
}
171190
}
172191

173192

src/node_messaging.cc

+9-8
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,8 @@ void MessagePort::OnMessage() {
747747

748748
Local<Value> payload;
749749
Local<Value> message_error;
750+
Local<Value> argv[2];
751+
750752
{
751753
// Catch any exceptions from parsing the message itself (not from
752754
// emitting it) as 'messageeror' events.
@@ -765,16 +767,15 @@ void MessagePort::OnMessage() {
765767
continue;
766768
}
767769

768-
if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
770+
argv[0] = payload;
771+
argv[1] = env()->message_string();
772+
773+
if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
769774
reschedule:
770775
if (!message_error.IsEmpty()) {
771-
// This should become a `messageerror` event in the sense of the
772-
// EventTarget API at some point.
773-
Local<Value> argv[] = {
774-
env()->messageerror_string(),
775-
message_error
776-
};
777-
USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
776+
argv[0] = message_error;
777+
argv[1] = env()->messageerror_string();
778+
USE(MakeCallback(emit_message, arraysize(argv), argv));
778779
}
779780

780781
// Re-schedule OnMessage() execution in case of failure.

test/parallel/test-bootstrap-modules.js

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ if (!common.isMainThread) {
9999
'NativeModule _stream_transform',
100100
'NativeModule _stream_writable',
101101
'NativeModule internal/error_serdes',
102+
'NativeModule internal/event_target',
102103
'NativeModule internal/process/worker_thread_only',
103104
'NativeModule internal/streams/buffer_list',
104105
'NativeModule internal/streams/destroy',
@@ -109,6 +110,7 @@ if (!common.isMainThread) {
109110
'NativeModule internal/worker',
110111
'NativeModule internal/worker/io',
111112
'NativeModule stream',
113+
'NativeModule util',
112114
'NativeModule worker_threads',
113115
].forEach(expectedModules.add.bind(expectedModules));
114116
}

test/parallel/test-crypto-key-objects-messageport.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ for (const [key, repr] of keys) {
7575

7676
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
7777
// implements EventTarget fully and in a cross-context manner.
78-
port2moved.emit = common.mustCall((name, err) => {
79-
assert.strictEqual(name, 'messageerror');
80-
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
78+
port2moved.onmessageerror = common.mustCall((event) => {
79+
assert.strictEqual(event.data.code,
80+
'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
8181
});
8282

8383
port2moved.start();

test/parallel/test-worker-message-port-inspect-during-init-hook.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ const { MessageChannel } = require('worker_threads');
1010

1111
async_hooks.createHook({
1212
init: common.mustCall((id, type, triggerId, resource) => {
13-
assert.strictEqual(util.inspect(resource),
14-
'MessagePort { active: true, refed: false }');
13+
assert.strictEqual(
14+
util.inspect(resource),
15+
'MessagePort [EventTarget] { active: true, refed: false }');
1516
}, 2)
1617
}).enable();
1718

test/parallel/test-worker-message-port-transfer-filehandle.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ const { once } = require('events');
5757
});
5858
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
5959
// implements EventTarget fully and in a cross-context manner.
60-
port2moved.emit = common.mustCall((name, err) => {
61-
assert.strictEqual(name, 'messageerror');
62-
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
60+
port2moved.onmessageerror = common.mustCall((event) => {
61+
assert.strictEqual(event.data.code,
62+
'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
6363
});
6464
port2moved.start();
6565

test/parallel/test-worker-message-port.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ const { MessageChannel, MessagePort } = require('worker_threads');
154154
assert.deepStrictEqual(
155155
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
156156
[
157-
'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start',
157+
// TODO(addaleax): This should include onmessage (and eventually
158+
// onmessageerror).
159+
'close', 'constructor', 'postMessage', 'ref', 'start',
158160
'unref'
159161
]);
160162
}

0 commit comments

Comments
 (0)