Skip to content
This repository was archived by the owner on Apr 22, 2023. It is now read-only.

Commit 6c383c6

Browse files
committed
cluster: add graceful disconnect support
This patch adds a worker.disconnect() method that stops the worker from accepting new connections and then closes the IPC channel. This allows the worker to die gracefully. When the IPC channel has been disconnected, a 'disconnect' event is emitted. The patch also adds a cluster.disconnect() method, which calls worker.disconnect() on all connected workers. When the workers are disconnected, it then closes all server handlers. This allows the cluster itself to terminate gracefully.
1 parent 36761b2 commit 6c383c6

File tree

4 files changed

+426
-15
lines changed

4 files changed

+426
-15
lines changed

doc/api/cluster.markdown

+87-3
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,21 @@ where the 'listening' event is emitted.
118118
console.log("We are now connected");
119119
});
120120

121+
## Event: 'disconnect'
122+
123+
* `worker` {Worker object}
124+
125+
When a worker's IPC channel has disconnected, this event is emitted. This will happen
126+
when the worker dies, usually after calling `.destroy()`.
127+
128+
It is also emitted after calling `.disconnect()`; in this case there may be a delay
129+
between the `disconnect` and `death` events, and this event can be used to detect whether the
130+
process is stuck in cleanup or whether there are long-living connections.
131+
132+
cluster.on('disconnect', function(worker) {
133+
console.log('The worker #' + worker.uniqueID + ' has disconnected');
134+
});
135+
121136
## Event: 'death'
122137

123138
* `worker` {Worker object}
@@ -179,6 +194,16 @@ Spawn a new worker process. This can only be called from the master process.
179194
All settings set by `.setupMaster` are stored in this settings object.
180195
This object is not supposed to be changed or set manually.
181196

197+
## cluster.disconnect([callback])
198+
199+
* `callback` {Function} called when all workers are disconnected and handlers are closed
200+
201+
When this method is called, all workers will commit a graceful suicide. When they are
202+
disconnected, all internal handlers will be closed, allowing the master process to
203+
die gracefully if no other event is waiting.
204+
205+
The method takes an optional callback argument that will be called when finished.
206+
182207
## cluster.workers
183208

184209
* {Object}
@@ -232,9 +257,8 @@ See: [Child Process module](child_process.html)
232257

233258
* {Boolean}
234259

235-
This property is a boolean. It is set when a worker dies, until then it is
236-
`undefined`. It is true if the worker was killed using the `.destroy()`
237-
method, and false otherwise.
260+
This property is a boolean. It is set when a worker dies after calling `.destroy()`
261+
or immediately after calling the `.disconnect()` method. Until then it is `undefined`.
238262

239263
### worker.send(message, [sendHandle])
240264

@@ -273,6 +297,55 @@ a suicide boolean is set to true.
273297
// destroy worker
274298
worker.destroy();
275299

300+
301+
### worker.disconnect()
302+
303+
When this function is called, the worker will no longer accept new connections, but
304+
they will be handled by any other listening worker. Existing connections will be
305+
allowed to close as usual. When no more connections exist, the IPC channel to the worker
306+
will close, allowing it to die gracefully. When the IPC channel is closed the `disconnect`
307+
event will be emitted, followed by the `death` event, which is emitted when
308+
the worker finally dies.
309+
310+
Because there might be long-living connections, it is useful to implement a timeout.
311+
This example asks the worker to disconnect and after 2 seconds it will destroy the
312+
server. An alternative would be to execute `worker.destroy()` after 2 seconds, but
313+
that would normally not allow the worker to do any cleanup if needed.
314+
315+
if (cluster.isMaster) {
316+
var worker = cluster.fork();
317+
var timeout;
318+
319+
worker.on('listening', function () {
320+
worker.disconnect();
321+
timeout = setTimeout(function () {
322+
worker.send('force kill');
323+
}, 2000);
324+
});
325+
326+
worker.on('disconnect', function () {
327+
clearTimeout(timeout);
328+
});
329+
330+
} else if (cluster.isWorker) {
331+
var net = require('net');
332+
var server = net.createServer(function (socket) {
333+
// connections never end
334+
});
335+
336+
server.listen(8000);
337+
338+
server.on('close', function () {
339+
// cleanup
340+
});
341+
342+
process.on('message', function (msg) {
343+
if (msg === 'force kill') {
344+
server.destroy();
345+
}
346+
});
347+
}
348+
276349
### Event: 'message'
277350

278351
* `message` {Object}
@@ -342,6 +415,17 @@ on the specified worker.
342415
// Worker is listening
343416
};
344417

418+
## Event: 'disconnect'
419+
420+
* `worker` {Worker object}
421+
422+
Same as the `cluster.on('disconnect')` event, but emits only when the state changes
423+
on the specified worker.
424+
425+
cluster.fork().on('disconnect', function (worker) {
426+
// Worker has disconnected
427+
});
428+
345429
## Event: 'death'
346430

347431
* `worker` {Worker object}

lib/cluster.js

+107-12
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ function eachWorker(cb) {
7777
}
7878
}
7979

