Skip to content

Commit ab03635

Browse files
committed
http: fix stalled pipeline bug
This is a two-part fix: - Fix pending data notification in `OutgoingMessage` to notify server about flushed data too - Fix pause/resume behavior for the consumed socket. `resume` event is emitted on a next tick, and `socket._paused` can already be `true` at this time. Pause the socket again to avoid PAUSED error on parser. Fix: #3332 PR-URL: #3342 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Trevor Norris <[email protected]>
1 parent f45c315 commit ab03635

5 files changed

+120
-47
lines changed

lib/_http_common.js

+4
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() {
140140

141141
parser._headers = [];
142142
parser._url = '';
143+
parser._consumed = false;
143144

144145
// Only called in the slow case where slow means
145146
// that the request headers were either fragmented
@@ -167,6 +168,9 @@ function freeParser(parser, req, socket) {
167168
if (parser) {
168169
parser._headers = [];
169170
parser.onIncoming = null;
171+
if (parser._consumed)
172+
parser.unconsume();
173+
parser._consumed = false;
170174
if (parser.socket)
171175
parser.socket.parser = null;
172176
parser.socket = null;

lib/_http_outgoing.js

+31-35
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
135135
this.outputEncodings.unshift('binary');
136136
this.outputCallbacks.unshift(null);
137137
this.outputSize += this._header.length;
138-
if (this._onPendingData !== null)
138+
if (typeof this._onPendingData === 'function')
139139
this._onPendingData(this._header.length);
140140
}
141141
this._headerSent = true;
@@ -158,22 +158,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
158158
// There might be pending data in the this.output buffer.
159159
var outputLength = this.output.length;
160160
if (outputLength > 0) {
161-
var output = this.output;
162-
var outputEncodings = this.outputEncodings;
163-
var outputCallbacks = this.outputCallbacks;
164-
connection.cork();
165-
for (var i = 0; i < outputLength; i++) {
166-
connection.write(output[i], outputEncodings[i],
167-
outputCallbacks[i]);
168-
}
169-
connection.uncork();
170-
171-
this.output = [];
172-
this.outputEncodings = [];
173-
this.outputCallbacks = [];
174-
if (this._onPendingData !== null)
175-
this._onPendingData(-this.outputSize);
176-
this.outputSize = 0;
161+
this._flushOutput(connection);
177162
} else if (data.length === 0) {
178163
if (typeof callback === 'function')
179164
process.nextTick(callback);
@@ -198,7 +183,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
198183
this.outputEncodings.push(encoding);
199184
this.outputCallbacks.push(callback);
200185
this.outputSize += data.length;
201-
if (this._onPendingData !== null)
186+
if (typeof this._onPendingData === 'function')
202187
this._onPendingData(data.length);
203188
return false;
204189
};
@@ -644,26 +629,11 @@ OutgoingMessage.prototype._finish = function() {
644629
// to attempt to flush any pending messages out to the socket.
645630
OutgoingMessage.prototype._flush = function() {
646631
var socket = this.socket;
647-
var outputLength, ret;
632+
var ret;
648633

649634
if (socket && socket.writable) {
650635
// There might be remaining data in this.output; write it out
651-
outputLength = this.output.length;
652-
if (outputLength > 0) {
653-
var output = this.output;
654-
var outputEncodings = this.outputEncodings;
655-
var outputCallbacks = this.outputCallbacks;
656-
socket.cork();
657-
for (var i = 0; i < outputLength; i++) {
658-
ret = socket.write(output[i], outputEncodings[i],
659-
outputCallbacks[i]);
660-
}
661-
socket.uncork();
662-
663-
this.output = [];
664-
this.outputEncodings = [];
665-
this.outputCallbacks = [];
666-
}
636+
ret = this._flushOutput(socket);
667637

668638
if (this.finished) {
669639
// This is a queue to the server or client to bring in the next this.
@@ -675,6 +645,32 @@ OutgoingMessage.prototype._flush = function() {
675645
}
676646
};
677647

648+
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
649+
var ret;
650+
var outputLength = this.output.length;
651+
if (outputLength <= 0)
652+
return ret;
653+
654+
var output = this.output;
655+
var outputEncodings = this.outputEncodings;
656+
var outputCallbacks = this.outputCallbacks;
657+
socket.cork();
658+
for (var i = 0; i < outputLength; i++) {
659+
ret = socket.write(output[i], outputEncodings[i],
660+
outputCallbacks[i]);
661+
}
662+
socket.uncork();
663+
664+
this.output = [];
665+
this.outputEncodings = [];
666+
this.outputCallbacks = [];
667+
if (typeof this._onPendingData === 'function')
668+
this._onPendingData(-this.outputSize);
669+
this.outputSize = 0;
670+
671+
return ret;
672+
};
673+
678674

679675
OutgoingMessage.prototype.flushHeaders = function() {
680676
if (!this._header) {

lib/_http_server.js

+33-6
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,10 @@ function connectionListener(socket) {
343343
socket.on = socketOnWrap;
344344

345345
var external = socket._handle._externalStream;
346-
if (external)
346+
if (external) {
347+
parser._consumed = true;
347348
parser.consume(external);
349+
}
348350
external = null;
349351
parser[kOnExecute] = onParserExecute;
350352

@@ -382,7 +384,7 @@ function connectionListener(socket) {
382384
socket.removeListener('data', socketOnData);
383385
socket.removeListener('end', socketOnEnd);
384386
socket.removeListener('close', serverSocketCloseListener);
385-
parser.unconsume(socket._handle._externalStream);
387+
unconsume(parser, socket);
386388
parser.finish();
387389
freeParser(parser, req, null);
388390
parser = null;
@@ -530,13 +532,38 @@ function connectionListener(socket) {
530532
exports._connectionListener = connectionListener;
531533

532534
function onSocketResume() {
533-
if (this._handle)
535+
// It may seem that the socket is resumed, but this is an enemy's trick to
536+
// deceive us! `resume` is emitted asynchronously, and may be called from
537+
// `incoming.readStart()`. Stop the socket again here, just to preserve the
538+
// state.
539+
//
540+
// We don't care about stream semantics for the consumed socket anyway.
541+
if (this._paused) {
542+
this.pause();
543+
return;
544+
}
545+
546+
if (this._handle && !this._handle.reading) {
547+
this._handle.reading = true;
534548
this._handle.readStart();
549+
}
535550
}
536551

537552
function onSocketPause() {
538-
if (this._handle)
553+
if (this._handle && this._handle.reading) {
554+
this._handle.reading = false;
539555
this._handle.readStop();
556+
}
557+
}
558+
559+
function unconsume(parser, socket) {
560+
if (socket._handle) {
561+
if (parser._consumed)
562+
parser.unconsume(socket._handle._externalStream);
563+
parser._consumed = false;
564+
socket.removeListener('pause', onSocketPause);
565+
socket.removeListener('resume', onSocketResume);
566+
}
540567
}
541568

542569
function socketOnWrap(ev, fn) {
@@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
546573
return res;
547574
}
548575

549-
if (this._handle && (ev === 'data' || ev === 'readable'))
550-
this.parser.unconsume(this._handle._externalStream);
576+
if (ev === 'data' || ev === 'readable')
577+
unconsume(this.parser, this);
551578

552579
return res;
553580
}

src/node_http_parser.cc

+11-6
Original file line numberDiff line numberDiff line change
@@ -484,13 +484,18 @@ class Parser : public BaseObject {
484484
if (parser->prev_alloc_cb_.is_empty())
485485
return;
486486

487-
CHECK(args[0]->IsExternal());
488-
Local<External> stream_obj = args[0].As<External>();
489-
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
490-
CHECK_NE(stream, nullptr);
487+
// Restore stream's callbacks
488+
if (args.Length() == 1 && args[0]->IsExternal()) {
489+
Local<External> stream_obj = args[0].As<External>();
490+
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
491+
CHECK_NE(stream, nullptr);
492+
493+
stream->set_alloc_cb(parser->prev_alloc_cb_);
494+
stream->set_read_cb(parser->prev_read_cb_);
495+
}
491496

492-
stream->set_alloc_cb(parser->prev_alloc_cb_);
493-
stream->set_read_cb(parser->prev_read_cb_);
497+
parser->prev_alloc_cb_.clear();
498+
parser->prev_read_cb_.clear();
494499
}
495500

496501

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
const net = require('net');
6+
7+
const big = new Buffer(16 * 1024);
8+
big.fill('A');
9+
10+
const COUNT = 1e4;
11+
12+
var received = 0;
13+
14+
var client;
15+
const server = http.createServer(function(req, res) {
16+
res.end(big, function() {
17+
if (++received === COUNT) {
18+
server.close();
19+
client.end();
20+
}
21+
});
22+
}).listen(common.PORT, function() {
23+
var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
24+
client = net.connect(common.PORT, function() {
25+
client.write(req);
26+
});
27+
28+
// Just let the test terminate instead of hanging
29+
client.on('close', function() {
30+
if (received !== COUNT)
31+
server.close();
32+
});
33+
client.resume();
34+
});
35+
36+
process.on('exit', function() {
37+
// The server should pause connection on pipeline flood, but it shoul still
38+
// resume it and finish processing the requests, when its output queue will
39+
// be empty again.
40+
assert.equal(received, COUNT);
41+
});

0 commit comments

Comments
 (0)