tls: fix writeQueueSize prop, long write timeouts #15791

Closed
5 changes: 2 additions & 3 deletions lib/_tls_wrap.js
@@ -455,9 +455,8 @@ TLSSocket.prototype._init = function(socket, wrap) {
   var ssl = this._handle;
 
   // lib/net.js expect this value to be non-zero if write hasn't been flushed
-  // immediately
-  // TODO(indutny): revise this solution, it might be 1 before handshake and
-  // represent real writeQueueSize during regular writes.
+  // immediately. After the handshake is done this will represent the actual
+  // write queue size
   ssl.writeQueueSize = 1;
 
   this.server = options.server;
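
Context for the comment above: lib/net.js reports a socket's bufferSize (and, with this PR, decides whether a timeout should fire) based on the handle's writeQueueSize, which is why the placeholder has to be non-zero until the handshake completes. A rough, simplified sketch of that relationship — the helper name and the pendingJsBytes argument are illustrative, not the actual net.js code:

'use strict';
// Simplified sketch of how net.js-style code combines the handle's
// writeQueueSize with JS-side buffered bytes to report bufferSize.
// `pendingJsBytes` stands in for net.js's own writable-state accounting.
function bufferSize(handle, pendingJsBytes) {
  // A non-zero writeQueueSize signals that a write is still in flight at the
  // stream/TLS layer, so timeout and back-pressure logic can see it.
  return handle.writeQueueSize + pendingJsBytes;
}

// Example: 30 bytes of TLS framing still queued in the handle, 1 byte
// still queued in JS land.
console.log(bufferSize({ writeQueueSize: 30 }, 1));  // 31
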
8 changes: 8 additions & 0 deletions lib/net.js
@@ -397,6 +397,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {


Socket.prototype._onTimeout = function() {
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
// an active write in progress, so we suppress the timeout.
const prevWriteQueueSize = this._handle.writeQueueSize;
if (prevWriteQueueSize > 0 &&
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
this._unrefTimer();
return;
Member:

Why unref? That doesn't quite match what I understood when I read "suppress the timeout"?

Contributor Author:

It'll call _unrefActive so that it can come back and check this condition again after whatever the _idleTimeout is, if need be. It's possible the comment is confusing, but I'm obviously familiar with what it does, so I'm a bad judge.

Member:

So … I might be wrong here, but judging from the naming I’d think _unrefActive only affects whether the timer handle is ref()ed or not, i.e. whether it is keeping the process alive or not … does that sound about right?

Contributor Author (@apapirovski, Oct 20, 2017):

Sorry, missed the notification — _unrefActive re-schedules a timer that's part of the unrefedLists (the lists of timers that don't keep the process alive).

node/lib/timers.js

Lines 141 to 151 in 686e092

// Schedule or re-schedule a timer.
// The item must have been enroll()'d first.
const active = exports.active = function(item) {
  insert(item, false);
};

// Internal APIs that need timeouts should use `_unrefActive()` instead of
// `active()` so that they do not unnecessarily keep the process open.
exports._unrefActive = function(item) {
  insert(item, true);
};

Not sure if that's a great explanation... ? Let me know if it isn't.

(Admittedly the name _unrefTimer in net.js is rather misleading given that it's more like _activeTimer but I didn't name it... :()
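
(For reference, roughly what _unrefTimer in lib/net.js did around this time — a from-memory sketch, not a quote of the source: it walks the socket and any parent handles and re-arms their unref'd idle timers, which is why calling it from _onTimeout defers the timeout for another _idleTimeout interval.)

// Approximate shape of net.js's internal _unrefTimer (illustrative only).
const timers = require('timers');

function _unrefTimer() {
  // Re-schedule the idle timer for this socket and any parent handle
  // (e.g. the net.Socket underneath a TLSSocket) without ref()ing them.
  for (let s = this; s !== null; s = s._parent)
    timers._unrefActive(s);
}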

Also, let me know once you're done reviewing and if all looks good. Planning to merge this weekend.

Member:

Yeah, don’t feel like you need to wait on anything from me! There’s nothing obviously wrong in this PR.

And yes, this should just be renamed…

Contributor Author:

👍 Just being extra safe since this will go into LTS and is a fairly major change to something that's been broken for quite a while. :)

Member:

Is there a specific time frame? You can just add a baking-for-lts label to indicate that the releaser (likely @gibfahn) should let this sit a while before pulling it in, if you would feel more comfortable with that.

(On the other hand, if you feel comfortable not waiting any longer than usual, feel free to voice that as well!)

Contributor Author:

Since it's a reasonably serious bug fix, hasn't broken CITGM, and no one else has expressed a desire to have it bake, I think having it go in at the next opportunity should be fine. Thanks again for reviewing ❤️

}
debug('_onTimeout');
this.emit('timeout');
};
22 changes: 18 additions & 4 deletions src/stream_wrap.cc
@@ -104,6 +104,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
void LibuvStreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
}
@@ -144,11 +145,14 @@ bool LibuvStreamWrap::IsIPCPipe() {
}


-void LibuvStreamWrap::UpdateWriteQueueSize() {
+uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
   HandleScope scope(env()->isolate());
-  Local<Integer> write_queue_size =
-      Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
-  object()->Set(env()->write_queue_size_string(), write_queue_size);
+  uint32_t write_queue_size = stream()->write_queue_size;
+  object()->Set(env()->context(),
+                env()->write_queue_size_string(),
+                Integer::NewFromUnsigned(env()->isolate(),
+                                         write_queue_size)).FromJust();
+  return write_queue_size;
 }


@@ -273,6 +277,16 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}


void LibuvStreamWrap::UpdateWriteQueueSize(
const FunctionCallbackInfo<Value>& args) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
}


void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
4 changes: 3 additions & 1 deletion src/stream_wrap.h
@@ -84,13 +84,15 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
   }
 
   AsyncWrap* GetAsyncWrap() override;
-  void UpdateWriteQueueSize();
+  uint32_t UpdateWriteQueueSize();
 
   static void AddMethods(Environment* env,
                          v8::Local<v8::FunctionTemplate> target,
                          int flags = StreamBase::kFlagNone);
 
  private:
+  static void UpdateWriteQueueSize(
+      const v8::FunctionCallbackInfo<v8::Value>& args);
   static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
 
   // Callbacks for libuv
34 changes: 32 additions & 2 deletions src/tls_wrap.cc
@@ -42,6 +42,7 @@ using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::String;
@@ -297,6 +298,7 @@ void TLSWrap::EncOut() {

// No data to write
if (BIO_pending(enc_out_) == 0) {
UpdateWriteQueueSize();
if (clear_in_->Length() == 0)
InvokeQueued(0);
return;
@@ -551,6 +553,18 @@ bool TLSWrap::IsClosing() {
}


uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
HandleScope scope(env()->isolate());
if (write_queue_size == 0)
write_queue_size = BIO_pending(enc_out_);
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int TLSWrap::ReadStart() {
return stream_->ReadStart();
}
@@ -591,8 +605,12 @@ int TLSWrap::DoWrite(WriteWrap* w,
     ClearOut();
     // However, if there is any data that should be written to the socket,
     // the callback should not be invoked immediately
-    if (BIO_pending(enc_out_) == 0)
+    if (BIO_pending(enc_out_) == 0) {
+      // net.js expects writeQueueSize to be > 0 if the write isn't
+      // immediately flushed
+      UpdateWriteQueueSize(1);
       return stream_->DoWrite(w, bufs, count, send_handle);
+    }
   }

// Queue callback to execute it on next tick
@@ -642,13 +660,15 @@ int TLSWrap::DoWrite(WriteWrap* w,

// Try writing data immediately
EncOut();
UpdateWriteQueueSize();

return 0;
}


 void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
-  // Intentionally empty
+  TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
+  wrap->UpdateWriteQueueSize();
 }


@@ -912,6 +932,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB


void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
TLSWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
}


void TLSWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
@@ -938,6 +967,7 @@ void TLSWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);

StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
5 changes: 5 additions & 0 deletions src/tls_wrap.h
@@ -132,6 +132,7 @@ class TLSWrap : public AsyncWrap,

AsyncWrap* GetAsyncWrap() override;
bool IsIPCPipe() override;
uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);

// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
@@ -187,6 +188,10 @@ class TLSWrap : public AsyncWrap,
// If true - delivered EOF to the js-land, either after `close_notify`, or
// after the `UV_EOF` on socket.
bool eof_;

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
};

} // namespace node
43 changes: 43 additions & 0 deletions test/parallel/test-tls-buffersize.js
@@ -0,0 +1,43 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const tls = require('tls');

const iter = 10;
const overhead = 30;

const server = tls.createServer({
key: fixtures.readKey('agent2-key.pem'),
cert: fixtures.readKey('agent2-cert.pem')
}, common.mustCall((socket) => {
socket.on('readable', common.mustCallAtLeast(() => {
socket.read();
}, 1));

socket.on('end', common.mustCall(() => {
server.close();
}));
}));

server.listen(0, common.mustCall(() => {
const client = tls.connect({
port: server.address().port,
rejectUnauthorized: false
}, common.mustCall(() => {
assert.strictEqual(client.bufferSize, 0);

for (let i = 1; i < iter; i++) {
client.write('a');
assert.strictEqual(client.bufferSize, i + overhead);
}

client.on('finish', common.mustCall(() => {
assert.strictEqual(client.bufferSize, 0);
}));

client.end();
}));
}));
80 changes: 80 additions & 0 deletions test/sequential/test-http-keep-alive-large-write.js
@@ -0,0 +1,80 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');

// This test assesses whether long-running writes can complete
// or timeout because the socket is not aware that the backing
// stream is still writing.
// To simulate a slow client, we write a really large chunk and
// then proceed through the following cycle:
// 1) Receive first 'data' event and record currently written size
// 2) Once we've read up to currently written size recorded above,
// we pause the stream and wait longer than the server timeout
// 3) Socket.prototype._onTimeout triggers and should confirm
// that the backing stream is still active and writing
// 4) Our timer fires, we resume the socket and start at 1)

const minReadSize = 250000;
const serverTimeout = common.platformTimeout(500);
let offsetTimeout = common.platformTimeout(100);
let serverConnectionHandle;
let writeSize = 3000000;
let didReceiveData = false;
// this represents each cycles write size, where the cycle consists
// of `write > read > _onTimeout`
let currentWriteSize = 0;

const server = http.createServer(common.mustCall((req, res) => {
const content = Buffer.alloc(writeSize, 0x44);

res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Length': content.length.toString(),
'Vary': 'Accept-Encoding'
});

serverConnectionHandle = res.socket._handle;
res.write(content);
res.end();
}));
server.setTimeout(serverTimeout);
server.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});

server.listen(0, common.mustCall(() => {
http.get({
path: '/',
port: server.address().port
}, common.mustCall((res) => {
const resume = () => res.resume();
let receivedBufferLength = 0;
let firstReceivedAt;
res.on('data', common.mustCallAtLeast((buf) => {
if (receivedBufferLength === 0) {
currentWriteSize = Math.max(
minReadSize,
writeSize - serverConnectionHandle.writeQueueSize
);
didReceiveData = false;
firstReceivedAt = Date.now();
}
receivedBufferLength += buf.length;
if (receivedBufferLength >= currentWriteSize) {
didReceiveData = true;
writeSize = serverConnectionHandle.writeQueueSize;
receivedBufferLength = 0;
res.pause();
setTimeout(
resume,
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
Contributor:

Could you comment on why we need this formula to calculate the throttle time?

Contributor Author (@apapirovski, Oct 15, 2017):

We want to always wait until after _onTimeout fires, but not give it a chance to fire twice. On slower systems (or if this test were run in parallel) processing the data can take long enough that, after several iterations of reading, the timing drifts enough for two timeouts to get a chance to fire. It's rare, but I'm just being extra defensive here.
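
To make the formula concrete (illustrative numbers, not from an actual test run): with serverTimeout = 500 ms and offsetTimeout = 100 ms, if 150 ms have already passed since firstReceivedAt, the client resumes 450 ms later — 600 ms after the first chunk — so _onTimeout gets exactly one chance to fire per cycle, never two.

// Worked example of the resume delay (illustrative values only).
const serverTimeout = 500;   // the test's server timeout on a typical machine
let offsetTimeout = 100;     // extra slack, applied only on the first cycle
const elapsed = 150;         // stands in for Date.now() - firstReceivedAt

const delay = serverTimeout + offsetTimeout - elapsed;
console.log(delay);  // 450 -> resume lands 600 ms after the first chunk,
                     // one full timeout window plus slack, but not two
offsetTimeout = 0;   // later cycles drop the extra slack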

);
offsetTimeout = 0;
}
}, 1));
res.on('end', common.mustCall(() => {
server.close();
}));
}));
}));
87 changes: 87 additions & 0 deletions test/sequential/test-https-keep-alive-large-write.js
@@ -0,0 +1,87 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const https = require('https');

// This test assesses whether long-running writes can complete
// or timeout because the socket is not aware that the backing
// stream is still writing.
// To simulate a slow client, we write a really large chunk and
// then proceed through the following cycle:
// 1) Receive first 'data' event and record currently written size
// 2) Once we've read up to currently written size recorded above,
// we pause the stream and wait longer than the server timeout
// 3) Socket.prototype._onTimeout triggers and should confirm
// that the backing stream is still active and writing
// 4) Our timer fires, we resume the socket and start at 1)

const minReadSize = 250000;
const serverTimeout = common.platformTimeout(500);
let offsetTimeout = common.platformTimeout(100);
let serverConnectionHandle;
let writeSize = 2000000;
let didReceiveData = false;
// this represents each cycles write size, where the cycle consists
// of `write > read > _onTimeout`
let currentWriteSize = 0;

const server = https.createServer({
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
}, common.mustCall((req, res) => {
const content = Buffer.alloc(writeSize, 0x44);

res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Length': content.length.toString(),
'Vary': 'Accept-Encoding'
});

serverConnectionHandle = res.socket._handle;
res.write(content);
res.end();
}));
server.setTimeout(serverTimeout);
server.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});

server.listen(0, common.mustCall(() => {
https.get({
path: '/',
port: server.address().port,
rejectUnauthorized: false
}, common.mustCall((res) => {
const resume = () => res.resume();
let receivedBufferLength = 0;
let firstReceivedAt;
res.on('data', common.mustCallAtLeast((buf) => {
if (receivedBufferLength === 0) {
currentWriteSize = Math.max(
minReadSize,
writeSize - serverConnectionHandle.writeQueueSize
);
didReceiveData = false;
firstReceivedAt = Date.now();
}
receivedBufferLength += buf.length;
if (receivedBufferLength >= currentWriteSize) {
didReceiveData = true;
writeSize = serverConnectionHandle.writeQueueSize;
receivedBufferLength = 0;
res.pause();
setTimeout(
resume,
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
);
offsetTimeout = 0;
}
}, 1));
res.on('end', common.mustCall(() => {
server.close();
}));
}));
}));