Skip to content

Commit 14656e1

Browse files
puzpuzpuzcodebytere
authored andcommitted
async_hooks: don't reuse resource in HttpAgent when queued
PR-URL: #34439 Fixes: #34401 Refs: #27581 Reviewed-By: Vladimir de Turckheim <[email protected]> Reviewed-By: Gerhard Stöbich <[email protected]>
1 parent 70e9ece commit 14656e1

4 files changed

+154
-10
lines changed

lib/_http_agent.js

+26-9
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const EventEmitter = require('events');
3434
let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
3535
debug = fn;
3636
});
37+
const { AsyncResource } = require('async_hooks');
3738
const { async_id_symbol } = require('internal/async_hooks').symbols;
3839
const {
3940
codes: {
@@ -47,6 +48,7 @@ const { validateNumber } = require('internal/validators');
4748

4849
const kOnKeylog = Symbol('onkeylog');
4950
const kRequestOptions = Symbol('requestOptions');
51+
const kRequestAsyncResource = Symbol('requestAsyncResource');
5052
// New Agent code.
5153

5254
// The largest departure from the previous implementation is that
@@ -127,7 +129,17 @@ function Agent(options) {
127129
const requests = this.requests[name];
128130
if (requests && requests.length) {
129131
const req = requests.shift();
130-
setRequestSocket(this, req, socket);
132+
const reqAsyncRes = req[kRequestAsyncResource];
133+
if (reqAsyncRes) {
134+
// Run request within the original async context.
135+
reqAsyncRes.runInAsyncScope(() => {
136+
asyncResetHandle(socket);
137+
setRequestSocket(this, req, socket);
138+
});
139+
req[kRequestAsyncResource] = null;
140+
} else {
141+
setRequestSocket(this, req, socket);
142+
}
131143
if (requests.length === 0) {
132144
delete this.requests[name];
133145
}
@@ -253,14 +265,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
253265
const sockLen = freeLen + this.sockets[name].length;
254266

255267
if (socket) {
256-
// Guard against an uninitialized or user supplied Socket.
257-
const handle = socket._handle;
258-
if (handle && typeof handle.asyncReset === 'function') {
259-
// Assign the handle a new asyncId and run any destroy()/init() hooks.
260-
handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
261-
socket[async_id_symbol] = handle.getAsyncId();
262-
}
263-
268+
asyncResetHandle(socket);
264269
this.reuseSocket(socket, req);
265270
setRequestSocket(this, req, socket);
266271
this.sockets[name].push(socket);
@@ -284,6 +289,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
284289

285290
// Used to create sockets for pending requests from different origin
286291
req[kRequestOptions] = options;
292+
// Used to capture the original async context.
293+
req[kRequestAsyncResource] = new AsyncResource('QueuedRequest');
287294

288295
this.requests[name].push(req);
289296
}
@@ -493,6 +500,16 @@ function setRequestSocket(agent, req, socket) {
493500
socket.setTimeout(req.timeout);
494501
}
495502

503+
function asyncResetHandle(socket) {
504+
// Guard against an uninitialized or user supplied Socket.
505+
const handle = socket._handle;
506+
if (handle && typeof handle.asyncReset === 'function') {
507+
// Assign the handle a new asyncId and run any destroy()/init() hooks.
508+
handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
509+
socket[async_id_symbol] = handle.getAsyncId();
510+
}
511+
}
512+
496513
module.exports = {
497514
Agent,
498515
globalAgent: new Agent()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { AsyncLocalStorage } = require('async_hooks');
5+
const http = require('http');
6+
7+
const asyncLocalStorage = new AsyncLocalStorage();
8+
9+
const agent = new http.Agent({
10+
maxSockets: 1
11+
});
12+
13+
const N = 3;
14+
let responses = 0;
15+
16+
const server = http.createServer(common.mustCall((req, res) => {
17+
res.end('ok');
18+
}, N));
19+
20+
server.listen(0, common.mustCall(() => {
21+
const port = server.address().port;
22+
23+
for (let i = 0; i < N; i++) {
24+
asyncLocalStorage.run(i, () => {
25+
http.get({ agent, port }, common.mustCall((res) => {
26+
assert.strictEqual(asyncLocalStorage.getStore(), i);
27+
if (++responses === N) {
28+
server.close();
29+
agent.destroy();
30+
}
31+
res.resume();
32+
}));
33+
});
34+
}
35+
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
'use strict';
2+
// Flags: --expose-internals
3+
const common = require('../common');
4+
const initHooks = require('./init-hooks');
5+
const { checkInvocations } = require('./hook-checks');
6+
const assert = require('assert');
7+
const { async_id_symbol } = require('internal/async_hooks').symbols;
8+
const http = require('http');
9+
10+
// Checks that the async resource used in init in case of a reused handle
11+
// is not reused. Test is based on parallel\test-async-hooks-http-agent.js.
12+
13+
const hooks = initHooks();
14+
hooks.enable();
15+
16+
const reqAsyncIds = [];
17+
let socket;
18+
let responses = 0;
19+
20+
// Make sure a single socket is transparently reused for 2 requests.
21+
const agent = new http.Agent({
22+
keepAlive: true,
23+
keepAliveMsecs: Infinity,
24+
maxSockets: 1
25+
});
26+
27+
const verifyRequest = (idx) => (res) => {
28+
reqAsyncIds[idx] = res.socket[async_id_symbol];
29+
assert.ok(reqAsyncIds[idx] > 0, `${reqAsyncIds[idx]} > 0`);
30+
if (socket) {
31+
// Check that both requests share their socket.
32+
assert.strictEqual(res.socket, socket);
33+
} else {
34+
socket = res.socket;
35+
}
36+
37+
res.on('data', common.mustCallAtLeast(() => {}));
38+
res.on('end', common.mustCall(() => {
39+
if (++responses === 2) {
40+
// Clean up to let the event loop stop.
41+
server.close();
42+
agent.destroy();
43+
}
44+
}));
45+
};
46+
47+
const server = http.createServer(common.mustCall((req, res) => {
48+
req.once('data', common.mustCallAtLeast(() => {
49+
res.writeHead(200, { 'Content-Type': 'text/plain' });
50+
res.write('foo');
51+
}));
52+
req.on('end', common.mustCall(() => {
53+
res.end('bar');
54+
}));
55+
}, 2)).listen(0, common.mustCall(() => {
56+
const port = server.address().port;
57+
const payload = 'hello world';
58+
59+
// First request.
60+
const r1 = http.request({
61+
agent, port, method: 'POST'
62+
}, common.mustCall(verifyRequest(0)));
63+
r1.end(payload);
64+
65+
// Second request. Sent in parallel with the first one.
66+
const r2 = http.request({
67+
agent, port, method: 'POST'
68+
}, common.mustCall(verifyRequest(1)));
69+
r2.end(payload);
70+
}));
71+
72+
73+
process.on('exit', onExit);
74+
75+
function onExit() {
76+
hooks.disable();
77+
hooks.sanityCheck();
78+
const activities = hooks.activities;
79+
80+
// Verify both invocations
81+
const first = activities.filter((x) => x.uid === reqAsyncIds[0])[0];
82+
checkInvocations(first, { init: 1, destroy: 1 }, 'when process exits');
83+
84+
const second = activities.filter((x) => x.uid === reqAsyncIds[1])[0];
85+
checkInvocations(second, { init: 1, destroy: 1 }, 'when process exits');
86+
87+
// Verify reuse handle has been wrapped
88+
assert.strictEqual(first.type, second.type);
89+
assert.ok(first.handle !== second.handle, 'Resource reused');
90+
assert.ok(first.handle === second.handle.handle,
91+
'Resource not wrapped correctly');
92+
}

test/async-hooks/test-http-agent-handle-reuse.js test/async-hooks/test-http-agent-handle-reuse-serial.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const assert = require('assert');
77
const { async_id_symbol } = require('internal/async_hooks').symbols;
88
const http = require('http');
99

10-
// Checks that the async resource used in init in case of a resused handle
10+
// Checks that the async resource used in init in case of a reused handle
1111
// is not reused. Test is based on parallel\test-async-hooks-http-agent.js.
1212

1313
const hooks = initHooks();

0 commit comments

Comments
 (0)