Skip to content

Commit 0427cdf

Browse files
indutnyjasnell
authored andcommitted
http: fix stalled pipeline bug
This is a two-part fix: - Fix pending data notification in `OutgoingMessage` to notify server about flushed data too - Fix pause/resume behavior for the consumed socket. `resume` event is emitted on a next tick, and `socket._paused` can already be `true` at this time. Pause the socket again to avoid PAUSED error on parser. Fix: #3332 PR-URL: #3342 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Trevor Norris <[email protected]>
1 parent 4bac5d9 commit 0427cdf

5 files changed

+120
-47
lines changed

lib/_http_common.js

+4
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() {
140140

141141
parser._headers = [];
142142
parser._url = '';
143+
parser._consumed = false;
143144

144145
// Only called in the slow case where slow means
145146
// that the request headers were either fragmented
@@ -167,6 +168,9 @@ function freeParser(parser, req, socket) {
167168
if (parser) {
168169
parser._headers = [];
169170
parser.onIncoming = null;
171+
if (parser._consumed)
172+
parser.unconsume();
173+
parser._consumed = false;
170174
if (parser.socket)
171175
parser.socket.parser = null;
172176
parser.socket = null;

lib/_http_outgoing.js

+31-35
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
131131
this.outputEncodings.unshift('binary');
132132
this.outputCallbacks.unshift(null);
133133
this.outputSize += this._header.length;
134-
if (this._onPendingData !== null)
134+
if (typeof this._onPendingData === 'function')
135135
this._onPendingData(this._header.length);
136136
}
137137
this._headerSent = true;
@@ -154,22 +154,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
154154
// There might be pending data in the this.output buffer.
155155
var outputLength = this.output.length;
156156
if (outputLength > 0) {
157-
var output = this.output;
158-
var outputEncodings = this.outputEncodings;
159-
var outputCallbacks = this.outputCallbacks;
160-
connection.cork();
161-
for (var i = 0; i < outputLength; i++) {
162-
connection.write(output[i], outputEncodings[i],
163-
outputCallbacks[i]);
164-
}
165-
connection.uncork();
166-
167-
this.output = [];
168-
this.outputEncodings = [];
169-
this.outputCallbacks = [];
170-
if (this._onPendingData !== null)
171-
this._onPendingData(-this.outputSize);
172-
this.outputSize = 0;
157+
this._flushOutput(connection);
173158
} else if (data.length === 0) {
174159
if (typeof callback === 'function')
175160
process.nextTick(callback);
@@ -194,7 +179,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
194179
this.outputEncodings.push(encoding);
195180
this.outputCallbacks.push(callback);
196181
this.outputSize += data.length;
197-
if (this._onPendingData !== null)
182+
if (typeof this._onPendingData === 'function')
198183
this._onPendingData(data.length);
199184
return false;
200185
};
@@ -630,26 +615,11 @@ OutgoingMessage.prototype._finish = function() {
630615
// to attempt to flush any pending messages out to the socket.
631616
OutgoingMessage.prototype._flush = function() {
632617
var socket = this.socket;
633-
var outputLength, ret;
618+
var ret;
634619

635620
if (socket && socket.writable) {
636621
// There might be remaining data in this.output; write it out
637-
outputLength = this.output.length;
638-
if (outputLength > 0) {
639-
var output = this.output;
640-
var outputEncodings = this.outputEncodings;
641-
var outputCallbacks = this.outputCallbacks;
642-
socket.cork();
643-
for (var i = 0; i < outputLength; i++) {
644-
ret = socket.write(output[i], outputEncodings[i],
645-
outputCallbacks[i]);
646-
}
647-
socket.uncork();
648-
649-
this.output = [];
650-
this.outputEncodings = [];
651-
this.outputCallbacks = [];
652-
}
622+
ret = this._flushOutput(socket);
653623

654624
if (this.finished) {
655625
// This is a queue to the server or client to bring in the next this.
@@ -661,6 +631,32 @@ OutgoingMessage.prototype._flush = function() {
661631
}
662632
};
663633

634+
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
635+
var ret;
636+
var outputLength = this.output.length;
637+
if (outputLength <= 0)
638+
return ret;
639+
640+
var output = this.output;
641+
var outputEncodings = this.outputEncodings;
642+
var outputCallbacks = this.outputCallbacks;
643+
socket.cork();
644+
for (var i = 0; i < outputLength; i++) {
645+
ret = socket.write(output[i], outputEncodings[i],
646+
outputCallbacks[i]);
647+
}
648+
socket.uncork();
649+
650+
this.output = [];
651+
this.outputEncodings = [];
652+
this.outputCallbacks = [];
653+
if (typeof this._onPendingData === 'function')
654+
this._onPendingData(-this.outputSize);
655+
this.outputSize = 0;
656+
657+
return ret;
658+
};
659+
664660

665661
OutgoingMessage.prototype.flushHeaders = function() {
666662
if (!this._header) {

lib/_http_server.js

+33-6
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,10 @@ function connectionListener(socket) {
343343
socket.on = socketOnWrap;
344344

345345
var external = socket._handle._externalStream;
346-
if (external)
346+
if (external) {
347+
parser._consumed = true;
347348
parser.consume(external);
349+
}
348350
external = null;
349351
parser[kOnExecute] = onParserExecute;
350352

@@ -382,7 +384,7 @@ function connectionListener(socket) {
382384
socket.removeListener('data', socketOnData);
383385
socket.removeListener('end', socketOnEnd);
384386
socket.removeListener('close', serverSocketCloseListener);
385-
parser.unconsume(socket._handle._externalStream);
387+
unconsume(parser, socket);
386388
parser.finish();
387389
freeParser(parser, req, null);
388390
parser = null;
@@ -530,13 +532,38 @@ function connectionListener(socket) {
530532
exports._connectionListener = connectionListener;
531533

532534
function onSocketResume() {
533-
if (this._handle)
535+
// It may seem that the socket is resumed, but this is an enemy's trick to
536+
// deceive us! `resume` is emitted asynchronously, and may be called from
537+
// `incoming.readStart()`. Stop the socket again here, just to preserve the
538+
// state.
539+
//
540+
// We don't care about stream semantics for the consumed socket anyway.
541+
if (this._paused) {
542+
this.pause();
543+
return;
544+
}
545+
546+
if (this._handle && !this._handle.reading) {
547+
this._handle.reading = true;
534548
this._handle.readStart();
549+
}
535550
}
536551

537552
function onSocketPause() {
538-
if (this._handle)
553+
if (this._handle && this._handle.reading) {
554+
this._handle.reading = false;
539555
this._handle.readStop();
556+
}
557+
}
558+
559+
function unconsume(parser, socket) {
560+
if (socket._handle) {
561+
if (parser._consumed)
562+
parser.unconsume(socket._handle._externalStream);
563+
parser._consumed = false;
564+
socket.removeListener('pause', onSocketPause);
565+
socket.removeListener('resume', onSocketResume);
566+
}
540567
}
541568

542569
function socketOnWrap(ev, fn) {
@@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
546573
return res;
547574
}
548575

549-
if (this._handle && (ev === 'data' || ev === 'readable'))
550-
this.parser.unconsume(this._handle._externalStream);
576+
if (ev === 'data' || ev === 'readable')
577+
unconsume(this.parser, this);
551578

552579
return res;
553580
}

src/node_http_parser.cc

+11-6
Original file line numberDiff line numberDiff line change
@@ -484,13 +484,18 @@ class Parser : public BaseObject {
484484
if (parser->prev_alloc_cb_.is_empty())
485485
return;
486486

487-
CHECK(args[0]->IsExternal());
488-
Local<External> stream_obj = args[0].As<External>();
489-
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
490-
CHECK_NE(stream, nullptr);
487+
// Restore stream's callbacks
488+
if (args.Length() == 1 && args[0]->IsExternal()) {
489+
Local<External> stream_obj = args[0].As<External>();
490+
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
491+
CHECK_NE(stream, nullptr);
492+
493+
stream->set_alloc_cb(parser->prev_alloc_cb_);
494+
stream->set_read_cb(parser->prev_read_cb_);
495+
}
491496

492-
stream->set_alloc_cb(parser->prev_alloc_cb_);
493-
stream->set_read_cb(parser->prev_read_cb_);
497+
parser->prev_alloc_cb_.clear();
498+
parser->prev_read_cb_.clear();
494499
}
495500

496501

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
const net = require('net');
6+
7+
const big = new Buffer(16 * 1024);
8+
big.fill('A');
9+
10+
const COUNT = 1e4;
11+
12+
var received = 0;
13+
14+
var client;
15+
const server = http.createServer(function(req, res) {
16+
res.end(big, function() {
17+
if (++received === COUNT) {
18+
server.close();
19+
client.end();
20+
}
21+
});
22+
}).listen(common.PORT, function() {
23+
var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
24+
client = net.connect(common.PORT, function() {
25+
client.write(req);
26+
});
27+
28+
// Just let the test terminate instead of hanging
29+
client.on('close', function() {
30+
if (received !== COUNT)
31+
server.close();
32+
});
33+
client.resume();
34+
});
35+
36+
process.on('exit', function() {
37+
// The server should pause connection on pipeline flood, but it shoul still
38+
// resume it and finish processing the requests, when its output queue will
39+
// be empty again.
40+
assert.equal(received, COUNT);
41+
});

0 commit comments

Comments
 (0)