Skip to content

Commit a301c1a

Browse files
apapirovskiMylesBorins
authored andcommitted
net: fix timeouts during long writes
Add updateWriteQueueSize which updates and returns queue size (net & tls). Make _onTimeout check whether an active write is ongoing and if so, call _unrefTimer rather than emitting a timeout event. Add http & https test that checks whether long-lasting (but active) writes timeout or can finish writing as expected. Backport-PR-URL: #16420 PR-URL: #15791 Fixes: #15082 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Refael Ackermann <[email protected]> Reviewed-By: Fedor Indutny <[email protected]>
1 parent 521dc25 commit a301c1a

7 files changed

+210
-5
lines changed

lib/net.js

+8
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
334334

335335

336336
Socket.prototype._onTimeout = function() {
337+
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
338+
// an active write in progress, so we suppress the timeout.
339+
const prevWriteQueueSize = this._handle.writeQueueSize;
340+
if (prevWriteQueueSize > 0 &&
341+
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
342+
this._unrefTimer();
343+
return;
344+
}
337345
debug('_onTimeout');
338346
this.emit('timeout');
339347
};

src/stream_wrap.cc

+18-4
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ StreamWrap::StreamWrap(Environment* env,
7373
void StreamWrap::AddMethods(Environment* env,
7474
v8::Local<v8::FunctionTemplate> target,
7575
int flags) {
76+
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
7677
env->SetProtoMethod(target, "setBlocking", SetBlocking);
7778
StreamBase::AddMethods<StreamWrap>(env, target, flags);
7879
}
@@ -113,11 +114,14 @@ bool StreamWrap::IsIPCPipe() {
113114
}
114115

115116

116-
void StreamWrap::UpdateWriteQueueSize() {
117+
uint32_t StreamWrap::UpdateWriteQueueSize() {
117118
HandleScope scope(env()->isolate());
118-
Local<Integer> write_queue_size =
119-
Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
120-
object()->Set(env()->write_queue_size_string(), write_queue_size);
119+
uint32_t write_queue_size = stream()->write_queue_size;
120+
object()->Set(env()->context(),
121+
env()->write_queue_size_string(),
122+
Integer::NewFromUnsigned(env()->isolate(),
123+
write_queue_size)).FromJust();
124+
return write_queue_size;
121125
}
122126

123127

@@ -242,6 +246,16 @@ void StreamWrap::OnRead(uv_stream_t* handle,
242246
}
243247

244248

249+
void StreamWrap::UpdateWriteQueueSize(
250+
const FunctionCallbackInfo<Value>& args) {
251+
StreamWrap* wrap;
252+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
253+
254+
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
255+
args.GetReturnValue().Set(write_queue_size);
256+
}
257+
258+
245259
void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
246260
StreamWrap* wrap;
247261
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

src/stream_wrap.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,15 @@ class StreamWrap : public HandleWrap, public StreamBase {
6767
}
6868

6969
AsyncWrap* GetAsyncWrap() override;
70-
void UpdateWriteQueueSize();
70+
uint32_t UpdateWriteQueueSize();
7171

7272
static void AddMethods(Environment* env,
7373
v8::Local<v8::FunctionTemplate> target,
7474
int flags = StreamBase::kFlagNone);
7575

7676
private:
77+
static void UpdateWriteQueueSize(
78+
const v8::FunctionCallbackInfo<v8::Value>& args);
7779
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
7880

7981
// Callbacks for libuv

src/tls_wrap.cc

+10
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
911911
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
912912

913913

914+
void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
915+
TLSWrap* wrap;
916+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
917+
918+
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
919+
args.GetReturnValue().Set(write_queue_size);
920+
}
921+
922+
914923
void TLSWrap::Initialize(Local<Object> target,
915924
Local<Value> unused,
916925
Local<Context> context) {
@@ -932,6 +941,7 @@ void TLSWrap::Initialize(Local<Object> target,
932941
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
933942
env->SetProtoMethod(t, "destroySSL", DestroySSL);
934943
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
944+
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
935945

936946
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
937947
SSLWrap<TLSWrap>::AddMethods(env, t);

src/tls_wrap.h

+4
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ class TLSWrap : public AsyncWrap,
167167
// If true - delivered EOF to the js-land, either after `close_notify`, or
168168
// after the `UV_EOF` on socket.
169169
bool eof_;
170+
171+
private:
172+
static void UpdateWriteQueueSize(
173+
const v8::FunctionCallbackInfo<v8::Value>& args);
170174
};
171175

172176
} // namespace node
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
6+
// This test assesses whether long-running writes can complete
7+
// or timeout because the socket is not aware that the backing
8+
// stream is still writing.
9+
// To simulate a slow client, we write a really large chunk and
10+
// then proceed through the following cycle:
11+
// 1) Receive first 'data' event and record currently written size
12+
// 2) Once we've read up to currently written size recorded above,
13+
// we pause the stream and wait longer than the server timeout
14+
// 3) Socket.prototype._onTimeout triggers and should confirm
15+
// that the backing stream is still active and writing
16+
// 4) Our timer fires, we resume the socket and start at 1)
17+
18+
const minReadSize = 250000;
19+
const serverTimeout = common.platformTimeout(500);
20+
let offsetTimeout = common.platformTimeout(100);
21+
let serverConnectionHandle;
22+
let writeSize = 3000000;
23+
let didReceiveData = false;
24+
// this represents each cycles write size, where the cycle consists
25+
// of `write > read > _onTimeout`
26+
let currentWriteSize = 0;
27+
28+
const server = http.createServer(common.mustCall((req, res) => {
29+
const content = Buffer.alloc(writeSize, 0x44);
30+
31+
res.writeHead(200, {
32+
'Content-Type': 'application/octet-stream',
33+
'Content-Length': content.length.toString(),
34+
'Vary': 'Accept-Encoding'
35+
});
36+
37+
serverConnectionHandle = res.socket._handle;
38+
res.write(content);
39+
res.end();
40+
}));
41+
server.setTimeout(serverTimeout);
42+
server.on('timeout', () => {
43+
assert.strictEqual(didReceiveData, false, 'Should not timeout');
44+
});
45+
46+
server.listen(0, common.mustCall(() => {
47+
http.get({
48+
path: '/',
49+
port: server.address().port
50+
}, common.mustCall((res) => {
51+
const resume = () => res.resume();
52+
let receivedBufferLength = 0;
53+
let firstReceivedAt;
54+
res.on('data', common.mustCallAtLeast((buf) => {
55+
if (receivedBufferLength === 0) {
56+
currentWriteSize = Math.max(
57+
minReadSize,
58+
writeSize - serverConnectionHandle.writeQueueSize
59+
);
60+
didReceiveData = false;
61+
firstReceivedAt = Date.now();
62+
}
63+
receivedBufferLength += buf.length;
64+
if (receivedBufferLength >= currentWriteSize) {
65+
didReceiveData = true;
66+
writeSize = serverConnectionHandle.writeQueueSize;
67+
receivedBufferLength = 0;
68+
res.pause();
69+
setTimeout(
70+
resume,
71+
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
72+
);
73+
offsetTimeout = 0;
74+
}
75+
}, 1));
76+
res.on('end', common.mustCall(() => {
77+
server.close();
78+
}));
79+
}));
80+
}));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto)
4+
common.skip('missing crypto');
5+
const assert = require('assert');
6+
const fixtures = require('../common/fixtures');
7+
const https = require('https');
8+
9+
// This test assesses whether long-running writes can complete
10+
// or timeout because the socket is not aware that the backing
11+
// stream is still writing.
12+
// To simulate a slow client, we write a really large chunk and
13+
// then proceed through the following cycle:
14+
// 1) Receive first 'data' event and record currently written size
15+
// 2) Once we've read up to currently written size recorded above,
16+
// we pause the stream and wait longer than the server timeout
17+
// 3) Socket.prototype._onTimeout triggers and should confirm
18+
// that the backing stream is still active and writing
19+
// 4) Our timer fires, we resume the socket and start at 1)
20+
21+
const minReadSize = 250000;
22+
const serverTimeout = common.platformTimeout(500);
23+
let offsetTimeout = common.platformTimeout(100);
24+
let serverConnectionHandle;
25+
let writeSize = 2000000;
26+
let didReceiveData = false;
27+
// this represents each cycles write size, where the cycle consists
28+
// of `write > read > _onTimeout`
29+
let currentWriteSize = 0;
30+
31+
const server = https.createServer({
32+
key: fixtures.readKey('agent1-key.pem'),
33+
cert: fixtures.readKey('agent1-cert.pem')
34+
}, common.mustCall((req, res) => {
35+
const content = Buffer.alloc(writeSize, 0x44);
36+
37+
res.writeHead(200, {
38+
'Content-Type': 'application/octet-stream',
39+
'Content-Length': content.length.toString(),
40+
'Vary': 'Accept-Encoding'
41+
});
42+
43+
serverConnectionHandle = res.socket._handle;
44+
res.write(content);
45+
res.end();
46+
}));
47+
server.setTimeout(serverTimeout);
48+
server.on('timeout', () => {
49+
assert.strictEqual(didReceiveData, false, 'Should not timeout');
50+
});
51+
52+
server.listen(0, common.mustCall(() => {
53+
https.get({
54+
path: '/',
55+
port: server.address().port,
56+
rejectUnauthorized: false
57+
}, common.mustCall((res) => {
58+
const resume = () => res.resume();
59+
let receivedBufferLength = 0;
60+
let firstReceivedAt;
61+
res.on('data', common.mustCallAtLeast((buf) => {
62+
if (receivedBufferLength === 0) {
63+
currentWriteSize = Math.max(
64+
minReadSize,
65+
writeSize - serverConnectionHandle.writeQueueSize
66+
);
67+
didReceiveData = false;
68+
firstReceivedAt = Date.now();
69+
}
70+
receivedBufferLength += buf.length;
71+
if (receivedBufferLength >= currentWriteSize) {
72+
didReceiveData = true;
73+
writeSize = serverConnectionHandle.writeQueueSize;
74+
receivedBufferLength = 0;
75+
res.pause();
76+
setTimeout(
77+
resume,
78+
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
79+
);
80+
offsetTimeout = 0;
81+
}
82+
}, 1));
83+
res.on('end', common.mustCall(() => {
84+
server.close();
85+
}));
86+
}));
87+
}));

0 commit comments

Comments
 (0)