@@ -40,6 +40,10 @@ namespace worker {
40
40
Message::Message (MallocedBuffer<char >&& buffer)
41
41
: main_message_buf_(std::move(buffer)) {}
42
42
43
+ bool Message::IsCloseMessage () const {
44
+ return main_message_buf_.data == nullptr ;
45
+ }
46
+
43
47
namespace {
44
48
45
49
// This is used to tell V8 how to read transferred host objects, like other
@@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
91
95
92
96
MaybeLocal<Value> Message::Deserialize (Environment* env,
93
97
Local<Context> context) {
98
+ CHECK (!IsCloseMessage ());
99
+
94
100
EscapableHandleScope handle_scope (env->isolate ());
95
101
Context::Scope context_scope (context);
96
102
@@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,
395
401
396
402
// The serializer gave us a buffer allocated using `malloc()`.
397
403
std::pair<uint8_t *, size_t > data = serializer.Release ();
404
+ CHECK_NOT_NULL (data.first );
398
405
main_message_buf_ =
399
406
MallocedBuffer<char >(reinterpret_cast <char *>(data.first ), data.second );
400
407
return Just (true );
@@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
430
437
}
431
438
}
432
439
433
- bool MessagePortData::IsSiblingClosed () const {
434
- Mutex::ScopedLock lock (*sibling_mutex_);
435
- return sibling_ == nullptr ;
436
- }
437
-
438
440
void MessagePortData::Entangle (MessagePortData* a, MessagePortData* b) {
439
441
CHECK_NULL (a->sibling_ );
440
442
CHECK_NULL (b->sibling_ );
@@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
443
445
a->sibling_mutex_ = b->sibling_mutex_ ;
444
446
}
445
447
446
- void MessagePortData::PingOwnerAfterDisentanglement () {
447
- Mutex::ScopedLock lock (mutex_);
448
- if (owner_ != nullptr )
449
- owner_->TriggerAsync ();
450
- }
451
-
452
448
void MessagePortData::Disentangle () {
453
449
// Grab a copy of the sibling mutex, then replace it so that each sibling
454
450
// has its own sibling_mutex_ now.
@@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
462
458
sibling_ = nullptr ;
463
459
}
464
460
465
- // We close MessagePorts after disentanglement, so we trigger the
466
- // corresponding uv_async_t to let them know that this happened.
467
- PingOwnerAfterDisentanglement ();
461
+ // We close MessagePorts after disentanglement, so we enqueue a corresponding
462
+ // message and trigger the corresponding uv_async_t to let them know that
463
+ // this happened.
464
+ AddToIncomingQueue (Message ());
468
465
if (sibling != nullptr ) {
469
- sibling->PingOwnerAfterDisentanglement ( );
466
+ sibling->AddToIncomingQueue ( Message () );
470
467
}
471
468
}
472
469
@@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
590
587
Debug (this , " MessagePort has message, receiving = %d" ,
591
588
static_cast <int >(receiving_messages_));
592
589
593
- if (!receiving_messages_)
594
- break ;
595
- if (data_->incoming_messages_ .empty ())
590
+ // We have nothing to do if:
591
+ // - There are no pending messages
592
+ // - We are not intending to receive messages, and the message we would
593
+ // receive is not the final "close" message.
594
+ if (data_->incoming_messages_ .empty () ||
595
+ (!receiving_messages_ &&
596
+ !data_->incoming_messages_ .front ().IsCloseMessage ())) {
596
597
break ;
598
+ }
599
+
597
600
received = std::move (data_->incoming_messages_ .front ());
598
601
data_->incoming_messages_ .pop_front ();
599
602
}
600
603
604
+ if (received.IsCloseMessage ()) {
605
+ Close ();
606
+ return ;
607
+ }
608
+
601
609
if (!env ()->can_call_into_js ()) {
602
610
Debug (this , " MessagePort drains queue because !can_call_into_js()" );
603
611
// In this case there is nothing to do but to drain the current queue.
@@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
628
636
}
629
637
}
630
638
}
631
-
632
- if (data_ && data_->IsSiblingClosed ()) {
633
- Close ();
634
- }
635
- }
636
-
637
- bool MessagePort::IsSiblingClosed () const {
638
- CHECK (data_);
639
- return data_->IsSiblingClosed ();
640
639
}
641
640
642
641
void MessagePort::OnClose () {
0 commit comments