Skip to content

Commit c7cf8d9

Browse files
addaleaxtargos
authored andcommitted
worker: add ability to unshift message from MessagePort
In combination with Atomics, this makes it possible to implement generic synchronous functionality, e.g. `importScript()`, in Workers purely by communicating with other threads. This is a continuation of #26686, where a preference for a solution was voiced that allowed reading individual messages, rather than emitting all messages through events. PR-URL: #27294 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Gus Caplan <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]>
1 parent 1f7a527 commit c7cf8d9

File tree

7 files changed

+139
-49
lines changed

7 files changed

+139
-49
lines changed

doc/api/worker_threads.md

+28
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,34 @@ if (isMainThread) {
125125
}
126126
```
127127

128+
## worker.receiveMessageOnPort(port)
129+
<!-- YAML
130+
added: REPLACEME
131+
-->
132+
133+
* `port` {MessagePort}
134+
135+
* Returns: {Object|undefined}
136+
137+
Receive a single message from a given `MessagePort`. If no message is available,
138+
`undefined` is returned, otherwise an object with a single `message` property
139+
that contains the message payload, corresponding to the oldest message in the
140+
`MessagePort`’s queue.
141+
142+
```js
143+
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
144+
const { port1, port2 } = new MessageChannel();
145+
port1.postMessage({ hello: 'world' });
146+
147+
console.log(receiveMessageOnPort(port2));
148+
// Prints: { message: { hello: 'world' } }
149+
console.log(receiveMessageOnPort(port2));
150+
// Prints: undefined
151+
```
152+
153+
When this function is used, no `'message'` event will be emitted and the
154+
`onmessage` listener will not be invoked.
155+
128156
## worker.SHARE_ENV
129157
<!-- YAML
130158
added: v11.14.0

lib/internal/worker/io.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ const { Object } = primordials;
44

55
const {
66
handle_onclose: handleOnCloseSymbol,
7-
oninit: onInitSymbol
7+
oninit: onInitSymbol,
8+
no_message_symbol: noMessageSymbol
89
} = internalBinding('symbols');
910
const {
1011
MessagePort,
1112
MessageChannel,
1213
drainMessagePort,
1314
moveMessagePortToContext,
15+
receiveMessageOnPort: receiveMessageOnPort_,
1416
stopMessagePort
1517
} = internalBinding('messaging');
1618
const {
@@ -235,6 +237,12 @@ function createWorkerStdio() {
235237
};
236238
}
237239

240+
function receiveMessageOnPort(port) {
241+
const message = receiveMessageOnPort_(port);
242+
if (message === noMessageSymbol) return undefined;
243+
return { message };
244+
}
245+
238246
module.exports = {
239247
drainMessagePort,
240248
messageTypes,
@@ -245,6 +253,7 @@ module.exports = {
245253
moveMessagePortToContext,
246254
MessagePort,
247255
MessageChannel,
256+
receiveMessageOnPort,
248257
setupPortReferencing,
249258
ReadableWorkerStdio,
250259
WritableWorkerStdio,

lib/worker_threads.js

+2
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ const {
1111
MessagePort,
1212
MessageChannel,
1313
moveMessagePortToContext,
14+
receiveMessageOnPort
1415
} = require('internal/worker/io');
1516

1617
module.exports = {
1718
isMainThread,
1819
MessagePort,
1920
MessageChannel,
2021
moveMessagePortToContext,
22+
receiveMessageOnPort,
2123
threadId,
2224
SHARE_ENV,
2325
Worker,

src/env.h

+1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
130130
// for the sake of convenience.
131131
#define PER_ISOLATE_SYMBOL_PROPERTIES(V) \
132132
V(handle_onclose_symbol, "handle_onclose") \
133+
V(no_message_symbol, "no_message_symbol") \
133134
V(oninit_symbol, "oninit") \
134135
V(owner_symbol, "owner") \
135136

src/node_messaging.cc

+70-48
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,40 @@ MessagePort* MessagePort::New(
569569
return port;
570570
}
571571

572+
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
573+
bool only_if_receiving) {
574+
Message received;
575+
{
576+
// Get the head of the message queue.
577+
Mutex::ScopedLock lock(data_->mutex_);
578+
579+
Debug(this, "MessagePort has message");
580+
581+
bool wants_message = receiving_messages_ || !only_if_receiving;
582+
// We have nothing to do if:
583+
// - There are no pending messages
584+
// - We are not intending to receive messages, and the message we would
585+
// receive is not the final "close" message.
586+
if (data_->incoming_messages_.empty() ||
587+
(!wants_message &&
588+
!data_->incoming_messages_.front().IsCloseMessage())) {
589+
return env()->no_message_symbol();
590+
}
591+
592+
received = std::move(data_->incoming_messages_.front());
593+
data_->incoming_messages_.pop_front();
594+
}
595+
596+
if (received.IsCloseMessage()) {
597+
Close();
598+
return env()->no_message_symbol();
599+
}
600+
601+
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
602+
603+
return received.Deserialize(env(), context);
604+
}
605+
572606
void MessagePort::OnMessage() {
573607
Debug(this, "Running MessagePort::OnMessage()");
574608
HandleScope handle_scope(env()->isolate());
@@ -579,61 +613,33 @@ void MessagePort::OnMessage() {
579613
// messages, so we need to check that this handle still owns its `data_` field
580614
// on every iteration.
581615
while (data_) {
582-
Message received;
583-
{
584-
// Get the head of the message queue.
585-
Mutex::ScopedLock lock(data_->mutex_);
586-
587-
Debug(this, "MessagePort has message, receiving = %d",
588-
static_cast<int>(receiving_messages_));
589-
590-
// We have nothing to do if:
591-
// - There are no pending messages
592-
// - We are not intending to receive messages, and the message we would
593-
// receive is not the final "close" message.
594-
if (data_->incoming_messages_.empty() ||
595-
(!receiving_messages_ &&
596-
!data_->incoming_messages_.front().IsCloseMessage())) {
597-
break;
598-
}
616+
HandleScope handle_scope(env()->isolate());
617+
Context::Scope context_scope(context);
599618

600-
received = std::move(data_->incoming_messages_.front());
601-
data_->incoming_messages_.pop_front();
602-
}
603-
604-
if (received.IsCloseMessage()) {
605-
Close();
606-
return;
607-
}
619+
Local<Value> payload;
620+
if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
621+
if (payload == env()->no_message_symbol()) break;
608622

609623
if (!env()->can_call_into_js()) {
610624
Debug(this, "MessagePort drains queue because !can_call_into_js()");
611625
// In this case there is nothing to do but to drain the current queue.
612626
continue;
613627
}
614628

615-
{
616-
// Call the JS .onmessage() callback.
617-
HandleScope handle_scope(env()->isolate());
618-
Context::Scope context_scope(context);
619-
620-
Local<Object> event;
621-
Local<Value> payload;
622-
Local<Value> cb_args[1];
623-
if (!received.Deserialize(env(), context).ToLocal(&payload) ||
624-
!env()->message_event_object_template()->NewInstance(context)
625-
.ToLocal(&event) ||
626-
event->Set(context, env()->data_string(), payload).IsNothing() ||
627-
event->Set(context, env()->target_string(), object()).IsNothing() ||
628-
(cb_args[0] = event, false) ||
629-
MakeCallback(env()->onmessage_string(),
630-
arraysize(cb_args),
631-
cb_args).IsEmpty()) {
632-
// Re-schedule OnMessage() execution in case of failure.
633-
if (data_)
634-
TriggerAsync();
635-
return;
636-
}
629+
Local<Object> event;
630+
Local<Value> cb_args[1];
631+
if (!env()->message_event_object_template()->NewInstance(context)
632+
.ToLocal(&event) ||
633+
event->Set(context, env()->data_string(), payload).IsNothing() ||
634+
event->Set(context, env()->target_string(), object()).IsNothing() ||
635+
(cb_args[0] = event, false) ||
636+
MakeCallback(env()->onmessage_string(),
637+
arraysize(cb_args),
638+
cb_args).IsEmpty()) {
639+
// Re-schedule OnMessage() execution in case of failure.
640+
if (data_)
641+
TriggerAsync();
642+
return;
637643
}
638644
}
639645
}
@@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
754760

755761
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
756762
MessagePort* port;
757-
CHECK(args[0]->IsObject());
758763
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
759764
port->OnMessage();
760765
}
761766

767+
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
768+
CHECK(args[0]->IsObject());
769+
MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
770+
if (port == nullptr) {
771+
// Return 'no messages' for a closed port.
772+
args.GetReturnValue().Set(
773+
Environment::GetCurrent(args)->no_message_symbol());
774+
return;
775+
}
776+
777+
MaybeLocal<Value> payload =
778+
port->ReceiveMessage(port->object()->CreationContext(), false);
779+
if (!payload.IsEmpty())
780+
args.GetReturnValue().Set(payload.ToLocalChecked());
781+
}
782+
762783
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
763784
Environment* env = Environment::GetCurrent(args);
764785
if (!args[0]->IsObject() ||
@@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target,
875896
// the browser equivalents do not provide them.
876897
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
877898
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
899+
env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
878900
env->SetMethod(target, "moveMessagePortToContext",
879901
MessagePort::MoveToContext);
880902
}

src/node_messaging.h

+3
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ class MessagePort : public HandleWrap {
163163
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
164164
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
165165
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
166+
static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
166167

167168
/* static */
168169
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -200,6 +201,8 @@ class MessagePort : public HandleWrap {
200201
void OnClose() override;
201202
void OnMessage();
202203
void TriggerAsync();
204+
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
205+
bool only_if_receiving);
203206

204207
std::unique_ptr<MessagePortData> data_ = nullptr;
205208
bool receiving_messages_ = false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
5+
6+
const { port1, port2 } = new MessageChannel();
7+
8+
const message1 = { hello: 'world' };
9+
const message2 = { foo: 'bar' };
10+
11+
// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does
12+
// when we’re using events.
13+
assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
14+
port1.postMessage(message1);
15+
port1.postMessage(message2);
16+
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
17+
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 });
18+
assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
19+
assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
20+
21+
// Make sure message handlers aren’t called.
22+
port2.on('message', common.mustNotCall());
23+
port1.postMessage(message1);
24+
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
25+
port1.close();

0 commit comments

Comments
 (0)