Skip to content

Commit 1d36931

Browse files
committed
http: Refactor for streams2
Because of some of the peculiarities of http, this has a bit of special magic to handle cases where the IncomingMessage would wait forever in a paused state. In the server, if you do not begin consuming the request body by the time the response emits 'finish', then it will be flushed out. In the client, if you do not add a 'response' handler onto the request, then the response stream will be flushed out.
1 parent 81e3562 commit 1d36931

File tree

1 file changed

+64
-70
lines changed

1 file changed

+64
-70
lines changed

lib/http.js

+64-70
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
114114
return skipBody;
115115
}
116116

117+
// XXX This is a mess.
118+
// TODO: http.Parser should be a Writable emits request/response events.
117119
function parserOnBody(b, start, len) {
118120
var parser = this;
119-
var slice = b.slice(start, start + len);
120-
if (parser.incoming._paused || parser.incoming._pendings.length) {
121-
parser.incoming._pendings.push(slice);
122-
} else {
123-
parser.incoming._emitData(slice);
121+
var stream = parser.incoming;
122+
var rs = stream._readableState;
123+
var socket = stream.socket;
124+
125+
// pretend this was the result of a stream._read call.
126+
if (len > 0) {
127+
var slice = b.slice(start, start + len);
128+
rs.onread(null, slice);
124129
}
130+
131+
if (rs.length >= rs.highWaterMark)
132+
socket.pause();
125133
}
126134

127135
function parserOnMessageComplete() {
128136
var parser = this;
129-
parser.incoming.complete = true;
137+
var stream = parser.incoming;
138+
var socket = stream.socket;
139+
140+
stream.complete = true;
130141

131142
// Emit any trailing headers.
132143
var headers = parser._headers;
@@ -140,19 +151,13 @@ function parserOnMessageComplete() {
140151
parser._url = '';
141152
}
142153

143-
if (!parser.incoming.upgrade) {
154+
if (!stream.upgrade)
144155
// For upgraded connections, also emit this after parser.execute
145-
if (parser.incoming._paused || parser.incoming._pendings.length) {
146-
parser.incoming._pendings.push(END_OF_FILE);
147-
} else {
148-
parser.incoming.readable = false;
149-
parser.incoming._emitEnd();
150-
}
151-
}
156+
stream._readableState.onread(null, null);
152157

153158
if (parser.socket.readable) {
154159
// force to read the next incoming message
155-
parser.socket.resume();
160+
socket.resume();
156161
}
157162
}
158163

@@ -263,9 +268,13 @@ function utcDate() {
263268

264269
/* Abstract base class for ServerRequest and ClientResponse. */
265270
function IncomingMessage(socket) {
266-
Stream.call(this);
271+
Stream.Readable.call(this);
272+
273+
// XXX This implementation is kind of all over the place
274+
// When the parser emits body chunks, they go in this list.
275+
// _read() pulls them out, and when it finds EOF, it ends.
276+
this._pendings = [];
267277

268-
// TODO Remove one of these eventually.
269278
this.socket = socket;
270279
this.connection = socket;
271280

@@ -276,77 +285,49 @@ function IncomingMessage(socket) {
276285

277286
this.readable = true;
278287

279-
this._paused = false;
280288
this._pendings = [];
281-
282-
this._endEmitted = false;
289+
this._pendingIndex = 0;
283290

284291
// request (server) only
285292
this.url = '';
286-
287293
this.method = null;
288294

289295
// response (client) only
290296
this.statusCode = null;
291297
this.client = this.socket;
298+
299+
// flag for backwards compatibility grossness.
300+
this._consuming = false;
292301
}
293-
util.inherits(IncomingMessage, Stream);
302+
util.inherits(IncomingMessage, Stream.Readable);
294303

295304

296305
exports.IncomingMessage = IncomingMessage;
297306

298307

299-
IncomingMessage.prototype.destroy = function(error) {
300-
this.socket.destroy(error);
308+
IncomingMessage.prototype.read = function(n) {
309+
this._consuming = true;
310+
return Stream.Readable.prototype.read.call(this, n);
301311
};
302312

303313

304-
IncomingMessage.prototype.setEncoding = function(encoding) {
305-
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
306-
this._decoder = new StringDecoder(encoding);
307-
};
308-
309-
310-
IncomingMessage.prototype.pause = function() {
311-
this._paused = true;
312-
this.socket.pause();
314+
IncomingMessage.prototype._read = function(n, callback) {
315+
// We actually do almost nothing here, because the parserOnBody
316+
// function fills up our internal buffer directly. However, we
317+
// do need to unpause the underlying socket so that it flows.
318+
if (!this.socket.readable)
319+
return callback(null, null);
320+
else
321+
this.socket.resume();
313322
};
314323

315324

316-
IncomingMessage.prototype.resume = function() {
317-
this._paused = false;
318-
if (this.socket) {
319-
this.socket.resume();
320-
}
321-
322-
this._emitPending();
325+
IncomingMessage.prototype.destroy = function(error) {
326+
this.socket.destroy(error);
323327
};
324328

325329

326-
IncomingMessage.prototype._emitPending = function(callback) {
327-
if (this._pendings.length) {
328-
var self = this;
329-
process.nextTick(function() {
330-
while (!self._paused && self._pendings.length) {
331-
var chunk = self._pendings.shift();
332-
if (chunk !== END_OF_FILE) {
333-
assert(Buffer.isBuffer(chunk));
334-
self._emitData(chunk);
335-
} else {
336-
assert(self._pendings.length === 0);
337-
self.readable = false;
338-
self._emitEnd();
339-
}
340-
}
341330

342-
if (callback) {
343-
callback();
344-
}
345-
});
346-
} else if (callback) {
347-
callback();
348-
}
349-
};
350331

351332

352333
IncomingMessage.prototype._emitData = function(d) {
@@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {
1016997

1017998
// don't keep alive connections where the client expects 100 Continue
1018999
// but we sent a final status; they may put extra bytes on the wire.
1019-
if (this._expect_continue && ! this._sent100) {
1000+
if (this._expect_continue && !this._sent100) {
10201001
this.shouldKeepAlive = false;
10211002
}
10221003

@@ -1321,11 +1302,10 @@ function socketCloseListener() {
13211302
// Socket closed before we emitted 'end' below.
13221303
req.res.emit('aborted');
13231304
var res = req.res;
1324-
req.res._emitPending(function() {
1325-
res._emitEnd();
1305+
res.on('end', function() {
13261306
res.emit('close');
1327-
res = null;
13281307
});
1308+
res._readableState.onread(null, null);
13291309
} else if (!req.res && !req._hadError) {
13301310
// This socket error fired before we started to
13311311
// receive a response. The error needs to
@@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
14281408
}
14291409

14301410

1411+
// client
14311412
function parserOnIncomingClient(res, shouldKeepAlive) {
14321413
var parser = this;
14331414
var socket = this.socket;
14341415
var req = socket._httpMessage;
14351416

1417+
14361418
// propogate "domain" setting...
14371419
if (req.domain && !res.domain) {
14381420
debug('setting "res.domain"');
@@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
14801462

14811463
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
14821464
COUNTER_HTTP_CLIENT_RESPONSE();
1483-
req.emit('response', res);
14841465
req.res = res;
14851466
res.req = req;
1486-
1467+
var handled = req.emit('response', res);
14871468
res.on('end', responseOnEnd);
14881469

1470+
// If the user did not listen for the 'response' event, then they
1471+
// can't possibly read the data, so we .resume() it into the void
1472+
// so that the socket doesn't hang there in a paused state.
1473+
if (!handled)
1474+
res.resume();
1475+
14891476
return isHeadResponse;
14901477
}
14911478

1479+
// client
14921480
function responseOnEnd() {
14931481
var res = this;
14941482
var req = res.req;
@@ -1784,7 +1772,7 @@ function connectionListener(socket) {
17841772
incoming.push(req);
17851773

17861774
var res = new ServerResponse(req);
1787-
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
1775+
17881776
res.shouldKeepAlive = shouldKeepAlive;
17891777
DTRACE_HTTP_SERVER_REQUEST(req, socket);
17901778
COUNTER_HTTP_SERVER_REQUEST();
@@ -1806,6 +1794,12 @@ function connectionListener(socket) {
18061794

18071795
incoming.shift();
18081796

1797+
// if the user never called req.read(), and didn't pipe() or
1798+
// .resume() or .on('data'), then we call req.resume() so that the
1799+
// bytes will be pulled off the wire.
1800+
if (!req._consuming)
1801+
req.resume();
1802+
18091803
res.detachSocket(socket);
18101804

18111805
if (res._last) {

0 commit comments

Comments
 (0)