Skip to content

Commit d2b0568

Browse files
addaleaxtargos
authored andcommitted
worker: make transfer list behave like web MessagePort
Allow generic iterables as transfer list arguments, as well as an options object with a `transfer` option, for web compatibility. PR-URL: #29319 Refs: #28033 (comment) Reviewed-By: James M Snell <[email protected]>
1 parent a7331da commit d2b0568

5 files changed

+238
-77
lines changed

src/env.h

+3
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ constexpr size_t kFsStatsBufferLength =
211211
V(dns_soa_string, "SOA") \
212212
V(dns_srv_string, "SRV") \
213213
V(dns_txt_string, "TXT") \
214+
V(done_string, "done") \
214215
V(duration_string, "duration") \
215216
V(emit_warning_string, "emitWarning") \
216217
V(encoding_string, "encoding") \
@@ -272,6 +273,7 @@ constexpr size_t kFsStatsBufferLength =
272273
V(modulus_string, "modulus") \
273274
V(name_string, "name") \
274275
V(netmask_string, "netmask") \
276+
V(next_string, "next") \
275277
V(nistcurve_string, "nistCurve") \
276278
V(nsname_string, "nsname") \
277279
V(ocsp_request_string, "OCSPRequest") \
@@ -353,6 +355,7 @@ constexpr size_t kFsStatsBufferLength =
353355
V(ticketkeycallback_string, "onticketkeycallback") \
354356
V(timeout_string, "timeout") \
355357
V(tls_ticket_string, "tlsTicket") \
358+
V(transfer_string, "transfer") \
356359
V(ttl_string, "ttl") \
357360
V(type_string, "type") \
358361
V(uid_string, "uid") \

src/node_messaging.cc

+144-73
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ using v8::Object;
3131
using v8::ObjectTemplate;
3232
using v8::SharedArrayBuffer;
3333
using v8::String;
34+
using v8::Symbol;
3435
using v8::Value;
3536
using v8::ValueDeserializer;
3637
using v8::ValueSerializer;
@@ -304,7 +305,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
304305
Maybe<bool> Message::Serialize(Environment* env,
305306
Local<Context> context,
306307
Local<Value> input,
307-
Local<Value> transfer_list_v,
308+
const TransferList& transfer_list_v,
308309
Local<Object> source_port) {
309310
HandleScope handle_scope(env->isolate());
310311
Context::Scope context_scope(context);
@@ -317,72 +318,66 @@ Maybe<bool> Message::Serialize(Environment* env,
317318
delegate.serializer = &serializer;
318319

319320
std::vector<Local<ArrayBuffer>> array_buffers;
320-
if (transfer_list_v->IsArray()) {
321-
Local<Array> transfer_list = transfer_list_v.As<Array>();
322-
uint32_t length = transfer_list->Length();
323-
for (uint32_t i = 0; i < length; ++i) {
324-
Local<Value> entry;
325-
if (!transfer_list->Get(context, i).ToLocal(&entry))
326-
return Nothing<bool>();
327-
// Currently, we support ArrayBuffers and MessagePorts.
328-
if (entry->IsArrayBuffer()) {
329-
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
330-
// If we cannot render the ArrayBuffer unusable in this Isolate and
331-
// take ownership of its memory, copying the buffer will have to do.
332-
if (!ab->IsDetachable() || ab->IsExternal() ||
333-
!env->isolate_data()->uses_node_allocator()) {
334-
continue;
335-
}
336-
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
337-
array_buffers.end()) {
338-
ThrowDataCloneException(
339-
context,
340-
FIXED_ONE_BYTE_STRING(
341-
env->isolate(),
342-
"Transfer list contains duplicate ArrayBuffer"));
343-
return Nothing<bool>();
344-
}
345-
// We simply use the array index in the `array_buffers` list as the
346-
// ID that we write into the serialized buffer.
347-
uint32_t id = array_buffers.size();
348-
array_buffers.push_back(ab);
349-
serializer.TransferArrayBuffer(id, ab);
350-
continue;
351-
} else if (env->message_port_constructor_template()
352-
->HasInstance(entry)) {
353-
// Check if the source MessagePort is being transferred.
354-
if (!source_port.IsEmpty() && entry == source_port) {
355-
ThrowDataCloneException(
356-
context,
357-
FIXED_ONE_BYTE_STRING(env->isolate(),
358-
"Transfer list contains source port"));
359-
return Nothing<bool>();
360-
}
361-
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
362-
if (port == nullptr || port->IsDetached()) {
363-
ThrowDataCloneException(
364-
context,
365-
FIXED_ONE_BYTE_STRING(
366-
env->isolate(),
367-
"MessagePort in transfer list is already detached"));
368-
return Nothing<bool>();
369-
}
370-
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
371-
delegate.ports_.end()) {
372-
ThrowDataCloneException(
373-
context,
374-
FIXED_ONE_BYTE_STRING(
375-
env->isolate(),
376-
"Transfer list contains duplicate MessagePort"));
377-
return Nothing<bool>();
378-
}
379-
delegate.ports_.push_back(port);
321+
for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
322+
Local<Value> entry = transfer_list_v[i];
323+
// Currently, we support ArrayBuffers and MessagePorts.
324+
if (entry->IsArrayBuffer()) {
325+
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
326+
// If we cannot render the ArrayBuffer unusable in this Isolate and
327+
// take ownership of its memory, copying the buffer will have to do.
328+
if (!ab->IsDetachable() || ab->IsExternal() ||
329+
!env->isolate_data()->uses_node_allocator()) {
380330
continue;
381331
}
382-
383-
THROW_ERR_INVALID_TRANSFER_OBJECT(env);
384-
return Nothing<bool>();
332+
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
333+
array_buffers.end()) {
334+
ThrowDataCloneException(
335+
context,
336+
FIXED_ONE_BYTE_STRING(
337+
env->isolate(),
338+
"Transfer list contains duplicate ArrayBuffer"));
339+
return Nothing<bool>();
340+
}
341+
// We simply use the array index in the `array_buffers` list as the
342+
// ID that we write into the serialized buffer.
343+
uint32_t id = array_buffers.size();
344+
array_buffers.push_back(ab);
345+
serializer.TransferArrayBuffer(id, ab);
346+
continue;
347+
} else if (env->message_port_constructor_template()
348+
->HasInstance(entry)) {
349+
// Check if the source MessagePort is being transferred.
350+
if (!source_port.IsEmpty() && entry == source_port) {
351+
ThrowDataCloneException(
352+
context,
353+
FIXED_ONE_BYTE_STRING(env->isolate(),
354+
"Transfer list contains source port"));
355+
return Nothing<bool>();
356+
}
357+
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
358+
if (port == nullptr || port->IsDetached()) {
359+
ThrowDataCloneException(
360+
context,
361+
FIXED_ONE_BYTE_STRING(
362+
env->isolate(),
363+
"MessagePort in transfer list is already detached"));
364+
return Nothing<bool>();
365+
}
366+
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
367+
delegate.ports_.end()) {
368+
ThrowDataCloneException(
369+
context,
370+
FIXED_ONE_BYTE_STRING(
371+
env->isolate(),
372+
"Transfer list contains duplicate MessagePort"));
373+
return Nothing<bool>();
374+
}
375+
delegate.ports_.push_back(port);
376+
continue;
385377
}
378+
379+
THROW_ERR_INVALID_TRANSFER_OBJECT(env);
380+
return Nothing<bool>();
386381
}
387382

