Skip to content

Commit c5edeb9

Browse files
ronagtargos
authored andcommitted
http: simplify drain()
Simplify and slightly optimize draining outgoing http streams. Avoid extra event listener and inline with rest of the drain logic. PR-URL: #29081 Reviewed-By: Matteo Collina <[email protected]>
1 parent 6bafd35 commit c5edeb9

File tree

5 files changed

+26
-27
lines changed

5 files changed

+26
-27
lines changed

lib/_http_client.js

+10-5
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ const {
3030
_checkIsHttpToken: checkIsHttpToken,
3131
debug,
3232
freeParser,
33-
httpSocketSetup,
3433
parsers,
3534
HTTPParser,
3635
prepareError,
@@ -40,7 +39,7 @@ const Agent = require('_http_agent');
4039
const { Buffer } = require('buffer');
4140
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
4241
const { URL, urlToOptions, searchParamsSymbol } = require('internal/url');
43-
const { kOutHeaders, ondrain } = require('internal/http');
42+
const { kOutHeaders, kNeedDrain } = require('internal/http');
4443
const { connResetException, codes } = require('internal/errors');
4544
const {
4645
ERR_HTTP_HEADERS_SENT,
@@ -335,6 +334,14 @@ function emitAbortNT() {
335334
this.emit('abort');
336335
}
337336

337+
function ondrain() {
338+
const msg = this._httpMessage;
339+
if (msg && !msg.finished && msg[kNeedDrain]) {
340+
msg[kNeedDrain] = false;
341+
msg.emit('drain');
342+
}
343+
}
344+
338345
function socketCloseListener() {
339346
const socket = this;
340347
const req = socket._httpMessage;
@@ -649,9 +656,6 @@ function tickOnSocket(req, socket) {
649656
socket.parser = parser;
650657
socket._httpMessage = req;
651658

652-
// Setup "drain" propagation.
653-
httpSocketSetup(socket);
654-
655659
// Propagate headers limit from request object to parser
656660
if (typeof req.maxHeadersCount === 'number') {
657661
parser.maxHeaderPairs = req.maxHeadersCount << 1;
@@ -663,6 +667,7 @@ function tickOnSocket(req, socket) {
663667
socket.on('data', socketOnData);
664668
socket.on('end', socketOnEnd);
665669
socket.on('close', socketCloseListener);
670+
socket.on('drain', ondrain);
666671

667672
if (
668673
req.timeout !== undefined ||

lib/_http_common.js

-8
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ const { methods, HTTPParser } =
3131
internalBinding('http_parser') : internalBinding('http_parser_llhttp');
3232

3333
const FreeList = require('internal/freelist');
34-
const { ondrain } = require('internal/http');
3534
const incoming = require('_http_incoming');
3635
const {
3736
IncomingMessage,
@@ -201,12 +200,6 @@ function freeParser(parser, req, socket) {
201200
}
202201
}
203202

204-
205-
function httpSocketSetup(socket) {
206-
socket.removeListener('drain', ondrain);
207-
socket.on('drain', ondrain);
208-
}
209-
210203
const tokenRegExp = /^[\^_`a-zA-Z\-0-9!#$%&'*+.|~]+$/;
211204
/**
212205
* Verifies that the given val is a valid HTTP token
@@ -253,7 +246,6 @@ module.exports = {
253246
CRLF: '\r\n',
254247
debug,
255248
freeParser,
256-
httpSocketSetup,
257249
methods,
258250
parsers,
259251
kIncomingMessage,

lib/_http_outgoing.js

+8-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const { getDefaultHighWaterMark } = require('internal/streams/state');
2727
const assert = require('internal/assert');
2828
const Stream = require('stream');
2929
const internalUtil = require('internal/util');
30-
const { kOutHeaders, utcDate } = require('internal/http');
30+
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
3131
const { Buffer } = require('buffer');
3232
const common = require('_http_common');
3333
const checkIsHttpToken = common._checkIsHttpToken;
@@ -96,6 +96,7 @@ function OutgoingMessage() {
9696
this._contentLength = null;
9797
this._hasBody = true;
9898
this._trailer = '';
99+
this[kNeedDrain] = false;
99100

100101
this.finished = false;
101102
this._headerSent = false;
@@ -582,7 +583,10 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableEnded', {
582583

583584
const crlf_buf = Buffer.from('\r\n');
584585
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
585-
return write_(this, chunk, encoding, callback, false);
586+
const ret = write_(this, chunk, encoding, callback, false);
587+
if (!ret)
588+
this[kNeedDrain] = true;
589+
return ret;
586590
};
587591

588592
function write_(msg, chunk, encoding, callback, fromEnd) {
@@ -782,8 +786,8 @@ OutgoingMessage.prototype._flush = function _flush() {
782786
if (this.finished) {
783787
// This is a queue to the server or client to bring in the next this.
784788
this._finish();
785-
} else if (ret) {
786-
// This is necessary to prevent https from breaking
789+
} else if (ret && this[kNeedDrain]) {
790+
this[kNeedDrain] = false;
787791
this.emit('drain');
788792
}
789793
}

lib/_http_server.js

+7-5
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ const {
3232
CRLF,
3333
continueExpression,
3434
chunkExpression,
35-
httpSocketSetup,
3635
kIncomingMessage,
3736
HTTPParser,
3837
_checkInvalidHeaderChar: checkInvalidHeaderChar,
@@ -41,7 +40,7 @@ const {
4140
const { OutgoingMessage } = require('_http_outgoing');
4241
const {
4342
kOutHeaders,
44-
ondrain,
43+
kNeedDrain,
4544
nowDate,
4645
emitStatistics
4746
} = require('internal/http');
@@ -363,8 +362,6 @@ function connectionListener(socket) {
363362
function connectionListenerInternal(server, socket) {
364363
debug('SERVER new http connection');
365364

366-
httpSocketSetup(socket);
367-
368365
// Ensure that the server property of the socket is correctly set.
369366
// See https://github.com/nodejs/node/issues/13435
370367
if (socket.server === null)
@@ -459,6 +456,12 @@ function socketOnDrain(socket, state) {
459456
socket.parser.resume();
460457
socket.resume();
461458
}
459+
460+
const msg = socket._httpMessage;
461+
if (msg && !msg.finished && msg[kNeedDrain]) {
462+
msg[kNeedDrain] = false;
463+
msg.emit('drain');
464+
}
462465
}
463466

464467
function socketOnTimeout() {
@@ -585,7 +588,6 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
585588
socket.removeListener('end', state.onEnd);
586589
socket.removeListener('close', state.onClose);
587590
socket.removeListener('drain', state.onDrain);
588-
socket.removeListener('drain', ondrain);
589591
socket.removeListener('error', socketOnError);
590592
unconsume(parser, socket);
591593
parser.finish();

lib/internal/http.js

+1-5
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ function resetCache() {
2828
utcCache = undefined;
2929
}
3030

31-
function ondrain() {
32-
if (this._httpMessage) this._httpMessage.emit('drain');
33-
}
34-
3531
class HttpRequestTiming extends PerformanceEntry {
3632
constructor(statistics) {
3733
super();
@@ -50,7 +46,7 @@ function emitStatistics(statistics) {
5046

5147
module.exports = {
5248
kOutHeaders: Symbol('kOutHeaders'),
53-
ondrain,
49+
kNeedDrain: Symbol('kNeedDrain'),
5450
nowDate,
5551
utcDate,
5652
emitStatistics

0 commit comments

Comments
 (0)