Skip to content

Commit fb7a491

Browse files
indutnyrvagg
authored andcommitted
http_server: pause socket properly
Account pending response data to decide whether pause the socket or not. Writable stream state is a not reliable measure, because it just says how much data is pending on a **current** request, thus not helping much with problem we are trying to solve here. PR-URL: #3128
1 parent 99943e1 commit fb7a491

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

lib/_http_outgoing.js

+12
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ function OutgoingMessage() {
4949
this.output = [];
5050
this.outputEncodings = [];
5151
this.outputCallbacks = [];
52+
this.outputSize = 0;
5253

5354
this.writable = true;
5455

@@ -71,6 +72,8 @@ function OutgoingMessage() {
7172
this._header = null;
7273
this._headers = null;
7374
this._headerNames = {};
75+
76+
this._onPendingData = null;
7477
}
7578
util.inherits(OutgoingMessage, Stream);
7679

@@ -120,6 +123,9 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
120123
this.output.unshift(this._header);
121124
this.outputEncodings.unshift('binary');
122125
this.outputCallbacks.unshift(null);
126+
this.outputSize += this._header.length;
127+
if (this._onPendingData !== null)
128+
this._onPendingData(this._header.length);
123129
}
124130
this._headerSent = true;
125131
}
@@ -152,6 +158,9 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
152158
this.output = [];
153159
this.outputEncodings = [];
154160
this.outputCallbacks = [];
161+
if (this._onPendingData !== null)
162+
this._onPendingData(-this.outputSize);
163+
this.outputSize = 0;
155164
} else if (data.length === 0) {
156165
if (typeof callback === 'function')
157166
process.nextTick(callback);
@@ -175,6 +184,9 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
175184
this.output.push(data);
176185
this.outputEncodings.push(encoding);
177186
this.outputCallbacks.push(callback);
187+
this.outputSize += data.length;
188+
if (this._onPendingData !== null)
189+
this._onPendingData(data.length);
178190
return false;
179191
};
180192

lib/_http_server.js

+15-2
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ function Server(requestListener) {
241241
});
242242

243243
this.timeout = 2 * 60 * 1000;
244+
245+
this._pendingResponseData = 0;
244246
}
245247
util.inherits(Server, net.Server);
246248

@@ -260,6 +262,13 @@ function connectionListener(socket) {
260262
var self = this;
261263
var outgoing = [];
262264
var incoming = [];
265+
var outgoingData = 0;
266+
267+
function updateOutgoingData(delta) {
268+
outgoingData += delta;
269+
if (socket._paused && outgoingData < socket._writableState.highWaterMark)
270+
return socketOnDrain();
271+
}
263272

264273
function abortIncoming() {
265274
while (incoming.length) {
@@ -425,8 +434,10 @@ function connectionListener(socket) {
425434

426435
socket._paused = false;
427436
function socketOnDrain() {
437+
var needPause = outgoingData > socket._writableState.highWaterMark;
438+
428439
// If we previously paused, then start reading again.
429-
if (socket._paused) {
440+
if (socket._paused && !needPause) {
430441
socket._paused = false;
431442
socket.parser.resume();
432443
socket.resume();
@@ -440,7 +451,8 @@ function connectionListener(socket) {
440451
// so that we don't become overwhelmed by a flood of
441452
// pipelined requests that may never be resolved.
442453
if (!socket._paused) {
443-
var needPause = socket._writableState.needDrain;
454+
var needPause = socket._writableState.needDrain ||
455+
outgoingData >= socket._writableState.highWaterMark;
444456
if (needPause) {
445457
socket._paused = true;
446458
// We also need to pause the parser, but don't do that until after
@@ -451,6 +463,7 @@ function connectionListener(socket) {
451463
}
452464

453465
var res = new ServerResponse(req);
466+
res._onPendingData = updateOutgoingData;
454467

455468
res.shouldKeepAlive = shouldKeepAlive;
456469
DTRACE_HTTP_SERVER_REQUEST(req, socket);

0 commit comments

Comments
 (0)