80+
// Extremely simple progress tracker
81+
function ProgressTracker(missing, callback) {
82+
this.missing = missing;
83+
this.callback = callback;
84+
}
85+
ProgressTracker.prototype.done = function() {
86+
this.missing -= 1;
87+
this.check();
88+
};
89+
ProgressTracker.prototype.check = function() {
90+
if (this.missing === 0) this.callback();
91+
};
92+
8093
cluster.setupMaster = function(options) {
8194
// This can only be called from the master.
8295
assert(cluster.isMaster);
@@ -238,7 +251,10 @@ if (cluster.isMaster) {
238251
// Messages to a worker will be handled using these methods
239252
else if (cluster.isWorker) {
240253

241-
// TODO: the disconnect step will use this
254+
// Handle worker.disconnect from master
255+
messageHandingObject.disconnect = function(message, worker) {
256+
worker.disconnect();
257+
};
242258
}
243259

244260
function toDecInt(value) {
@@ -291,9 +307,11 @@ function Worker(customEnv) {
291307
});
292308
}
293309

294-
// handle internalMessage and exit event
310+
// handle internalMessage, exit and disconnect event
295311
this.process.on('internalMessage', handleMessage.bind(null, this));
296312
this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death'));
313+
this.process.on('disconnect',
314+
prepareDeath.bind(null, this, 'disconnected', 'disconnect'));
297315

298316
// relay message and error
299317
this.process.on('message', this.emit.bind(this, 'message'));
@@ -354,14 +372,6 @@ Worker.prototype.send = function() {
354372
this.process.send.apply(this.process, arguments);
355373
};
356374

357-
358-
function closeWorkerChannel(worker, callback) {
359-
//Apparently the .close method is async, but do not have a callback
360-
worker.process._channel.close();
361-
worker.process._channel = null;
362-
process.nextTick(callback);
363-
}
364-
365375
// Kill the worker without restarting
366376
Worker.prototype.destroy = function() {
367377
var self = this;
@@ -371,9 +381,14 @@ Worker.prototype.destroy = function() {
371381
if (cluster.isMaster) {
372382
// Disconnect IPC channel
373383
// this way the worker won't need to propagate suicide state to master
374-
closeWorkerChannel(this, function() {
384+
if (self.process.connected) {
385+
self.process.once('disconnect', function() {
386+
self.process.kill();
387+
});
388+
self.process.disconnect();
389+
} else {
375390
self.process.kill();
376-
});
391+
}
377392

378393
} else {
379394
// Channel is open
@@ -401,6 +416,59 @@ Worker.prototype.destroy = function() {
401416
}
402417
};
403418

419+
// The .disconnect function will close all servers and then disconnect
420+
// the IPC channel.
421+
if (cluster.isMaster) {
422+
// Used in master
423+
Worker.prototype.disconnect = function() {
424+
this.suicide = true;
425+
426+
sendInternalMessage(this, {cmd: 'disconnect'});
427+
};
428+
429+
} else {
430+
// Used in workers
431+
Worker.prototype.disconnect = function() {
432+
var self = this;
433+
434+
this.suicide = true;
435+
436+
// keep track of open servers
437+
var servers = Object.keys(serverLisenters).length;
438+
var progress = new ProgressTracker(servers, function() {
439+
// there are no more servers open so we will close the IPC channel.
440+
// Closing the IPC channel will emit a disconnect event
441+
// in both master and worker on the process object.
442+
// This event will be handled by prepareDeath.
443+
self.process.disconnect();
444+
});
445+
446+
// depending on where this function was called from (master or worker)
447+
// the suicide state has already been set.
448+
// But it doesn't really matter if we set it again.
449+
sendInternalMessage(this, {cmd: 'suicide'}, function() {
450+
// in case there are no servers
451+
progress.check();
452+
453+
// close all servers gracefully
454+
var server;
455+
for (var key in serverLisenters) {
456+
server = serverLisenters[key];
457+
458+
// in case the server is closed we won't close it again
459+
if (server._handle === null) {
460+
progress.done();
461+
continue;
462+
}
463+
464+
server.on('close', progress.done.bind(progress));
465+
server.close();
466+
}
467+
});
468+
469+
};
470+
}
471+
404472
// Fork a new worker
405473
cluster.fork = function(env) {
406474
// This can only be called from the master.
@@ -412,6 +480,33 @@ cluster.fork = function(env) {
412480
return (new cluster.Worker(env));
413481
};
414482

483+
// execute .disconnect on all workers and close handlers when done
484+
cluster.disconnect = function(callback) {
485+
// This can only be called from the master.
486+
assert(cluster.isMaster);
487+
488+
// Close all TCP handlers when all workers are disconnected
489+
var workers = Object.keys(cluster.workers).length;
490+
var progress = new ProgressTracker(workers, function() {
491+
for (var key in serverHandlers) {
492+
serverHandlers[key].close();
493+
delete serverHandlers[key];
494+
}
495+
496+
// call callback when done
497+
if (callback) callback();
498+
});
499+
500+
// begin disconnecting all workers
501+
eachWorker(function(worker) {
502+
worker.once('disconnect', progress.done.bind(progress));
503+
worker.disconnect();
504+
});
505+
506+
// in case there weren't any workers
507+
progress.check();
508+
};
509+
415510
// Sync way to quickly kill all cluster workers
416511
// However the workers may not die instantly
417512
function quickDestroyCluster() {

0 commit comments

Comments
 (0)