Skip to content

Commit 64b903e

Browse files
committed
http: reuse socket only when it is drained
Ensuring every request is assigned to a drained socket or nothing. Because is has no benifit for a request to be attached to a non drained socket and it prevents the request from being assigned to a drained one, which might occur soon or already in the free pool We achieve this by claiming a socket as free only when the socket is drained.
1 parent 3473d1b commit 64b903e

4 files changed

+94
-11
lines changed

lib/_http_client.js

+9-4
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,6 @@ ObjectSetPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype);
351351
ObjectSetPrototypeOf(ClientRequest, OutgoingMessage);
352352

353353
ClientRequest.prototype._finish = function _finish() {
354-
FunctionPrototypeCall(OutgoingMessage.prototype._finish, this);
355354
if (hasObserver('http')) {
356355
startPerf(this, kClientRequestStatistics, {
357356
type: 'http',
@@ -658,7 +657,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
658657

659658
// Add our listener first, so that we guarantee socket cleanup
660659
res.on('end', responseOnEnd);
661-
req.on('prefinish', requestOnPrefinish);
660+
req.on('finish', requestOnFinish);
662661
socket.on('timeout', responseOnTimeout);
663662

664663
// If the user did not listen for the 'response' event, then they
@@ -730,12 +729,16 @@ function responseOnEnd() {
730729
socket.end();
731730
}
732731
assert(!socket.writable);
733-
} else if (req.finished && !this.aborted) {
732+
} else if (req.writableFinished && !this.aborted) {
733+
assert(req.finished);
734734
// We can assume `req.finished` means all data has been written since:
735735
// - `'responseOnEnd'` means we have been assigned a socket.
736736
// - when we have a socket we write directly to it without buffering.
737737
// - `req.finished` means `end()` has been called and no further data.
738738
// can be written
739+
// In addition, `req.writableFinished` means all data written has been
740+
// accepted by the kernel. (i.e. the `req.socket` is drained).Without
741+
// this constraint, we may assign a non drained socket to a request.
739742
responseKeepAlive(req);
740743
}
741744
}
@@ -748,7 +751,9 @@ function responseOnTimeout() {
748751
res.emit('timeout');
749752
}
750753

751-
function requestOnPrefinish() {
754+
// This function is necessary in the case where we receive the entire reponse
755+
// from server before we finish sending out the request
756+
function requestOnFinish() {
752757
const req = this;
753758

754759
if (req.shouldKeepAlive && req._ended)

lib/_http_outgoing.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ const {
4040
} = primordials;
4141

4242
const { getDefaultHighWaterMark } = require('internal/streams/state');
43-
const assert = require('internal/assert');
4443
const EE = require('events');
4544
const Stream = require('stream');
4645
const internalUtil = require('internal/util');
@@ -985,10 +984,11 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
985984
};
986985

987986

988-
OutgoingMessage.prototype._finish = function _finish() {
989-
assert(this.socket);
990-
this.emit('prefinish');
991-
};
987+
// Subclasses HttpClientRequest and HttpServerResponse should
988+
// implement this function. This function is called once all user data
989+
// are flushed to the socket. Note that it has a chance that the socket
990+
// is not drained.
991+
OutgoingMessage.prototype._finish = function() {};
992992

993993

994994
// This logic is probably a bit confusing. Let me explain a bit:
@@ -1008,7 +1008,7 @@ OutgoingMessage.prototype._finish = function _finish() {
10081008
// the socket yet. Thus the outgoing messages need to be prepared to queue
10091009
// up data internally before sending it on further to the socket's queue.
10101010
//
1011-
// This function, outgoingFlush(), is called by both the Server and Client
1011+
// This function, _flush(), is called by both the Server and Client
10121012
// to attempt to flush any pending messages out to the socket.
10131013
OutgoingMessage.prototype._flush = function _flush() {
10141014
const socket = this.socket;

lib/_http_server.js

-1
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ ServerResponse.prototype._finish = function _finish() {
222222
},
223223
});
224224
}
225-
OutgoingMessage.prototype._finish.call(this);
226225
};
227226

228227

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
const net = require('net');
6+
7+
const agent = new http.Agent({
8+
keepAlive: true,
9+
maxFreeSockets: 1024
10+
});
11+
12+
const server = net.createServer({
13+
pauseOnConnect: true,
14+
}, (sock) => {
15+
// Do not read anything from `sock`
16+
sock.pause();
17+
sock.write('HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\n\r\n');
18+
});
19+
20+
server.listen(0, common.mustCall(() => {
21+
sendFstReq(server.address().port);
22+
}));
23+
24+
function sendFstReq(serverPort) {
25+
const req = http.request({
26+
agent,
27+
host: '127.0.0.1',
28+
port: serverPort,
29+
}, (res) => {
30+
res.on('data', noop);
31+
res.on('end', common.mustCall(() => {
32+
// Agent's socket reusing code is registered to process.nextTick(),
33+
// to ensure it take effect, fire in the next event loop
34+
setTimeout(sendSecReq, 10, serverPort, req.socket.localPort);
35+
}));
36+
});
37+
38+
// Overwhelm the flow control window
39+
// note that tcp over the loopback inteface has a large flow control window
40+
assert.strictEqual(req.write('a'.repeat(6_000_000)), false);
41+
42+
req.end();
43+
}
44+
45+
function sendSecReq(serverPort, fstReqCliPort) {
46+
// Make the second request, which should be sent on a new socket
47+
// because the first socket is not drained and hence can not be reused
48+
const req = http.request({
49+
agent,
50+
host: '127.0.0.1',
51+
port: serverPort,
52+
}, (res) => {
53+
res.on('data', noop);
54+
res.on('end', common.mustCall(() => {
55+
setTimeout(sendThrReq, 10, serverPort, req.socket.localPort);
56+
}));
57+
});
58+
59+
req.on('socket', common.mustCall((sock) => {
60+
assert.notStrictEqual(sock.localPort, fstReqCliPort);
61+
}));
62+
req.end();
63+
}
64+
65+
function sendThrReq(serverPort, secReqCliPort) {
66+
// Make the third request, the agent should reuse the second socket we just made
67+
const req = http.request({
68+
agent,
69+
host: '127.0.0.1',
70+
port: serverPort,
71+
}, noop);
72+
73+
req.on('socket', common.mustCall((sock) => {
74+
assert.strictEqual(sock.localPort, secReqCliPort);
75+
process.exit(0);
76+
}));
77+
}
78+
79+
function noop() { }

0 commit comments

Comments
 (0)