Skip to content

Commit a627c5f

Browse files
committed
net: fix timeouts during long writes
Add updateWriteQueueSize which updates and returns queue size (net & tls). Make _onTimeout check whether an active write is ongoing and if so, call _unrefTimer rather than emitting a timeout event. Add http & https test that checks whether long-lasting (but active) writes timeout or can finish writing as expected. PR-URL: #15791 Fixes: #15082 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Refael Ackermann <[email protected]> Reviewed-By: Fedor Indutny <[email protected]>
1 parent aaf2a1c commit a627c5f

7 files changed

+210
-5
lines changed

lib/net.js

+8
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
397397

398398

399399
Socket.prototype._onTimeout = function() {
400+
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
401+
// an active write in progress, so we suppress the timeout.
402+
const prevWriteQueueSize = this._handle.writeQueueSize;
403+
if (prevWriteQueueSize > 0 &&
404+
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
405+
this._unrefTimer();
406+
return;
407+
}
400408
debug('_onTimeout');
401409
this.emit('timeout');
402410
};

src/stream_wrap.cc

+18-4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
104104
void LibuvStreamWrap::AddMethods(Environment* env,
105105
v8::Local<v8::FunctionTemplate> target,
106106
int flags) {
107+
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
107108
env->SetProtoMethod(target, "setBlocking", SetBlocking);
108109
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
109110
}
@@ -144,11 +145,14 @@ bool LibuvStreamWrap::IsIPCPipe() {
144145
}
145146

146147

147-
void LibuvStreamWrap::UpdateWriteQueueSize() {
148+
uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
148149
HandleScope scope(env()->isolate());
149-
Local<Integer> write_queue_size =
150-
Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
151-
object()->Set(env()->write_queue_size_string(), write_queue_size);
150+
uint32_t write_queue_size = stream()->write_queue_size;
151+
object()->Set(env()->context(),
152+
env()->write_queue_size_string(),
153+
Integer::NewFromUnsigned(env()->isolate(),
154+
write_queue_size)).FromJust();
155+
return write_queue_size;
152156
}
153157

154158

@@ -273,6 +277,16 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
273277
}
274278

275279

280+
void LibuvStreamWrap::UpdateWriteQueueSize(
281+
const FunctionCallbackInfo<Value>& args) {
282+
LibuvStreamWrap* wrap;
283+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
284+
285+
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
286+
args.GetReturnValue().Set(write_queue_size);
287+
}
288+
289+
276290
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
277291
LibuvStreamWrap* wrap;
278292
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

src/stream_wrap.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,15 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
8484
}
8585

8686
AsyncWrap* GetAsyncWrap() override;
87-
void UpdateWriteQueueSize();
87+
uint32_t UpdateWriteQueueSize();
8888

8989
static void AddMethods(Environment* env,
9090
v8::Local<v8::FunctionTemplate> target,
9191
int flags = StreamBase::kFlagNone);
9292

9393
private:
94+
static void UpdateWriteQueueSize(
95+
const v8::FunctionCallbackInfo<v8::Value>& args);
9496
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
9597

9698
// Callbacks for libuv

src/tls_wrap.cc

+10
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
932932
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
933933

934934