388383
serializer.WriteHeader();
@@ -664,7 +659,7 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
664659

665660
Maybe<bool> MessagePort::PostMessage(Environment* env,
666661
Local<Value> message_v,
667-
Local<Value> transfer_v) {
662+
const TransferList& transfer_v) {
668663
Isolate* isolate = env->isolate();
669664
Local<Object> obj = object(isolate);
670665
Local<Context> context = obj->CreationContext();
@@ -705,20 +700,98 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
705700
return Just(true);
706701
}
707702

703+
static Maybe<bool> ReadIterable(Environment* env,
704+
Local<Context> context,
705+
// NOLINTNEXTLINE(runtime/references)
706+
TransferList& transfer_list,
707+
Local<Value> object) {
708+
if (!object->IsObject()) return Just(false);
709+
710+
if (object->IsArray()) {
711+
Local<Array> arr = object.As<Array>();
712+
size_t length = arr->Length();
713+
transfer_list.AllocateSufficientStorage(length);
714+
for (size_t i = 0; i < length; i++) {
715+
if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
716+
return Nothing<bool>();
717+
}
718+
return Just(true);
719+
}
720+
721+
Isolate* isolate = env->isolate();
722+
Local<Value> iterator_method;
723+
if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
724+
.ToLocal(&iterator_method)) return Nothing<bool>();
725+
if (!iterator_method->IsFunction()) return Just(false);
726+
727+
Local<Value> iterator;
728+
if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
729+
.ToLocal(&iterator)) return Nothing<bool>();
730+
if (!iterator->IsObject()) return Just(false);
731+
732+
Local<Value> next;
733+
if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
734+
return Nothing<bool>();
735+
if (!next->IsFunction()) return Just(false);
736+
737+
std::vector<Local<Value>> entries;
738+
while (env->can_call_into_js()) {
739+
Local<Value> result;
740+
if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
741+
.ToLocal(&result)) return Nothing<bool>();
742+
if (!result->IsObject()) return Just(false);
743+
744+
Local<Value> done;
745+
if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
746+
return Nothing<bool>();
747+
if (done->BooleanValue(isolate)) break;
748+
749+
Local<Value> val;
750+
if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
751+
return Nothing<bool>();
752+
entries.push_back(val);
753+
}
754+
755+
transfer_list.AllocateSufficientStorage(entries.size());
756+
std::copy(entries.begin(), entries.end(), &transfer_list[0]);
757+
return Just(true);
758+
}
759+
708760
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
709761
Environment* env = Environment::GetCurrent(args);
762+
Local<Object> obj = args.This();
763+
Local<Context> context = obj->CreationContext();
764+
710765
if (args.Length() == 0) {
711766
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
712767
"MessagePort.postMessage");
713768
}
769+
714770
if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
715771
// Browsers ignore null or undefined, and otherwise accept an array or an
716772
// options object.
717-
// TODO(addaleax): Add support for an options object and generic sequence
718-
// support.
719-
// Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
720773
return THROW_ERR_INVALID_ARG_TYPE(env,
721-
"Optional transferList argument must be an array");
774+
"Optional transferList argument must be an iterable");
775+
}
776+
777+
TransferList transfer_list;
778+
if (args[1]->IsObject()) {
779+
bool was_iterable;
780+
if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
781+
return;
782+
if (!was_iterable) {
783+
Local<Value> transfer_option;
784+
if (!args[1].As<Object>()->Get(context, env->transfer_string())
785+
.ToLocal(&transfer_option)) return;
786+
if (!transfer_option->IsUndefined()) {
787+
if (!ReadIterable(env, context, transfer_list, transfer_option)
788+
.To(&was_iterable)) return;
789+
if (!was_iterable) {
790+
return THROW_ERR_INVALID_ARG_TYPE(env,
791+
"Optional options.transfer argument must be an iterable");
792+
}
793+
}
794+
}
722795
}
723796

