Skip to content

Commit ed774b7

Browse files
TimothyGutargos
authored andcommitted
messaging: fix edge cases with transferring ports
Currently, transferring the port on which postMessage is called causes a segmentation fault, and transferring the target port causes a subsequent port.onmessage setting to throw, or a deadlock if onmessage is set before the postMessage. Fix both of these behaviors and align the methods more closely with the normative definitions in the HTML Standard. Also, per spec postMessage must not throw just because the ports are disentangled. Implement that behavior. PR-URL: #21540 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 221c8bd commit ed774b7

5 files changed

+213
-28
lines changed

src/node_messaging.cc

+76-23
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
144144

145145
namespace {
146146

147-
void ThrowDataCloneError(Environment* env, Local<String> message) {
147+
void ThrowDataCloneException(Environment* env, Local<String> message) {
148148
Local<Value> argv[] = {
149149
message,
150150
FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
@@ -168,7 +168,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
168168
: env_(env), context_(context), msg_(m) {}
169169

170170
void ThrowDataCloneError(Local<String> message) override {
171-
ThrowDataCloneError(env_, message);
171+
ThrowDataCloneException(env_, message);
172172
}
173173

174174
Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
@@ -239,7 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
239239
Maybe<bool> Message::Serialize(Environment* env,
240240
Local<Context> context,
241241
Local<Value> input,
242-
Local<Value> transfer_list_v) {
242+
Local<Value> transfer_list_v,
243+
Local<Object> source_port) {
243244
HandleScope handle_scope(env->isolate());
244245
Context::Scope context_scope(context);
245246

@@ -273,8 +274,23 @@ Maybe<bool> Message::Serialize(Environment* env,
273274
continue;
274275
} else if (env->message_port_constructor_template()
275276
->HasInstance(entry)) {
277+
// Check if the source MessagePort is being transferred.
278+
if (!source_port.IsEmpty() && entry == source_port) {
279+
ThrowDataCloneException(
280+
env,
281+
FIXED_ONE_BYTE_STRING(env->isolate(),
282+
"Transfer list contains source port"));
283+
return Nothing<bool>();
284+
}
276285
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
277-
CHECK_NE(port, nullptr);
286+
if (port == nullptr || port->IsDetached()) {
287+
ThrowDataCloneException(
288+
env,
289+
FIXED_ONE_BYTE_STRING(
290+
env->isolate(),
291+
"MessagePort in transfer list is already detached"));
292+
return Nothing<bool>();
293+
}
278294
delegate.ports_.push_back(port);
279295
continue;
280296
}
@@ -410,6 +426,10 @@ uv_async_t* MessagePort::async() {
410426
return reinterpret_cast<uv_async_t*>(GetHandle());
411427
}
412428

429+
bool MessagePort::IsDetached() const {
430+
return data_ == nullptr || IsHandleClosing();
431+
}
432+
413433
void MessagePort::TriggerAsync() {
414434
if (IsHandleClosing()) return;
415435
CHECK_EQ(uv_async_send(async()), 0);
@@ -552,36 +572,69 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
552572
}
553573

554574

555-
void MessagePort::Send(Message&& message) {
556-
Mutex::ScopedLock lock(*data_->sibling_mutex_);
557-
if (data_->sibling_ == nullptr)
558-
return;
559-
data_->sibling_->AddToIncomingQueue(std::move(message));
560-
}
575+
Maybe<bool> MessagePort::PostMessage(Environment* env,
576+
Local<Value> message_v,
577+
Local<Value> transfer_v) {
578+
Isolate* isolate = env->isolate();
579+
Local<Object> obj = object(isolate);
580+
Local<Context> context = obj->CreationContext();
561581

562-
void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
563-
Environment* env = Environment::GetCurrent(args);
564-
Local<Context> context = object(env->isolate())->CreationContext();
565582
Message msg;
566-
if (msg.Serialize(env, context, args[0], args[1])
567-
.IsNothing()) {
568-
return;
583+
584+
// Per spec, we need to both check if transfer list has the source port, and
585+
// serialize the input message, even if the MessagePort is closed or detached.
586+
587+
Maybe<bool> serialization_maybe =
588+
msg.Serialize(env, context, message_v, transfer_v, obj);
589+
if (data_ == nullptr) {
590+
return serialization_maybe;
591+
}
592+
if (serialization_maybe.IsNothing()) {
593+
return Nothing<bool>();
594+
}
595+
596+
Mutex::ScopedLock lock(*data_->sibling_mutex_);
597+
bool doomed = false;
598+
599+
// Check if the target port is posted to itself.
600+
if (data_->sibling_ != nullptr) {
601+
for (const auto& port_data : msg.message_ports()) {
602+
if (data_->sibling_ == port_data.get()) {
603+
doomed = true;
604+
ProcessEmitWarning(env, "The target port was posted to itself, and "
605+
"the communication channel was lost");
606+
break;
607+
}
608+
}
569609
}
570-
Send(std::move(msg));
610+
611+
if (data_->sibling_ == nullptr || doomed)
612+
return Just(true);
613+
614+
data_->sibling_->AddToIncomingQueue(std::move(msg));
615+
return Just(true);
571616
}
572617

573618
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
574619
Environment* env = Environment::GetCurrent(args);
575-
MessagePort* port;
576-
ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
577-
if (!port->data_) {
578-
return THROW_ERR_CLOSED_MESSAGE_PORT(env);
579-
}
580620
if (args.Length() == 0) {
581621
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
582622
"MessagePort.postMessage");
583623
}
584-
port->Send(args);
624+
625+
MessagePort* port = Unwrap<MessagePort>(args.This());
626+
// Even if the backing MessagePort object has already been deleted, we still
627+
// want to serialize the message to ensure spec-compliant behavior w.r.t.
628+
// transfers.
629+
if (port == nullptr) {
630+
Message msg;
631+
Local<Object> obj = args.This();
632+
Local<Context> context = obj->CreationContext();
633+
USE(msg.Serialize(env, context, args[0], args[1], obj));
634+
return;
635+
}
636+
637+
port->PostMessage(env, args[0], args[1]);
585638
}
586639

587640
void MessagePort::Start() {

src/node_messaging.h

+26-5
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ class Message {
3232
// Serialize a JS value, and optionally transfer objects, into this message.
3333
// The Message object retains ownership of all transferred objects until
3434
// deserialization.
35+
// The source_port parameter, if provided, will make Serialize() throw a
36+
// "DataCloneError" DOMException if source_port is found in transfer_list.
3537
v8::Maybe<bool> Serialize(Environment* env,
3638
v8::Local<v8::Context> context,
3739
v8::Local<v8::Value> input,
38-
v8::Local<v8::Value> transfer_list);
40+
v8::Local<v8::Value> transfer_list,
41+
v8::Local<v8::Object> source_port =
42+
v8::Local<v8::Object>());
3943

4044
// Internal method of Message that is called when a new SharedArrayBuffer
4145
// object is encountered in the incoming value's structure.
@@ -44,6 +48,13 @@ class Message {
4448
// and that transfers ownership of `data` to this message.
4549
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
4650

51+
// The MessagePorts that will be transferred, as recorded by Serialize().
52+
// Used for warning user about posting the target MessagePort to itself,
53+
// which will as a side effect destroy the communication channel.
54+
const std::vector<std::unique_ptr<MessagePortData>>& message_ports() const {
55+
return message_ports_;
56+
}
57+
4758
private:
4859
MallocedBuffer<char> main_message_buf_;
4960
std::vector<MallocedBuffer<char>> array_buffer_contents_;
@@ -122,10 +133,11 @@ class MessagePort : public HandleWrap {
122133
std::unique_ptr<MessagePortData> data = nullptr);
123134

124135
// Send a message, i.e. deliver it into the sibling's incoming queue.
125-
// If there is no sibling, i.e. this port is closed,
126-
// this message is silently discarded.
127-
void Send(Message&& message);
128-
void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
136+
// If this port is closed, or if there is no sibling, this message is
137+
// serialized with transfers, then silently discarded.
138+
v8::Maybe<bool> PostMessage(Environment* env,
139+
v8::Local<v8::Value> message,
140+
v8::Local<v8::Value> transfer);
129141
// Deliver a single message into this port's incoming queue.
130142
void AddToIncomingQueue(Message&& message);
131143

@@ -157,6 +169,15 @@ class MessagePort : public HandleWrap {
157169
void Close(
158170
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
159171

172+
// Returns true if either data_ has been freed, or if the handle is being
173+
// closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
174+
//
175+
// If checking if a JavaScript MessagePort object is detached, this method
176+
// alone is often not enough, since the backing C++ MessagePort object may
177+
// have been deleted already. For all intents and purposes, an object with a
178+
// NULL pointer to the C++ MessagePort object is also detached.
179+
inline bool IsDetached() const;
180+
160181
size_t self_size() const override;
161182

162183
private:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Flags: --experimental-worker
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { MessageChannel } = require('worker_threads');
7+
8+
// This tests various behaviors around transferring MessagePorts with closing
9+
// or closed handles.
10+
11+
const { port1, port2 } = new MessageChannel();
12+
13+
const arrayBuf = new ArrayBuffer(10);
14+
port1.onmessage = common.mustNotCall();
15+
port2.onmessage = common.mustNotCall();
16+
17+
function testSingle(closedPort, potentiallyOpenPort) {
18+
assert.throws(common.mustCall(() => {
19+
potentiallyOpenPort.postMessage(null, [arrayBuf, closedPort]);
20+
}), common.mustCall((err) => {
21+
assert.strictEqual(err.name, 'DataCloneError');
22+
assert.strictEqual(err.message,
23+
'MessagePort in transfer list is already detached');
24+
assert.strictEqual(err.code, 25);
25+
assert.ok(err instanceof Error);
26+
27+
const DOMException = err.constructor;
28+
assert.ok(err instanceof DOMException);
29+
assert.strictEqual(DOMException.name, 'DOMException');
30+
31+
return true;
32+
}));
33+
34+
// arrayBuf must not be transferred, even though it is present earlier in the
35+
// transfer list than the closedPort.
36+
assert.strictEqual(arrayBuf.byteLength, 10);
37+
}
38+
39+
function testBothClosed() {
40+
testSingle(port1, port2);
41+
testSingle(port2, port1);
42+
}
43+
44+
// Even though the port handles may not be completely closed in C++ land, the
45+
// observable behavior must be that the closing/detachment is synchronous and
46+
// instant.
47+
48+
port1.close(common.mustCall(testBothClosed));
49+
testSingle(port1, port2);
50+
port2.close(common.mustCall(testBothClosed));
51+
testBothClosed();
52+
53+
setTimeout(common.mustNotCall('The communication channel is still open'),
54+
common.platformTimeout(1000)).unref();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Flags: --experimental-worker
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { MessageChannel } = require('worker_threads');
7+
8+
const { port1, port2 } = new MessageChannel();
9+
10+
assert.throws(common.mustCall(() => {
11+
port1.postMessage(null, [port1]);
12+
}), common.mustCall((err) => {
13+
assert.strictEqual(err.name, 'DataCloneError');
14+
assert.strictEqual(err.message, 'Transfer list contains source port');
15+
assert.strictEqual(err.code, 25);
16+
assert.ok(err instanceof Error);
17+
18+
const DOMException = err.constructor;
19+
assert.ok(err instanceof DOMException);
20+
assert.strictEqual(DOMException.name, 'DOMException');
21+
22+
return true;
23+
}));
24+
25+
// The failed transfer should not affect the ports in anyway.
26+
port2.onmessage = common.mustCall((message) => {
27+
assert.strictEqual(message, 2);
28+
port1.close();
29+
30+
setTimeout(common.mustNotCall('The communication channel is still open'),
31+
common.platformTimeout(1000)).unref();
32+
});
33+
port1.postMessage(2);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Flags: --experimental-worker
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { MessageChannel } = require('worker_threads');
7+
8+
const { port1, port2 } = new MessageChannel();
9+
10+
const arrayBuf = new ArrayBuffer(10);
11+
12+
common.expectWarning('Warning',
13+
'The target port was posted to itself, and the ' +
14+
'communication channel was lost',
15+
common.noWarnCode);
16+
port2.onmessage = common.mustNotCall();
17+
port2.postMessage(null, [port1, arrayBuf]);
18+
19+
// arrayBuf must be transferred, despite the fact that port2 never received the
20+
// message.
21+
assert.strictEqual(arrayBuf.byteLength, 0);
22+
23+
setTimeout(common.mustNotCall('The communication channel is still open'),
24+
common.platformTimeout(1000)).unref();

0 commit comments

Comments
 (0)