Skip to content

Commit 6070e88

Browse files
Flarnatargos
authored andcommitted
async_hooks: don't reuse resource in HttpAgent
As discussed in nodejs/diagnostics#248, #21313 and https://docs.google.com/document/d/1g8OrG5lMIUhRn1zbkutgY83MiTSMx-0NHDs8Bf-nXxM/preview reusing the resource object is a blocker for landing a resource based async hooks API and get rid of the promise destroy hook. This PR ensures that HttpAgent uses the a new resource object in case the socket handle gets reused. PR-URL: #27581 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent f872210 commit 6070e88

File tree

5 files changed

+141
-7
lines changed

5 files changed

+141
-7
lines changed

lib/_http_agent.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ const { async_id_symbol } = require('internal/async_hooks').symbols;
4040
// ClientRequest.onSocket(). The Agent is now *strictly*
4141
// concerned with managing a connection pool.
4242

43+
class ReusedHandle {
44+
constructor(type, handle) {
45+
this.type = type;
46+
this.handle = handle;
47+
}
48+
}
49+
4350
function Agent(options) {
4451
if (!(this instanceof Agent))
4552
return new Agent(options);
@@ -166,10 +173,11 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
166173
// We have a free socket, so use that.
167174
var socket = this.freeSockets[name].shift();
168175
// Guard against an uninitialized or user supplied Socket.
169-
if (socket._handle && typeof socket._handle.asyncReset === 'function') {
176+
const handle = socket._handle;
177+
if (handle && typeof handle.asyncReset === 'function') {
170178
// Assign the handle a new asyncId and run any destroy()/init() hooks.
171-
socket._handle.asyncReset();
172-
socket[async_id_symbol] = socket._handle.getAsyncId();
179+
handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
180+
socket[async_id_symbol] = handle.getAsyncId();
173181
}
174182

175183
// don't leak

src/async_wrap.cc

+16-2
Original file line numberDiff line numberDiff line change
@@ -410,13 +410,26 @@ void AsyncWrap::PopAsyncIds(const FunctionCallbackInfo<Value>& args) {
410410

411411

412412
void AsyncWrap::AsyncReset(const FunctionCallbackInfo<Value>& args) {
413+
CHECK(args[0]->IsObject());
414+
413415
AsyncWrap* wrap;
414416
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
417+
418+
Local<Object> resource = args[0].As<Object>();
415419
double execution_async_id =
416-
args[0]->IsNumber() ? args[0].As<Number>()->Value() : kInvalidAsyncId;
417-
wrap->AsyncReset(execution_async_id);
420+
args[1]->IsNumber() ? args[1].As<Number>()->Value() : kInvalidAsyncId;
421+
wrap->AsyncReset(resource, execution_async_id);
418422
}
419423

424+
425+
void AsyncWrap::GetProviderType(const FunctionCallbackInfo<Value>& args) {
426+
AsyncWrap* wrap;
427+
args.GetReturnValue().Set(AsyncWrap::PROVIDER_NONE);
428+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
429+
args.GetReturnValue().Set(wrap->provider_type());
430+
}
431+
432+
420433
void AsyncWrap::EmitDestroy() {
421434
AsyncWrap::EmitDestroy(env(), async_id_);
422435
// Ensure no double destroy is emitted via AsyncReset().
@@ -437,6 +450,7 @@ Local<FunctionTemplate> AsyncWrap::GetConstructorTemplate(Environment* env) {
437450
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "AsyncWrap"));
438451
env->SetProtoMethod(tmpl, "getAsyncId", AsyncWrap::GetAsyncId);
439452
env->SetProtoMethod(tmpl, "asyncReset", AsyncWrap::AsyncReset);
453+
env->SetProtoMethod(tmpl, "getProviderType", AsyncWrap::GetProviderType);
440454
env->set_async_wrap_ctor_template(tmpl);
441455
}
442456
return tmpl;

src/async_wrap.h

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ class AsyncWrap : public BaseObject {
133133
static void PushAsyncIds(const v8::FunctionCallbackInfo<v8::Value>& args);
134134
static void PopAsyncIds(const v8::FunctionCallbackInfo<v8::Value>& args);
135135
static void AsyncReset(const v8::FunctionCallbackInfo<v8::Value>& args);
136+
static void GetProviderType(const v8::FunctionCallbackInfo<v8::Value>& args);
136137
static void QueueDestroyAsyncId(
137138
const v8::FunctionCallbackInfo<v8::Value>& args);
138139

test/async-hooks/init-hooks.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class ActivityCollector {
158158
// events this makes sense for a few tests in which we enable some hooks
159159
// later
160160
if (this._allowNoInit) {
161-
const stub = { uid, type: 'Unknown', handleIsObject: true };
161+
const stub = { uid, type: 'Unknown', handleIsObject: true, handle: {} };
162162
this._activities.set(uid, stub);
163163
return stub;
164164
} else if (!common.isMainThread) {
@@ -184,7 +184,8 @@ class ActivityCollector {
184184
triggerAsyncId,
185185
// In some cases (e.g. Timeout) the handle is a function, thus the usual
186186
// `typeof handle === 'object' && handle !== null` check can't be used.
187-
handleIsObject: handle instanceof Object
187+
handleIsObject: handle instanceof Object,
188+
handle
188189
};
189190
this._stamp(activity, 'init');
190191
this._activities.set(uid, activity);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
'use strict';
2+
// Flags: --expose-internals
3+
const common = require('../common');
4+
const initHooks = require('./init-hooks');
5+
const { checkInvocations } = require('./hook-checks');
6+
const assert = require('assert');
7+
const { async_id_symbol } = require('internal/async_hooks').symbols;
8+
const http = require('http');
9+
10+
// Checks that the async resource used in init in case of a resused handle
11+
// is not reused. Test is based on parallel\test-async-hooks-http-agent.js.
12+
13+
const hooks = initHooks();
14+
hooks.enable();
15+
16+
let asyncIdAtFirstReq;
17+
let asyncIdAtSecondReq;
18+
19+
// Make sure a single socket is transparently reused for 2 requests.
20+
const agent = new http.Agent({
21+
keepAlive: true,
22+
keepAliveMsecs: Infinity,
23+
maxSockets: 1
24+
});
25+
26+
const server = http.createServer(common.mustCall((req, res) => {
27+
req.once('data', common.mustCallAtLeast(() => {
28+
res.writeHead(200, { 'Content-Type': 'text/plain' });
29+
res.write('foo');
30+
}));
31+
req.on('end', common.mustCall(() => {
32+
res.end('bar');
33+
}));
34+
}, 2)).listen(0, common.mustCall(() => {
35+
const port = server.address().port;
36+
const payload = 'hello world';
37+
38+
// First request. This is useless except for adding a socket to the
39+
// agent’s pool for reuse.
40+
const r1 = http.request({
41+
agent, port, method: 'POST'
42+
}, common.mustCall((res) => {
43+
// Remember which socket we used.
44+
const socket = res.socket;
45+
asyncIdAtFirstReq = socket[async_id_symbol];
46+
assert.ok(asyncIdAtFirstReq > 0, `${asyncIdAtFirstReq} > 0`);
47+
// Check that request and response share their socket.
48+
assert.strictEqual(r1.socket, socket);
49+
50+
res.on('data', common.mustCallAtLeast(() => {}));
51+
res.on('end', common.mustCall(() => {
52+
// setImmediate() to give the agent time to register the freed socket.
53+
setImmediate(common.mustCall(() => {
54+
// The socket is free for reuse now.
55+
assert.strictEqual(socket[async_id_symbol], -1);
56+
57+
// Second request. To re-create the exact conditions from the
58+
// referenced issue, we use a POST request without chunked encoding
59+
// (hence the Content-Length header) and call .end() after the
60+
// response header has already been received.
61+
const r2 = http.request({
62+
agent, port, method: 'POST', headers: {
63+
'Content-Length': payload.length
64+
}
65+
}, common.mustCall((res) => {
66+
asyncIdAtSecondReq = res.socket[async_id_symbol];
67+
assert.ok(asyncIdAtSecondReq > 0, `${asyncIdAtSecondReq} > 0`);
68+
assert.strictEqual(r2.socket, socket);
69+
70+
// Empty payload, to hit the “right” code path.
71+
r2.end('');
72+
73+
res.on('data', common.mustCallAtLeast(() => {}));
74+
res.on('end', common.mustCall(() => {
75+
// Clean up to let the event loop stop.
76+
server.close();
77+
agent.destroy();
78+
}));
79+
}));
80+
81+
// Schedule a payload to be written immediately, but do not end the
82+
// request just yet.
83+
r2.write(payload);
84+
}));
85+
}));
86+
}));
87+
r1.end(payload);
88+
}));
89+
90+
91+
process.on('exit', onExit);
92+
93+
function onExit() {
94+
hooks.disable();
95+
hooks.sanityCheck();
96+
const activities = hooks.activities;
97+
98+
// Verify both invocations
99+
const first = activities.filter((x) => x.uid === asyncIdAtFirstReq)[0];
100+
checkInvocations(first, { init: 1, destroy: 1 }, 'when process exits');
101+
102+
const second = activities.filter((x) => x.uid === asyncIdAtSecondReq)[0];
103+
checkInvocations(second, { init: 1, destroy: 1 }, 'when process exits');
104+
105+
// Verify reuse handle has been wrapped
106+
assert.strictEqual(first.type, second.type);
107+
assert.ok(first.handle !== second.handle, 'Resource reused');
108+
assert.ok(first.handle === second.handle.handle,
109+
'Resource not wrapped correctly');
110+
}

0 commit comments

Comments
 (0)