@@ -37,6 +37,19 @@ using v8::ValueSerializer;
37
37
using v8::WasmModuleObject;
38
38
39
39
namespace node {
40
+
41
+ BaseObject::TransferMode BaseObject::GetTransferMode () const {
42
+ return BaseObject::TransferMode::kUntransferable ;
43
+ }
44
+
45
+ std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging () {
46
+ return CloneForMessaging ();
47
+ }
48
+
49
+ std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging () const {
50
+ return {};
51
+ }
52
+
40
53
namespace worker {
41
54
42
55
Message::Message (MallocedBuffer<char >&& buffer)
@@ -55,21 +68,20 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
55
68
DeserializerDelegate (
56
69
Message* m,
57
70
Environment* env,
58
- const std::vector<MessagePort*>& message_ports ,
71
+ const std::vector<BaseObjectPtr<BaseObject>>& host_objects ,
59
72
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
60
73
const std::vector<CompiledWasmModule>& wasm_modules)
61
- : message_ports_(message_ports ),
74
+ : host_objects_(host_objects ),
62
75
shared_array_buffers_ (shared_array_buffers),
63
76
wasm_modules_(wasm_modules) {}
64
77
65
78
MaybeLocal<Object> ReadHostObject (Isolate* isolate) override {
66
- // Currently, only MessagePort hosts objects are supported, so identifying
67
- // by the index in the message's MessagePort array is sufficient.
79
+ // Identifying the index in the message's BaseObject array is sufficient.
68
80
uint32_t id;
69
81
if (!deserializer->ReadUint32 (&id))
70
82
return MaybeLocal<Object>();
71
- CHECK_LE (id, message_ports_ .size ());
72
- return message_ports_ [id]->object (isolate);
83
+ CHECK_LE (id, host_objects_ .size ());
84
+ return host_objects_ [id]->object (isolate);
73
85
}
74
86
75
87
MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId (
@@ -88,7 +100,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
88
100
ValueDeserializer* deserializer = nullptr ;
89
101
90
102
private:
91
- const std::vector<MessagePort*>& message_ports_ ;
103
+ const std::vector<BaseObjectPtr<BaseObject>>& host_objects_ ;
92
104
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
93
105
const std::vector<CompiledWasmModule>& wasm_modules_;
94
106
};
@@ -102,22 +114,25 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
102
114
EscapableHandleScope handle_scope (env->isolate ());
103
115
Context::Scope context_scope (context);
104
116
105
- // Create all necessary MessagePort handles.
106
- std::vector<MessagePort*> ports (message_ports_.size ());
107
- for (uint32_t i = 0 ; i < message_ports_.size (); ++i) {
108
- ports[i] = MessagePort::New (env,
109
- context,
110
- std::move (message_ports_[i]));
111
- if (ports[i] == nullptr ) {
112
- for (MessagePort* port : ports) {
113
- // This will eventually release the MessagePort object itself.
114
- if (port != nullptr )
115
- port->Close ();
117
+ // Create all necessary objects for transferables, e.g. MessagePort handles.
118
+ std::vector<BaseObjectPtr<BaseObject>> host_objects (transferables_.size ());
119
+ for (uint32_t i = 0 ; i < transferables_.size (); ++i) {
120
+ TransferData* data = transferables_[i].get ();
121
+ host_objects[i] = data->Deserialize (
122
+ env, context, std::move (transferables_[i]));
123
+ if (!host_objects[i]) {
124
+ for (BaseObjectPtr<BaseObject> object : host_objects) {
125
+ if (!object) continue ;
126
+
127
+ // Since creating one of the objects failed, we don't want to have the
128
+ // other objects lying around in memory. We act as if the object has
129
+ // been garbage-collected.
130
+ object->Detach ();
116
131
}
117
132
return MaybeLocal<Value>();
118
133
}
119
134
}
120
- message_ports_ .clear ();
135
+ transferables_ .clear ();
121
136
122
137
std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
123
138
// Attach all transferred SharedArrayBuffers to their new Isolate.
@@ -130,7 +145,7 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
130
145
shared_array_buffers_.clear ();
131
146
132
147
DeserializerDelegate delegate (
133
- this , env, ports , shared_array_buffers, wasm_modules_);
148
+ this , env, host_objects , shared_array_buffers, wasm_modules_);
134
149
ValueDeserializer deserializer (
135
150
env->isolate (),
136
151
reinterpret_cast <const uint8_t *>(main_message_buf_.data ),
@@ -157,8 +172,8 @@ void Message::AddSharedArrayBuffer(
157
172
shared_array_buffers_.emplace_back (std::move (backing_store));
158
173
}
159
174
160
- void Message::AddMessagePort (std::unique_ptr<MessagePortData >&& data) {
161
- message_ports_ .emplace_back (std::move (data));
175
+ void Message::AddTransferable (std::unique_ptr<TransferData >&& data) {
176
+ transferables_ .emplace_back (std::move (data));
162
177
}
163
178
164
179
uint32_t Message::AddWASMModule (CompiledWasmModule&& mod) {
@@ -224,8 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
224
239
}
225
240
226
241
Maybe<bool > WriteHostObject (Isolate* isolate, Local<Object> object) override {
227
- if (env_->message_port_constructor_template ()->HasInstance (object)) {
228
- return WriteMessagePort (Unwrap<MessagePort >(object));
242
+ if (env_->base_object_ctor_template ()->HasInstance (object)) {
243
+ return WriteHostObject (Unwrap<BaseObject >(object));
229
244
}
230
245
231
246
ThrowDataCloneError (env_->clone_unsupported_type_str ());
@@ -257,32 +272,61 @@ class SerializerDelegate : public ValueSerializer::Delegate {
257
272
void Finish () {
258
273
// Only close the MessagePort handles and actually transfer them
259
274
// once we know that serialization succeeded.
260
- for (MessagePort* port : ports_) {
261
- port->Close ();
262
- msg_->AddMessagePort (port->Detach ());
275
+ for (uint32_t i = 0 ; i < host_objects_.size (); i++) {
276
+ BaseObject* host_object = host_objects_[i];
277
+ std::unique_ptr<TransferData> data;
278
+ if (i < first_cloned_object_index_)
279
+ data = host_object->TransferForMessaging ();
280
+ if (!data)
281
+ data = host_object->CloneForMessaging ();
282
+ CHECK (data);
283
+ msg_->AddTransferable (std::move (data));
263
284
}
264
285
}
265
286
287
+ inline void AddHostObject (BaseObject* host_object) {
288
+ // Make sure we have not started serializing the value itself yet.
289
+ CHECK_EQ (first_cloned_object_index_, SIZE_MAX);
290
+ host_objects_.push_back (host_object);
291
+ }
292
+
266
293
ValueSerializer* serializer = nullptr ;
267
294
268
295
private:
269
- Maybe<bool > WriteMessagePort (MessagePort* port ) {
270
- for (uint32_t i = 0 ; i < ports_ .size (); i++) {
271
- if (ports_ [i] == port ) {
296
+ Maybe<bool > WriteHostObject (BaseObject* host_object ) {
297
+ for (uint32_t i = 0 ; i < host_objects_ .size (); i++) {
298
+ if (host_objects_ [i] == host_object ) {
272
299
serializer->WriteUint32 (i);
273
300
return Just (true );
274
301
}
275
302
}
276
303
277
- THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST (env_);
278
- return Nothing<bool >();
304
+ BaseObject::TransferMode mode = host_object->GetTransferMode ();
305
+ if (mode == BaseObject::TransferMode::kUntransferable ) {
306
+ ThrowDataCloneError (env_->clone_unsupported_type_str ());
307
+ return Nothing<bool >();
308
+ } else if (mode == BaseObject::TransferMode::kTransferable ) {
309
+ // TODO(addaleax): This message code is too specific. Fix that in a
310
+ // semver-major follow-up.
311
+ THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST (env_);
312
+ return Nothing<bool >();
313
+ }
314
+
315
+ CHECK_EQ (mode, BaseObject::TransferMode::kCloneable );
316
+ uint32_t index = host_objects_.size ();
317
+ if (first_cloned_object_index_ == SIZE_MAX)
318
+ first_cloned_object_index_ = index ;
319
+ serializer->WriteUint32 (index );
320
+ host_objects_.push_back (host_object);
321
+ return Just (true );
279
322
}
280
323
281
324
Environment* env_;
282
325
Local<Context> context_;
283
326
Message* msg_;
284
327
std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
285
- std::vector<MessagePort*> ports_;
328
+ std::vector<BaseObject*> host_objects_;
329
+ size_t first_cloned_object_index_ = SIZE_MAX;
286
330
287
331
friend class worker ::Message;
288
332
};
@@ -344,8 +388,7 @@ Maybe<bool> Message::Serialize(Environment* env,
344
388
array_buffers.push_back (ab);
345
389
serializer.TransferArrayBuffer (id, ab);
346
390
continue ;
347
- } else if (env->message_port_constructor_template ()
348
- ->HasInstance (entry)) {
391
+ } else if (env->base_object_ctor_template ()->HasInstance (entry)) {
349
392
// Check if the source MessagePort is being transferred.
350
393
if (!source_port.IsEmpty () && entry == source_port) {
351
394
ThrowDataCloneException (
@@ -354,26 +397,34 @@ Maybe<bool> Message::Serialize(Environment* env,
354
397
" Transfer list contains source port" ));
355
398
return Nothing<bool >();
356
399
}
357
- MessagePort* port = Unwrap<MessagePort>(entry.As <Object>());
358
- if (port == nullptr || port->IsDetached ()) {
400
+ BaseObject* host_object = Unwrap<BaseObject>(entry.As <Object>());
401
+ if (env->message_port_constructor_template ()->HasInstance (entry) &&
402
+ (host_object == nullptr ||
403
+ static_cast <MessagePort*>(host_object)->IsDetached ())) {
359
404
ThrowDataCloneException (
360
405
context,
361
406
FIXED_ONE_BYTE_STRING (
362
407
env->isolate (),
363
408
" MessagePort in transfer list is already detached" ));
364
409
return Nothing<bool >();
365
410
}
366
- if (std::find (delegate.ports_ .begin (), delegate.ports_ .end (), port) !=
367
- delegate.ports_ .end ()) {
411
+ if (std::find (delegate.host_objects_ .begin (),
412
+ delegate.host_objects_ .end (),
413
+ host_object) != delegate.host_objects_ .end ()) {
368
414
ThrowDataCloneException (
369
415
context,
370
- FIXED_ONE_BYTE_STRING (
371
- env->isolate (),
372
- " Transfer list contains duplicate MessagePort" ));
416
+ String::Concat (env->isolate (),
417
+ FIXED_ONE_BYTE_STRING (
418
+ env->isolate (),
419
+ " Transfer list contains duplicate " ),
420
+ entry.As <Object>()->GetConstructorName ()));
373
421
return Nothing<bool >();
374
422
}
375
- delegate.ports_ .push_back (port);
376
- continue ;
423
+ if (host_object != nullptr && host_object->GetTransferMode () !=
424
+ BaseObject::TransferMode::kUntransferable ) {
425
+ delegate.AddHostObject (host_object);
426
+ continue ;
427
+ }
377
428
}
378
429
379
430
THROW_ERR_INVALID_TRANSFER_OBJECT (env);
@@ -406,7 +457,7 @@ Maybe<bool> Message::Serialize(Environment* env,
406
457
void Message::MemoryInfo (MemoryTracker* tracker) const {
407
458
tracker->TrackField (" array_buffers_" , array_buffers_);
408
459
tracker->TrackField (" shared_array_buffers" , shared_array_buffers_);
409
- tracker->TrackField (" message_ports " , message_ports_ );
460
+ tracker->TrackField (" transferables " , transferables_ );
410
461
}
411
462
412
463
MessagePortData::MessagePortData (MessagePort* owner) : owner_(owner) { }
@@ -672,6 +723,25 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
672
723
return std::move (data_);
673
724
}
674
725
726
+ BaseObject::TransferMode MessagePort::GetTransferMode () const {
727
+ if (IsDetached ())
728
+ return BaseObject::TransferMode::kUntransferable ;
729
+ return BaseObject::TransferMode::kTransferable ;
730
+ }
731
+
732
+ std::unique_ptr<TransferData> MessagePort::TransferForMessaging () {
733
+ Close ();
734
+ return Detach ();
735
+ }
736
+
737
+ BaseObjectPtr<BaseObject> MessagePortData::Deserialize (
738
+ Environment* env,
739
+ Local<Context> context,
740
+ std::unique_ptr<TransferData> self) {
741
+ return BaseObjectPtr<MessagePort> { MessagePort::New (
742
+ env, context,
743
+ static_unique_pointer_cast<MessagePortData>(std::move (self))) };
744
+ }
675
745
676
746
Maybe<bool > MessagePort::PostMessage (Environment* env,
677
747
Local<Value> message_v,
@@ -699,8 +769,8 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
699
769
700
770
// Check if the target port is posted to itself.
701
771
if (data_->sibling_ != nullptr ) {
702
- for (const auto & port_data : msg.message_ports ()) {
703
- if (data_->sibling_ == port_data .get ()) {
772
+ for (const auto & transferable : msg.transferables ()) {
773
+ if (data_->sibling_ == transferable .get ()) {
704
774
doomed = true ;
705
775
ProcessEmitWarning (env, " The target port was posted to itself, and "
706
776
" the communication channel was lost" );
0 commit comments