Skip to content

Commit c1200da

Browse files
committed
dgram: add maxSendQueueSize socket option
This commit adds the maxSendQueueSize option to dgram sockets. When used, this option prevents the socket's send queue from growing without an upper bound.
1 parent 58cb9cd commit c1200da

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

doc/api/dgram.md

+9-2
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,8 @@ added: v0.11.13
431431
* Returns: {dgram.Socket}
432432

433433
Creates a `dgram.Socket` object. The `options` argument is an object that
434-
should contain a `type` field of either `udp4` or `udp6` and an optional
435-
boolean `reuseAddr` field.
434+
should contain a `type` field of either `udp4` or `udp6`. `options` may also
435+
contain optional `reuseAddr` and `maxSendQueueSize` fields.
436436

437437
When `reuseAddr` is `true` [`socket.bind()`][] will reuse the address, even if
438438
another process has already bound a socket on it. `reuseAddr` defaults to
@@ -446,6 +446,13 @@ interfaces" address on a random port (it does the right thing for both `udp4`
446446
and `udp6` sockets). The bound address and port can be retrieved using
447447
[`socket.address().address`][] and [`socket.address().port`][].
448448

449+
It is possible to begin sending data from a socket that has not been bound. In
450+
this situation, the socket will be bound automatically before sending the data.
451+
Send operations are stored in a queue until the socket binds. `maxSendQueueSize`
452+
is a number that places an upper bound on the number of send operations that can
453+
be queued. If the queue size is exceeded, an error is emitted.
454+
`maxSendQueueSize` defaults to `Infinity`.
455+
449456
### dgram.createSocket(type[, callback])
450457
<!-- YAML
451458
added: v0.1.99

lib/dgram.js

+18-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const BIND_STATE_UNBOUND = 0;
1313
const BIND_STATE_BINDING = 1;
1414
const BIND_STATE_BOUND = 2;
1515

16+
const maxSendQueueSize = Symbol('maxSendQueueSize');
17+
1618
// lazily loaded
1719
var cluster = null;
1820
var dns = null;
@@ -76,11 +78,15 @@ exports._createSocketHandle = function(address, port, addressType, fd, flags) {
7678

7779

7880
function Socket(type, listener) {
81+
var options;
82+
7983
EventEmitter.call(this);
8084

8185
if (type !== null && typeof type === 'object') {
82-
var options = type;
86+
options = type;
8387
type = options.type;
88+
} else {
89+
options = {};
8490
}
8591

8692
var handle = newHandle(type);
@@ -89,11 +95,15 @@ function Socket(type, listener) {
8995
this._handle = handle;
9096
this._receiving = false;
9197
this._bindState = BIND_STATE_UNBOUND;
98+
this._queue = undefined;
9299
this.type = type;
93100
this.fd = null; // compatibility hack
94101

95102
// If true - UV_UDP_REUSEADDR flag will be set
96-
this._reuseAddr = options && options.reuseAddr;
103+
this._reuseAddr = !!options.reuseAddr;
104+
105+
this[maxSendQueueSize] = typeof options.maxSendQueueSize === 'number' ?
106+
options.maxSendQueueSize : Infinity;
97107

98108
if (typeof listener === 'function')
99109
this.on('message', listener);
@@ -284,6 +294,12 @@ function enqueue(self, toEnqueue) {
284294
self._queue = [];
285295
self.once('listening', clearQueue);
286296
}
297+
298+
if (self._queue.length >= self[maxSendQueueSize]) {
299+
self.emit('error', new Error('Maximum send queue size exceeded'));
300+
return;
301+
}
302+
287303
self._queue.push(toEnqueue);
288304
return;
289305
}
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const dgram = require('dgram');
5+
const dns = require('dns');
6+
const maxSendQueueSize = 10;
7+
8+
// Monkey patch dns.lookup() so that it always fails.
9+
dns.lookup = function(address, family, callback) {
10+
callback(new Error('fake DNS'));
11+
};
12+
13+
// Verify that maxSendQueueSize is respected.
14+
{
15+
const socket = dgram.createSocket({ type: 'udp4', maxSendQueueSize });
16+
let queueErrors = 0;
17+
18+
process.on('exit', () => {
19+
assert.strictEqual(queueErrors, 1);
20+
});
21+
22+
socket.on('error', (err) => {
23+
// Ignore DNS errors.
24+
if (/^Error: fake DNS$/.test(err))
25+
return;
26+
27+
if (/^Error: Maximum send queue size exceeded$/.test(err)) {
28+
queueErrors++;
29+
return;
30+
}
31+
32+
common.fail(`Unexpected error: ${err}`);
33+
});
34+
35+
// Fill the send queue.
36+
for (let i = 0; i < maxSendQueueSize; ++i)
37+
socket.send('foobar', common.PORT, 'localhost');
38+
39+
// Pause to make sure nothing leaves the queue.
40+
setImmediate(() => {
41+
assert.strictEqual(socket._queue.length, maxSendQueueSize);
42+
socket.send('foobar', common.PORT, 'localhost');
43+
assert.strictEqual(socket._queue.length, maxSendQueueSize);
44+
});
45+
}
46+
47+
// Verify the default behavior when no maxSendQueueSize is specified.
48+
{
49+
const socket = dgram.createSocket({ type: 'udp4' });
50+
51+
// Only fake DNS errors should be seen.
52+
socket.on('error', (err) => { assert(/^Error: fake DNS$/.test(err)); });
53+
54+
for (let i = 0; i < maxSendQueueSize * 2; ++i)
55+
socket.send('foobar', common.PORT, 'localhost');
56+
57+
assert.strictEqual(socket._queue.length, maxSendQueueSize * 2);
58+
}

0 commit comments

Comments
 (0)