Skip to content

Commit 4571c84

Browse files
mcollinaMyles Borins
authored and
Myles Borins
committed
dgram: generalized send queue to handle close
If the udp socket is not ready and we are accumulating messages to send, it needs to delay closing the socket when all messages are flushed. Fixes: #7061 PR-URL: #7066 Reviewed-By: Anna Henningsen <[email protected]>
1 parent 02e6c84 commit 4571c84

File tree

2 files changed

+57
-13
lines changed

2 files changed

+57
-13
lines changed

lib/dgram.js

+39-13
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,32 @@ Socket.prototype.sendto = function(buffer,
243243
};
244244

245245

246+
function enqueue(self, toEnqueue) {
247+
// If the send queue hasn't been initialized yet, do it, and install an
248+
// event handler that flushes the send queue after binding is done.
249+
if (!self._queue) {
250+
self._queue = [];
251+
self.once('listening', clearQueue);
252+
}
253+
self._queue.push(toEnqueue);
254+
return;
255+
}
256+
257+
258+
function clearQueue() {
259+
const queue = this._queue;
260+
this._queue = undefined;
261+
262+
// Flush the send queue.
263+
for (var i = 0; i < queue.length; i++)
264+
queue[i]();
265+
}
266+
267+
268+
// valid combinations
269+
// send(buffer, offset, length, port, address, callback)
270+
// send(buffer, offset, length, port, address)
271+
// send(buffer, offset, length, port)
246272
Socket.prototype.send = function(buffer,
247273
offset,
248274
length,
@@ -290,18 +316,13 @@ Socket.prototype.send = function(buffer,
290316
// If the socket hasn't been bound yet, push the outbound packet onto the
291317
// send queue and send after binding is complete.
292318
if (self._bindState != BIND_STATE_BOUND) {
293-
// If the send queue hasn't been initialized yet, do it, and install an
294-
// event handler that flushes the send queue after binding is done.
295-
if (!self._sendQueue) {
296-
self._sendQueue = [];
297-
self.once('listening', function() {
298-
// Flush the send queue.
299-
for (var i = 0; i < self._sendQueue.length; i++)
300-
self.send.apply(self, self._sendQueue[i]);
301-
self._sendQueue = undefined;
302-
});
303-
}
304-
self._sendQueue.push([buffer, offset, length, port, address, callback]);
319+
enqueue(self, self.send.bind(self,
320+
buffer,
321+
offset,
322+
length,
323+
port,
324+
address,
325+
callback));
305326
return;
306327
}
307328

@@ -347,10 +368,15 @@ function afterSend(err) {
347368
this.callback(err, this.length);
348369
}
349370

350-
351371
Socket.prototype.close = function(callback) {
352372
if (typeof callback === 'function')
353373
this.on('close', callback);
374+
375+
if (this._queue) {
376+
this._queue.push(this.close.bind(this));
377+
return this;
378+
}
379+
354380
this._healthCheck();
355381
this._stopReceiving();
356382
this._handle.close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
'use strict';
2+
// Ensure that if a dgram socket is closed before the sendQueue is drained
3+
// will not crash
4+
5+
const common = require('../common');
6+
const dgram = require('dgram');
7+
8+
const buf = Buffer.alloc(1024, 42);
9+
10+
const socket = dgram.createSocket('udp4');
11+
12+
socket.on('listening', function() {
13+
socket.close();
14+
});
15+
16+
// adds a listener to 'listening' to send the data when
17+
// the socket is available
18+
socket.send(buf, 0, buf.length, common.PORT, 'localhost');

0 commit comments

Comments
 (0)