Skip to content

Commit 967b5db

Browse files
committed
http: Use streams3 directly, not .ondata/end
1 parent cec8159 commit 967b5db

6 files changed

+48
-45
lines changed

lib/_http_client.js

+13-5
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ function socketOnData(d) {
253253
var req = this._httpMessage;
254254
var parser = this.parser;
255255

256+
assert(parser);
257+
256258
var ret = parser.execute(d);
257259
if (ret instanceof Error) {
258260
debug('parse error');
@@ -266,8 +268,8 @@ function socketOnData(d) {
266268
var res = parser.incoming;
267269
req.res = res;
268270

269-
socket.ondata = null;
270-
socket.onend = null;
271+
socket.removeListener('data', socketOnData);
272+
socket.removeListener('end', socketOnEnd);
271273
parser.finish();
272274

273275
var bodyHead = d.slice(bytesParsed, d.length);
@@ -281,6 +283,10 @@ function socketOnData(d) {
281283
socket.removeListener('close', socketCloseListener);
282284
socket.removeListener('error', socketErrorListener);
283285

286+
// TODO(isaacs): Need a way to reset a stream to fresh state
287+
// IE, not flowing, and not explicitly paused.
288+
socket._readableState.flowing = null;
289+
284290
req.emit(eventName, res, socket, bodyHead);
285291
req.emit('close');
286292
} else {
@@ -293,6 +299,8 @@ function socketOnData(d) {
293299
// send a final response after this client sends a request
294300
// body. So, we must not free the parser.
295301
parser.incoming.statusCode !== 100) {
302+
socket.removeListener('data', socketOnData);
303+
socket.removeListener('end', socketOnEnd);
296304
freeParser(parser, req);
297305
}
298306
}
@@ -422,11 +430,11 @@ ClientRequest.prototype.onSocket = function(socket) {
422430
parser.maxHeaderPairs = 2000;
423431
}
424432

433+
parser.onIncoming = parserOnIncomingClient;
425434
socket.on('error', socketErrorListener);
426-
socket.ondata = socketOnData;
427-
socket.onend = socketOnEnd;
435+
socket.on('data', socketOnData);
436+
socket.on('end', socketOnEnd);
428437
socket.on('close', socketCloseListener);
429-
parser.onIncoming = parserOnIncomingClient;
430438
req.emit('socket', socket);
431439
});
432440

lib/_http_common.js

+2-4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ function parserOnHeaders(headers, url) {
5555
// info.url is not set for response parsers but that's not
5656
// applicable here since all our parsers are request parsers.
5757
function parserOnHeadersComplete(info) {
58+
debug('parserOnHeadersComplete', info);
5859
var parser = this;
5960
var headers = info.headers;
6061
var url = info.url;
@@ -200,11 +201,8 @@ function freeParser(parser, req) {
200201
if (parser) {
201202
parser._headers = [];
202203
parser.onIncoming = null;
203-
if (parser.socket) {
204-
parser.socket.onend = null;
205-
parser.socket.ondata = null;
204+
if (parser.socket)
206205
parser.socket.parser = null;
207-
}
208206
parser.socket = null;
209207
parser.incoming = null;
210208
parsers.free(parser);

lib/_http_server.js

+29-14
Original file line numberDiff line numberDiff line change
@@ -339,11 +339,19 @@ function connectionListener(socket) {
339339
parser.maxHeaderPairs = 2000;
340340
}
341341

342-
socket.addListener('error', function(e) {
342+
socket.addListener('error', socketOnError);
343+
socket.addListener('close', serverSocketCloseListener);
344+
parser.onIncoming = parserOnIncoming;
345+
socket.on('end', socketOnEnd);
346+
socket.on('data', socketOnData);
347+
348+
// TODO(isaacs): Move all these functions out of here
349+
function socketOnError(e) {
343350
self.emit('clientError', e, this);
344-
});
351+
}
345352

346-
socket.ondata = function(d) {
353+
function socketOnData(d) {
354+
debug('SERVER socketOnData %d', d.length);
347355
var ret = parser.execute(d);
348356
if (ret instanceof Error) {
349357
debug('parse error');
@@ -352,26 +360,32 @@ function connectionListener(socket) {
352360
// Upgrade or CONNECT
353361
var bytesParsed = ret;
354362
var req = parser.incoming;
363+
debug('SERVER upgrade or connect', req.method);
355364

356-
socket.ondata = null;
357-
socket.onend = null;
365+
socket.removeListener('data', socketOnData);
366+
socket.removeListener('end', socketOnEnd);
358367
socket.removeListener('close', serverSocketCloseListener);
359368
parser.finish();
360369
freeParser(parser, req);
361370

362371
var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
363372
if (EventEmitter.listenerCount(self, eventName) > 0) {
373+
debug('SERVER have listener for %s', eventName);
364374
var bodyHead = d.slice(bytesParsed, d.length);
365375

366-
self.emit(eventName, req, req.socket, bodyHead);
376+
// TODO(isaacs): Need a way to reset a stream to fresh state
377+
// IE, not flowing, and not explicitly paused.
378+
socket._readableState.flowing = null;
379+
self.emit(eventName, req, socket, bodyHead);
367380
} else {
368381
// Got upgrade header or CONNECT method, but have no handler.
369382
socket.destroy();
370383
}
371384
}
372-
};
385+
}
373386

374-
socket.onend = function() {
387+
function socketOnEnd() {
388+
var socket = this;
375389
var ret = parser.finish();
376390

377391
if (ret instanceof Error) {
@@ -390,14 +404,14 @@ function connectionListener(socket) {
390404
} else {
391405
if (socket.writable) socket.end();
392406
}
393-
};
407+
}
394408

395-
socket.addListener('close', serverSocketCloseListener);
396409

397410
// The following callback is issued after the headers have been read on a
398411
// new message. In this callback we setup the response object and pass it
399412
// to the user.
400-
parser.onIncoming = function(req, shouldKeepAlive) {
413+
414+
function parserOnIncoming(req, shouldKeepAlive) {
401415
incoming.push(req);
402416

403417
var res = new ServerResponse(req);
@@ -415,7 +429,8 @@ function connectionListener(socket) {
415429

416430
// When we're finished writing the response, check if this is the last
417431
// respose, if so destroy the socket.
418-
res.on('finish', function() {
432+
res.on('finish', resOnFinish);
433+
function resOnFinish() {
419434
// Usually the first incoming element should be our request. it may
420435
// be that in the case abortIncoming() was called that the incoming
421436
// array will be empty.
@@ -440,7 +455,7 @@ function connectionListener(socket) {
440455
m.assignSocket(socket);
441456
}
442457
}
443-
});
458+
}
444459

445460
if (!util.isUndefined(req.headers.expect) &&
446461
(req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
@@ -456,6 +471,6 @@ function connectionListener(socket) {
456471
self.emit('request', req, res);
457472
}
458473
return false; // Not a HEAD response. (Not even a response!)
459-
};
474+
}
460475
}
461476
exports._connectionListener = connectionListener;

lib/_tls_legacy.js

-12
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,6 @@ function onCryptoStreamEnd() {
125125
} else {
126126
debug('encrypted.onend');
127127
}
128-
129-
if (this.onend) this.onend();
130128
}
131129

132130

@@ -306,16 +304,6 @@ CryptoStream.prototype._read = function read(size) {
306304
}
307305
} else {
308306
// Give them requested data
309-
if (this.ondata) {
310-
var self = this;
311-
this.ondata(pool, start, start + bytesRead);
312-
313-
// Consume data automatically
314-
// simple/test-https-drain fails without it
315-
process.nextTick(function() {
316-
self.read(bytesRead);
317-
});
318-
}
319307
this.push(pool.slice(start, start + bytesRead));
320308
}
321309

lib/net.js

+1-7
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,6 @@ function Socket(options) {
154154
this.readable = this.writable = false;
155155
}
156156

157-
this.onend = null;
158-
159157
// shut down the socket when we're finished with it.
160158
this.on('finish', onSocketFinish);
161159
this.on('_socketEnd', onSocketEnd);
@@ -507,9 +505,7 @@ function onread(nread, buffer) {
507505
self.bytesRead += nread;
508506

509507
// Optimization: emit the original buffer with end points
510-
var ret = true;
511-
if (self.ondata) self.ondata(buffer);
512-
else ret = self.push(buffer);
508+
var ret = self.push(buffer);
513509

514510
if (handle.reading && !ret) {
515511
handle.reading = false;
@@ -540,8 +536,6 @@ function onread(nread, buffer) {
540536
maybeDestroy(self);
541537
}
542538

543-
if (self.onend) self.once('end', self.onend);
544-
545539
// push a null to signal the end of data.
546540
self.push(null);
547541

test/simple/test-http-upgrade-server.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ function testServer() {
5757

5858
request_upgradeHead = upgradeHead;
5959

60-
socket.ondata = function(d, start, end) {
61-
var data = d.toString('utf8', start, end);
60+
socket.on('data', function(d) {
61+
var data = d.toString('utf8');
6262
if (data == 'kill') {
6363
socket.end();
6464
} else {
6565
socket.write(data, 'utf8');
6666
}
67-
};
67+
});
6868
});
6969
}
7070

0 commit comments

Comments
 (0)