724797
MessagePort* port = Unwrap<MessagePort>(args.This());
@@ -727,13 +800,11 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
727800
// transfers.
728801
if (port == nullptr) {
729802
Message msg;
730-
Local<Object> obj = args.This();
731-
Local<Context> context = obj->CreationContext();
732-
USE(msg.Serialize(env, context, args[0], args[1], obj));
803+
USE(msg.Serialize(env, context, args[0], transfer_list, obj));
733804
return;
734805
}
735806

736-
port->PostMessage(env, args[0], args[1]);
807+
port->PostMessage(env, args[0], transfer_list);
737808
}
738809

739810
void MessagePort::Start() {

src/node_messaging.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ namespace worker {
1414
class MessagePortData;
1515
class MessagePort;
1616

17+
typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
18+
1719
// Represents a single communication message.
1820
class Message : public MemoryRetainer {
1921
public:
@@ -44,7 +46,7 @@ class Message : public MemoryRetainer {
4446
v8::Maybe<bool> Serialize(Environment* env,
4547
v8::Local<v8::Context> context,
4648
v8::Local<v8::Value> input,
47-
v8::Local<v8::Value> transfer_list,
49+
const TransferList& transfer_list,
4850
v8::Local<v8::Object> source_port =
4951
v8::Local<v8::Object>());
5052

@@ -149,7 +151,7 @@ class MessagePort : public HandleWrap {
149151
// serialized with transfers, then silently discarded.
150152
v8::Maybe<bool> PostMessage(Environment* env,
151153
v8::Local<v8::Value> message,
152-
v8::Local<v8::Value> transfer);
154+
const TransferList& transfer);
153155

154156
// Start processing messages on this port as a receiving end.
155157
void Start();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
'use strict';
2+
const common = require('../common');
3+
4+
const { parentPort, MessageChannel, Worker } = require('worker_threads');
5+
6+
// Do not use isMainThread so that this test itself can be run inside a Worker.
7+
if (!process.env.HAS_STARTED_WORKER) {
8+
process.env.HAS_STARTED_WORKER = 1;
9+
const w = new Worker(__filename);
10+
w.once('message', common.mustCall(() => {
11+
w.once('message', common.mustNotCall());
12+
setTimeout(() => w.terminate(), 100);
13+
}));
14+
} else {
15+
const { port1 } = new MessageChannel();
16+
17+
parentPort.postMessage('ready');
18+
19+
// Make sure we don’t end up running JS after the infinite loop is broken.
20+
port1.postMessage({}, {
21+
transfer: (function*() { while (true); })()
22+
});
23+
24+
parentPort.postMessage('UNREACHABLE');
25+
process.kill(process.pid, 'SIGINT');
26+
}

0 commit comments

Comments
 (0)