Skip to content

Commit a80b8de

Browse files
committed
cluster: send connection to other server when worker drop it
1 parent 2fe4e94 commit a80b8de

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

lib/internal/cluster/child.js

+8-1
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,15 @@ function rr(message, { indexesKey, index }, cb) {
204204
function onconnection(message, handle) {
205205
const key = message.key;
206206
const server = handles.get(key);
207-
const accepted = server !== undefined;
207+
let accepted = server !== undefined;
208208

209+
if (accepted && server[owner_symbol]) {
210+
const self = server[owner_symbol];
211+
if (self.maxConnections && self._connections >= self.maxConnections) {
212+
accepted = false;
213+
}
214+
}
215+
209216
send({ ack: message.seq, accepted });
210217

211218
if (accepted)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const net = require('net');
5+
const cluster = require('cluster');
6+
const tmpdir = require('../common/tmpdir');
7+
8+
let connectionCount = 0;
9+
let listenCount = 0;
10+
let worker1;
11+
let worker2;
12+
13+
function request(path) {
14+
for (let i = 0; i < 10; i++) {
15+
net.connect(path);
16+
}
17+
}
18+
19+
function handleMessage(message) {
20+
assert.match(message.action, /listen|connection/);
21+
if (message.action === 'listen') {
22+
if (++listenCount === 2) {
23+
request(common.PIPE);
24+
}
25+
} else if (message.action === 'connection') {
26+
if (++connectionCount === 10) {
27+
worker1.send({ action: 'disconnect' });
28+
worker2.send({ action: 'disconnect' });
29+
}
30+
}
31+
}
32+
33+
if (cluster.isPrimary) {
34+
cluster.schedulingPolicy = cluster.SCHED_RR;
35+
tmpdir.refresh();
36+
worker1 = cluster.fork({ maxConnections: 1, pipePath: common.PIPE });
37+
worker2 = cluster.fork({ maxConnections: 9, pipePath: common.PIPE });
38+
worker1.on('message', common.mustCall((message) => {
39+
handleMessage(message);
40+
}, 2));
41+
worker2.on('message', common.mustCall((message) => {
42+
handleMessage(message);
43+
}, 10));
44+
} else {
45+
const server = net.createServer(common.mustCall((socket) => {
46+
process.send({ action: 'connection' });
47+
}, +process.env.maxConnections));
48+
49+
server.listen(process.env.pipePath, common.mustCall(() => {
50+
process.send({ action: 'listen' });
51+
}));
52+
53+
server.maxConnections = +process.env.maxConnections;
54+
55+
process.on('message', common.mustCall((message) => {
56+
assert.strictEqual(message.action, 'disconnect');
57+
process.disconnect();
58+
}));
59+
}

0 commit comments

Comments
 (0)