
Commit 53f2371

addaleax authored and BridgeAR committed
worker: prevent event loop starvation through MessagePorts
Limit the number of messages processed without interruption on a given
`MessagePort` to prevent event loop starvation, but still make sure that
all messages that were already in the queue when emitting began are
emitted. This aligns the behaviour better with the web.

Refs: #28030
PR-URL: #29315
Reviewed-By: Gus Caplan <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Jeremiah Senkpiel <[email protected]>
1 parent 909c669 commit 53f2371

3 files changed, +63 -0 lines changed
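In outline, the fix caps each drain of a port's incoming queue at the number of messages that were already queued when emission started (with a floor of 1000), and then re-arms the wakeup handle instead of looping indefinitely. Below is a rough, hypothetical sketch of that batching pattern using only C++ standard-library types; `PortDrainer`, `deliver` and `schedule_wakeup` are illustrative names and not part of the Node.js source. The real implementation, in `MessagePort::OnMessage()` in the diff that follows, additionally re-checks port ownership and sets up V8 scopes on every iteration.

// Hypothetical sketch of the bounded-drain idea; not Node.js internals.
#include <algorithm>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

class PortDrainer {
 public:
  // May be called from any thread to enqueue an incoming message.
  void Enqueue(std::string message) {
    std::lock_guard<std::mutex> lock(mutex_);
    incoming_.push_back(std::move(message));
  }

  // Called on the receiving thread when its wakeup handle fires.
  void OnWakeup(const std::function<void(const std::string&)>& deliver,
                const std::function<void()>& schedule_wakeup) {
    // Snapshot how many messages may be handled without yielding: everything
    // that is already queued, but at least 1000, so that the cost of
    // re-arming the wakeup handle is amortized over a reasonably large batch.
    size_t processing_limit;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      processing_limit = std::max(incoming_.size(),
                                  static_cast<size_t>(1000));
    }

    while (true) {
      if (processing_limit-- == 0) {
        // Limit reached: yield back to the event loop and let messages that
        // arrived during this drain be handled on the next wakeup.
        schedule_wakeup();
        return;
      }

      std::string message;
      {
        std::lock_guard<std::mutex> lock(mutex_);
        if (incoming_.empty()) return;  // Queue fully drained.
        message = std::move(incoming_.front());
        incoming_.pop_front();
      }
      // Delivering may synchronously or asynchronously enqueue more messages.
      deliver(message);
    }
  }

 private:
  std::mutex mutex_;
  std::deque<std::string> incoming_;
};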

src/node_messaging.cc (+19)
@@ -604,11 +604,30 @@ void MessagePort::OnMessage() {
   HandleScope handle_scope(env()->isolate());
   Local<Context> context = object(env()->isolate())->CreationContext();
 
+  size_t processing_limit;
+  {
+    Mutex::ScopedLock(data_->mutex_);
+    processing_limit = std::max(data_->incoming_messages_.size(),
+                                static_cast<size_t>(1000));
+  }
+
   // data_ can only ever be modified by the owner thread, so no need to lock.
   // However, the message port may be transferred while it is processing
   // messages, so we need to check that this handle still owns its `data_` field
   // on every iteration.
   while (data_) {
+    if (processing_limit-- == 0) {
+      // Prevent event loop starvation by only processing those messages without
+      // interruption that were already present when the OnMessage() call was
+      // first triggered, but at least 1000 messages because otherwise the
+      // overhead of repeatedly triggering the uv_async_t instance becomes
+      // noticable, at least on Windows.
+      // (That might require more investigation by somebody more familiar with
+      // Windows.)
+      TriggerAsync();
+      return;
+    }
+
     HandleScope handle_scope(env()->isolate());
     Context::Scope context_scope(context);

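Two details of the hunk above are worth calling out. The limit is computed once, before the drain loop, from the queue length at entry (with a floor of 1000), so everything that was already queued when emission began is still delivered in this pass, matching the guarantee stated in the commit message. Once the limit is exhausted, `TriggerAsync()` re-arms the port's `uv_async_t` handle and the method returns, letting timers and other event loop work run before newly arrived messages are processed on a later wakeup.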
New test file (+15)
@@ -0,0 +1,15 @@
'use strict';
const common = require('../common');

const { MessageChannel } = require('worker_threads');

// Make sure that closing a message port while receiving messages on it does
// not stop messages that are already in the queue from being emitted.

const { port1, port2 } = new MessageChannel();

port1.on('message', common.mustCall(() => {
  port1.close();
}, 2));
port2.postMessage('foo');
port2.postMessage('bar');
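Note on the assertion style: `common.mustCall(fn, 2)` from Node's test harness fails the test unless the wrapped listener runs exactly twice, so this test only passes if both queued messages ('foo' and 'bar') are emitted even though the handler closes `port1` as soon as the first one arrives.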
New test file (+29)
@@ -0,0 +1,29 @@
'use strict';
const common = require('../common');
const assert = require('assert');

const { MessageChannel } = require('worker_threads');

// Make sure that an infinite asynchronous .on('message')/postMessage loop
// does not lead to a stack overflow and does not starve the event loop.
// We schedule timeouts both from before the .on('message') handler and
// inside of it, which both should run.

const { port1, port2 } = new MessageChannel();
let count = 0;
port1.on('message', () => {
  if (count === 0) {
    setTimeout(common.mustCall(() => {
      port1.close();
    }), 0);
  }

  port2.postMessage(0);
  assert(count++ < 10000, `hit ${count} loop iterations`);
});

port2.postMessage(0);

// This is part of the test -- the event loop should be available and not stall
// out due to the recursive .postMessage() calls.
setTimeout(common.mustCall(), 0);