935+
void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
936+
TLSWrap* wrap;
937+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
938+
939+
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
940+
args.GetReturnValue().Set(write_queue_size);
941+
}
942+
943+
935944
void TLSWrap::Initialize(Local<Object> target,
936945
Local<Value> unused,
937946
Local<Context> context) {
@@ -958,6 +967,7 @@ void TLSWrap::Initialize(Local<Object> target,
958967
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
959968
env->SetProtoMethod(t, "destroySSL", DestroySSL);
960969
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
970+
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
961971

962972
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
963973
SSLWrap<TLSWrap>::AddMethods(env, t);

src/tls_wrap.h

+4
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ class TLSWrap : public AsyncWrap,
188188
// If true - delivered EOF to the js-land, either after `close_notify`, or
189189
// after the `UV_EOF` on socket.
190190
bool eof_;
191+
192+
private:
193+
static void UpdateWriteQueueSize(
194+
const v8::FunctionCallbackInfo<v8::Value>& args);
191195
};
192196

193197
} // namespace node
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
6+
// This test assesses whether long-running writes can complete
7+
// or timeout because the socket is not aware that the backing
8+
// stream is still writing.
9+
// To simulate a slow client, we write a really large chunk and
10+
// then proceed through the following cycle:
11+
// 1) Receive first 'data' event and record currently written size
12+
// 2) Once we've read up to currently written size recorded above,
13+
// we pause the stream and wait longer than the server timeout
14+
// 3) Socket.prototype._onTimeout triggers and should confirm
15+
// that the backing stream is still active and writing
16+
// 4) Our timer fires, we resume the socket and start at 1)
17+
18+
const minReadSize = 250000;
19+
const serverTimeout = common.platformTimeout(500);
20+
let offsetTimeout = common.platformTimeout(100);
21+
let serverConnectionHandle;
22+
let writeSize = 3000000;
23+
let didReceiveData = false;
24+
// this represents each cycles write size, where the cycle consists
25+
// of `write > read > _onTimeout`
26+
let currentWriteSize = 0;
27+
28+
const server = http.createServer(common.mustCall((req, res) => {
29+
const content = Buffer.alloc(writeSize, 0x44);
30+
31+
res.writeHead(200, {
32+
'Content-Type': 'application/octet-stream',
33+
'Content-Length': content.length.toString(),
34+
'Vary': 'Accept-Encoding'
35+
});
36+
37+
serverConnectionHandle = res.socket._handle;
38+
res.write(content);
39+
res.end();
40+
}));
41+
server.setTimeout(serverTimeout);
42+
server.on('timeout', () => {
43+
assert.strictEqual(didReceiveData, false, 'Should not timeout');
44+
});
45+
46+
server.listen(0, common.mustCall(() => {
47+
http.get({
48+
path: '/',
49+
port: server.address().port
50+
}, common.mustCall((res) => {
51+
const resume = () => res.resume();
52+
let receivedBufferLength = 0;
53+
let firstReceivedAt;
54+
res.on('data', common.mustCallAtLeast((buf) => {
55+
if (receivedBufferLength === 0) {
56+
currentWriteSize = Math.max(
57+
minReadSize,
58+
writeSize - serverConnectionHandle.writeQueueSize
59+
);
60+
didReceiveData = false;
61+
firstReceivedAt = Date.now();
62+
}
63+
receivedBufferLength += buf.length;
64+
if (receivedBufferLength >= currentWriteSize) {
65+
didReceiveData = true;
66+
writeSize = serverConnectionHandle.writeQueueSize;
67+
receivedBufferLength = 0;
68+
res.pause();
69+
setTimeout(
70+
resume,
71+
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
72+
);
73+
offsetTimeout = 0;
74+
}
75+
}, 1));
76+
res.on('end', common.mustCall(() => {
77+
server.close();
78+
}));
79+
}));
80+
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto)
4+
common.skip('missing crypto');
5+
const assert = require('assert');
6+
const fixtures = require('../common/fixtures');
7+
const https = require('https');
8+
9+
// This test assesses whether long-running writes can complete
10+
// or timeout because the socket is not aware that the backing
11+
// stream is still writing.
12+
// To simulate a slow client, we write a really large chunk and
13+
// then proceed through the following cycle:
14+
// 1) Receive first 'data' event and record currently written size
15+
// 2) Once we've read up to currently written size recorded above,
16+
// we pause the stream and wait longer than the server timeout
17+
// 3) Socket.prototype._onTimeout triggers and should confirm
18+
// that the backing stream is still active and writing
19+
// 4) Our timer fires, we resume the socket and start at 1)
20+
21+
const minReadSize = 250000;
22+
const serverTimeout = common.platformTimeout(500);
23+
let offsetTimeout = common.platformTimeout(100);
24+
let serverConnectionHandle;
25+
let writeSize = 2000000;
26+
let didReceiveData = false;
27+
// this represents each cycles write size, where the cycle consists
28+
// of `write > read > _onTimeout`
29+
let currentWriteSize = 0;
30+
31+
const server = https.createServer({
32+
key: fixtures.readKey('agent1-key.pem'),
33+
cert: fixtures.readKey('agent1-cert.pem')
34+
}, common.mustCall((req, res) => {
35+
const content = Buffer.alloc(writeSize, 0x44);
36+
37+
res.writeHead(200, {
38+
'Content-Type': 'application/octet-stream',
39+
'Content-Length': content.length.toString(),
40+
'Vary': 'Accept-Encoding'
41+
});
42+
43+
serverConnectionHandle = res.socket._handle;
44+
res.write(content);
45+
res.end();
46+
}));
47+
server.setTimeout(serverTimeout);
48+
server.on('timeout', () => {
49+
assert.strictEqual(didReceiveData, false, 'Should not timeout');
50+
});
51+
52+
server.listen(0, common.mustCall(() => {
53+
https.get({
54+
path: '/',
55+
port: server.address().port,
56+
rejectUnauthorized: false
57+
}, common.mustCall((res) => {
58+
const resume = () => res.resume();
59+
let receivedBufferLength = 0;
60+
let firstReceivedAt;
61+
res.on('data', common.mustCallAtLeast((buf) => {
62+
if (receivedBufferLength === 0) {
63+
currentWriteSize = Math.max(
64+
minReadSize,
65+
writeSize - serverConnectionHandle.writeQueueSize
66+
);
67+
didReceiveData = false;
68+
firstReceivedAt = Date.now();
69+
}
70+
receivedBufferLength += buf.length;
71+
if (receivedBufferLength >= currentWriteSize) {
72+
didReceiveData = true;
73+
writeSize = serverConnectionHandle.writeQueueSize;
74+
receivedBufferLength = 0;
75+
res.pause();
76+
setTimeout(
77+
resume,
78+
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
79+
);
80+
offsetTimeout = 0;
81+
}
82+
}, 1));
83+
res.on('end', common.mustCall(() => {
84+
server.close();
85+
}));
86+
}));
87+
}));

0 commit comments

Comments
 (0)