Skip to content

Commit c60857a

Browse files
indutnyrvagg
authored andcommitted
cluster: allow shared reused dgram sockets
Allow listening on reused dgram ports in cluster workers. Fix: nodejs/node-v0.x-archive#9261 PR-URL: #2548 Reviewed-By: Ben Noordhuis <[email protected]>
1 parent a7596d7 commit c60857a

File tree

4 files changed

+87
-23
lines changed

4 files changed

+87
-23
lines changed

lib/cluster.js

+25-14
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Worker.prototype.isConnected = function isConnected() {
5757

5858
// Master/worker specific methods are defined in the *Init() functions.
5959

60-
function SharedHandle(key, address, port, addressType, backlog, fd) {
60+
function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
6161
this.key = key;
6262
this.workers = [];
6363
this.handle = null;
@@ -66,7 +66,7 @@ function SharedHandle(key, address, port, addressType, backlog, fd) {
6666
// FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
6767
var rval;
6868
if (addressType === 'udp4' || addressType === 'udp6')
69-
rval = dgram._createSocketHandle(address, port, addressType, fd);
69+
rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
7070
else
7171
rval = net._createServerHandle(address, port, addressType, fd);
7272

@@ -438,7 +438,8 @@ function masterInit() {
438438
var args = [message.address,
439439
message.port,
440440
message.addressType,
441-
message.fd];
441+
message.fd,
442+
message.index];
442443
var key = args.join(':');
443444
var handle = handles[key];
444445
if (handle === undefined) {
@@ -456,7 +457,8 @@ function masterInit() {
456457
message.port,
457458
message.addressType,
458459
message.backlog,
459-
message.fd);
460+
message.fd,
461+
message.flags);
460462
}
461463
if (!handle.data) handle.data = message.data;
462464

@@ -485,7 +487,7 @@ function masterInit() {
485487
cluster.emit('listening', worker, info);
486488
}
487489

488-
// Round-robin only. Server in worker is closing, remove from list.
490+
// Server in worker is closing, remove from list.
489491
function close(worker, message) {
490492
var key = message.key;
491493
var handle = handles[key];
@@ -500,6 +502,7 @@ function masterInit() {
500502

501503
function workerInit() {
502504
var handles = {};
505+
var indexes = {};
503506

504507
// Called from src/node.js
505508
cluster._setupWorker = function() {
@@ -528,15 +531,22 @@ function workerInit() {
528531
};
529532

530533
// obj is a net#Server or a dgram#Socket object.
531-
cluster._getServer = function(obj, address, port, addressType, fd, cb) {
532-
var message = {
533-
addressType: addressType,
534-
address: address,
535-
port: port,
534+
cluster._getServer = function(obj, options, cb) {
535+
const key = [ options.address,
536+
options.port,
537+
options.addressType,
538+
options.fd ].join(':');
539+
if (indexes[key] === undefined)
540+
indexes[key] = 0;
541+
else
542+
indexes[key]++;
543+
544+
const message = util._extend({
536545
act: 'queryServer',
537-
fd: fd,
546+
index: indexes[key],
538547
data: null
539-
};
548+
}, options);
549+
540550
// Set custom data on handle (i.e. tls tickets key)
541551
if (obj._getServerData) message.data = obj._getServerData();
542552
send(message, function(reply, handle) {
@@ -549,9 +559,9 @@ function workerInit() {
549559
});
550560
obj.once('listening', function() {
551561
cluster.worker.state = 'listening';
552-
var address = obj.address();
562+
const address = obj.address();
553563
message.act = 'listening';
554-
message.port = address && address.port || port;
564+
message.port = address && address.port || options.port;
555565
send(message);
556566
});
557567
};
@@ -563,6 +573,7 @@ function workerInit() {
563573
// closed. Avoids resource leaks when the handle is short-lived.
564574
var close = handle.close;
565575
handle.close = function() {
576+
send({ act: 'close', key: key });
566577
delete handles[key];
567578
return close.apply(this, arguments);
568579
};

lib/dgram.js

+15-8
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ function newHandle(type) {
6060
}
6161

6262

63-
exports._createSocketHandle = function(address, port, addressType, fd) {
63+
exports._createSocketHandle = function(address, port, addressType, fd, flags) {
6464
// Opening an existing fd is not supported for UDP handles.
6565
assert(typeof fd !== 'number' || fd < 0);
6666

6767
var handle = newHandle(addressType);
6868

6969
if (port || address) {
70-
var err = handle.bind(address, port || 0, 0);
70+
var err = handle.bind(address, port || 0, flags);
7171
if (err) {
7272
handle.close();
7373
return err;
@@ -176,8 +176,12 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
176176
if (!cluster)
177177
cluster = require('cluster');
178178

179+
var flags = 0;
180+
if (self._reuseAddr)
181+
flags |= constants.UV_UDP_REUSEADDR;
182+
179183
if (cluster.isWorker && !exclusive) {
180-
cluster._getServer(self, ip, port, self.type, -1, function(err, handle) {
184+
function onHandle(err, handle) {
181185
if (err) {
182186
var ex = exceptionWithHostPort(err, 'bind', ip, port);
183187
self.emit('error', ex);
@@ -191,16 +195,19 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
191195

192196
replaceHandle(self, handle);
193197
startListening(self);
194-
});
198+
}
199+
cluster._getServer(self, {
200+
address: ip,
201+
port: port,
202+
addressType: self.type,
203+
fd: -1,
204+
flags: flags
205+
}, onHandle);
195206

196207
} else {
197208
if (!self._handle)
198209
return; // handle has been closed in the mean time
199210

200-
var flags = 0;
201-
if (self._reuseAddr)
202-
flags |= constants.UV_UDP_REUSEADDR;
203-
204211
var err = self._handle.bind(ip, port || 0, flags);
205212
if (err) {
206213
var ex = exceptionWithHostPort(err, 'bind', ip, port);

lib/net.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -1268,7 +1268,13 @@ function listen(self, address, port, addressType, backlog, fd, exclusive) {
12681268
return;
12691269
}
12701270

1271-
cluster._getServer(self, address, port, addressType, fd, cb);
1271+
cluster._getServer(self, {
1272+
address: address,
1273+
port: port,
1274+
addressType: addressType,
1275+
fd: fd,
1276+
flags: 0
1277+
}, cb);
12721278

12731279
function cb(err, handle) {
12741280
// EADDRINUSE may not be reported until we call listen(). To complicate
+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const cluster = require('cluster');
5+
const dgram = require('dgram');
6+
7+
if (common.isWindows) {
8+
console.log('1..0 # Skipped: dgram clustering is currently not supported ' +
9+
'on windows.');
10+
return;
11+
}
12+
13+
if (cluster.isMaster) {
14+
cluster.fork().on('exit', function(code) {
15+
assert.equal(code, 0);
16+
});
17+
return;
18+
}
19+
20+
const sockets = [];
21+
function next() {
22+
sockets.push(this);
23+
if (sockets.length !== 2)
24+
return;
25+
26+
// Work around health check issue
27+
process.nextTick(function() {
28+
for (var i = 0; i < sockets.length; i++)
29+
sockets[i].close(close);
30+
});
31+
}
32+
33+
var waiting = 2;
34+
function close() {
35+
if (--waiting === 0)
36+
cluster.worker.disconnect();
37+
}
38+
39+
for (var i = 0; i < 2; i++)
40+
dgram.createSocket({ type: 'udp4', reuseAddr: true }).bind(common.PORT, next);

0 commit comments

Comments
 (0)