Skip to content

Commit 0ca4bd1

Browse files
mscdexaddaleax
authored andcommitted
child_process: reduce nextTick() usage
PR-URL: #13459 Reviewed-By: Refael Ackermann <[email protected]> Reviewed-By: Ben Noordhuis <[email protected]>
1 parent d1fa59f commit 0ca4bd1

File tree

2 files changed

+102
-21
lines changed

2 files changed

+102
-21
lines changed

benchmark/cluster/echo.js

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
'use strict';
2+
3+
const cluster = require('cluster');
4+
if (cluster.isMaster) {
5+
const common = require('../common.js');
6+
const bench = common.createBenchmark(main, {
7+
workers: [1],
8+
payload: ['string', 'object'],
9+
sendsPerBroadcast: [1, 10],
10+
n: [1e5]
11+
});
12+
13+
function main(conf) {
14+
var n = +conf.n;
15+
var workers = +conf.workers;
16+
var sends = +conf.sendsPerBroadcast;
17+
var expectedPerBroadcast = sends * workers;
18+
var payload;
19+
var readies = 0;
20+
var broadcasts = 0;
21+
var msgCount = 0;
22+
23+
switch (conf.payload) {
24+
case 'string':
25+
payload = 'hello world!';
26+
break;
27+
case 'object':
28+
payload = { action: 'pewpewpew', powerLevel: 9001 };
29+
break;
30+
default:
31+
throw new Error('Unsupported payload type');
32+
}
33+
34+
for (var i = 0; i < workers; ++i)
35+
cluster.fork().on('online', onOnline).on('message', onMessage);
36+
37+
function onOnline(msg) {
38+
if (++readies === workers) {
39+
bench.start();
40+
broadcast();
41+
}
42+
}
43+
44+
function broadcast() {
45+
var id;
46+
if (broadcasts++ === n) {
47+
bench.end(n);
48+
for (id in cluster.workers)
49+
cluster.workers[id].disconnect();
50+
return;
51+
}
52+
for (id in cluster.workers) {
53+
const worker = cluster.workers[id];
54+
for (var i = 0; i < sends; ++i)
55+
worker.send(payload);
56+
}
57+
}
58+
59+
function onMessage(msg) {
60+
if (++msgCount === expectedPerBroadcast) {
61+
msgCount = 0;
62+
broadcast();
63+
}
64+
}
65+
}
66+
} else {
67+
process.on('message', function(msg) {
68+
process.send(msg);
69+
});
70+
}

lib/internal/child_process.js

+32-21
Original file line numberDiff line numberDiff line change
@@ -456,16 +456,22 @@ function setupChannel(target, channel) {
456456
}
457457
chunks[0] = jsonBuffer + chunks[0];
458458

459+
var nextTick = false;
459460
for (var i = 0; i < numCompleteChunks; i++) {
460461
var message = JSON.parse(chunks[i]);
461462

462463
// There will be at most one NODE_HANDLE message in every chunk we
463464
// read because SCM_RIGHTS messages don't get coalesced. Make sure
464465
// that we deliver the handle with the right message however.
465-
if (message && message.cmd === 'NODE_HANDLE')
466-
handleMessage(target, message, recvHandle);
467-
else
468-
handleMessage(target, message, undefined);
466+
if (isInternal(message)) {
467+
if (message.cmd === 'NODE_HANDLE')
468+
handleMessage(message, recvHandle, true, false);
469+
else
470+
handleMessage(message, undefined, true, false);
471+
} else {
472+
handleMessage(message, undefined, false, nextTick);
473+
nextTick = true;
474+
}
469475
}
470476
jsonBuffer = incompleteChunk;
471477
this.buffering = jsonBuffer.length !== 0;
@@ -526,7 +532,7 @@ function setupChannel(target, channel) {
526532

527533
// Convert handle object
528534
obj.got.call(this, message, handle, function(handle) {
529-
handleMessage(target, message.msg, handle);
535+
handleMessage(message.msg, handle, isInternal(message.msg), false);
530536
});
531537
});
532538

@@ -732,27 +738,32 @@ function setupChannel(target, channel) {
732738
process.nextTick(finish);
733739
};
734740

741+
function emit(event, message, handle) {
742+
target.emit(event, message, handle);
743+
}
744+
745+
function handleMessage(message, handle, internal, nextTick) {
746+
if (!target.channel)
747+
return;
748+
749+
var eventName = (internal ? 'internalMessage' : 'message');
750+
if (nextTick)
751+
process.nextTick(emit, eventName, message, handle);
752+
else
753+
target.emit(eventName, message, handle);
754+
}
755+
735756
channel.readStart();
736757
return control;
737758
}
738759

739-
740760
const INTERNAL_PREFIX = 'NODE_';
741-
function handleMessage(target, message, handle) {
742-
if (!target.channel)
743-
return;
744-
745-
var eventName = 'message';
746-
if (message !== null &&
747-
typeof message === 'object' &&
748-
typeof message.cmd === 'string' &&
749-
message.cmd.length > INTERNAL_PREFIX.length &&
750-
message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) {
751-
eventName = 'internalMessage';
752-
}
753-
process.nextTick(() => {
754-
target.emit(eventName, message, handle);
755-
});
761+
function isInternal(message) {
762+
return (message !== null &&
763+
typeof message === 'object' &&
764+
typeof message.cmd === 'string' &&
765+
message.cmd.length > INTERNAL_PREFIX.length &&
766+
message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX);
756767
}
757768

758769
function nop() { }

0 commit comments

Comments
 (0)