Skip to content

Commit 25ce458

Browse files
apapirovskiMylesBorins
authored andcommitted
net,src: refactor writeQueueSize tracking
Currently, writeQueueSize is never used in C++ and barely used within JS. Instead of constantly updating the value on the JS object, create a getter that will retrieve the most up-to-date value from C++. For the vast majority of cases though, create a new prop on Socket.prototype[kLastWriteQueueSize] using a Symbol. Use this to track the current write size, entirely in JS land. Backport-PR-URL: #18084 PR-URL: #17650 Reviewed-By: Anna Henningsen <[email protected]>
1 parent 81da708 commit 25ce458

14 files changed

+126
-185
lines changed

lib/_tls_wrap.js

-5
Original file line numberDiff line numberDiff line change
@@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) {
460460
var options = this._tlsOptions;
461461
var ssl = this._handle;
462462

463-
// lib/net.js expect this value to be non-zero if write hasn't been flushed
464-
// immediately. After the handshake is done this will represent the actual
465-
// write queue size
466-
ssl.writeQueueSize = 1;
467-
468463
this.server = options.server;
469464

470465
// For clients, we will always have either a given ca list or be using

lib/net.js

+21-12
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick');
4848
const errors = require('internal/errors');
4949
const dns = require('dns');
5050

51+
const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
52+
5153
// `cluster` is only used by `listenInCluster` so for startup performance
5254
// reasons it's lazy loaded.
5355
var cluster = null;
@@ -198,6 +200,7 @@ function Socket(options) {
198200
this._handle = null;
199201
this._parent = null;
200202
this._host = null;
203+
this[kLastWriteQueueSize] = 0;
201204

202205
if (typeof options === 'number')
203206
options = { fd: options }; // Legacy interface.
@@ -401,12 +404,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
401404

402405

403406
Socket.prototype._onTimeout = function() {
404-
if (this._handle) {
405-
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
407+
const handle = this._handle;
408+
const lastWriteQueueSize = this[kLastWriteQueueSize];
409+
if (lastWriteQueueSize > 0 && handle) {
410+
// `lastWriteQueueSize !== writeQueueSize` means there is
406411
// an active write in progress, so we suppress the timeout.
407-
const prevWriteQueueSize = this._handle.writeQueueSize;
408-
if (prevWriteQueueSize > 0 &&
409-
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
412+
const writeQueueSize = handle.writeQueueSize;
413+
if (lastWriteQueueSize !== writeQueueSize) {
414+
this[kLastWriteQueueSize] = writeQueueSize;
410415
this._unrefTimer();
411416
return;
412417
}
@@ -476,7 +481,7 @@ Object.defineProperty(Socket.prototype, 'readyState', {
476481
Object.defineProperty(Socket.prototype, 'bufferSize', {
477482
get: function() {
478483
if (this._handle) {
479-
return this._handle.writeQueueSize + this.writableLength;
484+
return this[kLastWriteQueueSize] + this.writableLength;
480485
}
481486
}
482487
});
@@ -767,12 +772,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
767772

768773
this._bytesDispatched += req.bytes;
769774

770-
// If it was entirely flushed, we can write some more right now.
771-
// However, if more is left in the queue, then wait until that clears.
772-
if (req.async && this._handle.writeQueueSize !== 0)
773-
req.cb = cb;
774-
else
775+
if (!req.async) {
775776
cb();
777+
return;
778+
}
779+
780+
req.cb = cb;
781+
this[kLastWriteQueueSize] = req.bytes;
776782
};
777783

778784

@@ -856,6 +862,9 @@ function afterWrite(status, handle, req, err) {
856862
if (self !== process.stderr && self !== process.stdout)
857863
debug('afterWrite', status);
858864

865+
if (req.async)
866+
self[kLastWriteQueueSize] = 0;
867+
859868
// callback may come after call to destroy.
860869
if (self.destroyed) {
861870
debug('afterWrite destroyed');
@@ -875,7 +884,7 @@ function afterWrite(status, handle, req, err) {
875884
debug('afterWrite call cb');
876885

877886
if (req.cb)
878-
req.cb.call(self);
887+
req.cb.call(undefined);
879888
}
880889

881890

src/pipe_wrap.cc

-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ PipeWrap::PipeWrap(Environment* env,
166166
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
167167
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
168168
// Suggestion: uv_pipe_init() returns void.
169-
UpdateWriteQueueSize();
170169
}
171170

172171

src/stream_base.cc

+10-3
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
195195
}
196196

197197
err = DoWrite(req_wrap, buf_list, count, nullptr);
198-
req_wrap_obj->Set(env->async(), True(env->isolate()));
198+
if (HasWriteQueue())
199+
req_wrap_obj->Set(env->async(), True(env->isolate()));
199200

200201
if (err)
201202
req_wrap->Dispose();
@@ -253,7 +254,8 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
253254
}
254255

255256
err = DoWrite(req_wrap, bufs, count, nullptr);
256-
req_wrap_obj->Set(env->async(), True(env->isolate()));
257+
if (HasWriteQueue())
258+
req_wrap_obj->Set(env->async(), True(env->isolate()));
257259
req_wrap_obj->Set(env->buffer_string(), args[1]);
258260

259261
if (err)
@@ -379,7 +381,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
379381
reinterpret_cast<uv_stream_t*>(send_handle));
380382
}
381383

382-
req_wrap_obj->Set(env->async(), True(env->isolate()));
384+
if (HasWriteQueue())
385+
req_wrap_obj->Set(env->async(), True(env->isolate()));
383386

384387
if (err)
385388
req_wrap->Dispose();
@@ -473,6 +476,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
473476
return 0;
474477
}
475478

479+
bool StreamResource::HasWriteQueue() {
480+
return true;
481+
}
482+
476483

477484
const char* StreamResource::Error() const {
478485
return nullptr;

src/stream_base.h

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ class StreamResource {
162162
uv_buf_t* bufs,
163163
size_t count,
164164
uv_stream_t* send_handle) = 0;
165+
virtual bool HasWriteQueue();
165166
virtual const char* Error() const;
166167
virtual void ClearError();
167168

src/stream_wrap.cc

+28-20
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@
4040
namespace node {
4141

4242
using v8::Context;
43+
using v8::DontDelete;
4344
using v8::EscapableHandleScope;
4445
using v8::FunctionCallbackInfo;
4546
using v8::FunctionTemplate;
4647
using v8::HandleScope;
47-
using v8::Integer;
4848
using v8::Local;
4949
using v8::Object;
50+
using v8::ReadOnly;
51+
using v8::Signature;
5052
using v8::Value;
5153

5254

@@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
99101
void LibuvStreamWrap::AddMethods(Environment* env,
100102
v8::Local<v8::FunctionTemplate> target,
101103
int flags) {
102-
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
104+
Local<FunctionTemplate> get_write_queue_size =
105+
FunctionTemplate::New(env->isolate(),
106+
GetWriteQueueSize,
107+
env->as_external(),
108+
Signature::New(env->isolate(), target));
109+
target->PrototypeTemplate()->SetAccessorProperty(
110+
env->write_queue_size_string(),
111+
get_write_queue_size,
112+
Local<FunctionTemplate>(),
113+
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
103114
env->SetProtoMethod(target, "setBlocking", SetBlocking);
104115
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
105116
}
@@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() {
135146
}
136147

137148

138-
uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
139-
HandleScope scope(env()->isolate());
140-
uint32_t write_queue_size = stream()->write_queue_size;
141-
object()->Set(env()->context(),
142-
env()->write_queue_size_string(),
143-
Integer::NewFromUnsigned(env()->isolate(),
144-
write_queue_size)).FromJust();
145-
return write_queue_size;
146-
}
147-
148-
149149
int LibuvStreamWrap::ReadStart() {
150150
return uv_read_start(stream(), OnAlloc, OnRead);
151151
}
@@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
267267
}
268268

269269

270-
void LibuvStreamWrap::UpdateWriteQueueSize(
271-
const FunctionCallbackInfo<Value>& args) {
270+
void LibuvStreamWrap::GetWriteQueueSize(
271+
const FunctionCallbackInfo<Value>& info) {
272272
LibuvStreamWrap* wrap;
273-
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
273+
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
274+
275+
if (wrap->stream() == nullptr) {
276+
info.GetReturnValue().Set(0);
277+
return;
278+
}
274279

275-
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
276-
args.GetReturnValue().Set(write_queue_size);
280+
uint32_t write_queue_size = wrap->stream()->write_queue_size;
281+
info.GetReturnValue().Set(write_queue_size);
277282
}
278283

279284

@@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
370375
}
371376

372377
w->Dispatched();
373-
UpdateWriteQueueSize();
374378

375379
return r;
376380
}
377381

378382

383+
bool LibuvStreamWrap::HasWriteQueue() {
384+
return stream()->write_queue_size > 0;
385+
}
386+
387+
379388
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
380389
WriteWrap* req_wrap = WriteWrap::from_req(req);
381390
CHECK_NE(req_wrap, nullptr);
@@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
387396

388397
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
389398
StreamBase::AfterWrite(w, status);
390-
UpdateWriteQueueSize();
391399
}
392400

393401
} // namespace node

src/stream_wrap.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
5555
uv_buf_t* bufs,
5656
size_t count,
5757
uv_stream_t* send_handle) override;
58+
bool HasWriteQueue() override;
5859

5960
inline uv_stream_t* stream() const {
6061
return stream_;
@@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
8384
}
8485

8586
AsyncWrap* GetAsyncWrap() override;
86-
uint32_t UpdateWriteQueueSize();
8787

8888
static void AddMethods(Environment* env,
8989
v8::Local<v8::FunctionTemplate> target,
9090
int flags = StreamBase::kFlagNone);
9191

9292
private:
93-
static void UpdateWriteQueueSize(
94-
const v8::FunctionCallbackInfo<v8::Value>& args);
93+
static void GetWriteQueueSize(
94+
const v8::FunctionCallbackInfo<v8::Value>& info);
9595
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
9696

9797
// Callbacks for libuv

src/tcp_wrap.cc

-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
170170
int r = uv_tcp_init(env->event_loop(), &handle_);
171171
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
172172
// Suggestion: uv_tcp_init() returns void.
173-
UpdateWriteQueueSize();
174173
}
175174

176175

src/tls_wrap.cc

+23-22
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@ namespace node {
3535
using crypto::SecureContext;
3636
using crypto::SSLWrap;
3737
using v8::Context;
38+
using v8::DontDelete;
3839
using v8::EscapableHandleScope;
3940
using v8::Exception;
4041
using v8::Function;
4142
using v8::FunctionCallbackInfo;
4243
using v8::FunctionTemplate;
43-
using v8::Integer;
4444
using v8::Local;
4545
using v8::Object;
46+
using v8::ReadOnly;
47+
using v8::Signature;
4648
using v8::String;
4749
using v8::Value;
4850

@@ -307,7 +309,6 @@ void TLSWrap::EncOut() {
307309

308310
// No data to write
309311
if (BIO_pending(enc_out_) == 0) {
310-
UpdateWriteQueueSize();
311312
if (clear_in_->Length() == 0)
312313
InvokeQueued(0);
313314
return;
@@ -553,17 +554,6 @@ bool TLSWrap::IsClosing() {
553554
}
554555

555556

556-
uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
557-
HandleScope scope(env()->isolate());
558-
if (write_queue_size == 0)
559-
write_queue_size = BIO_pending(enc_out_);
560-
object()->Set(env()->context(),
561-
env()->write_queue_size_string(),
562-
Integer::NewFromUnsigned(env()->isolate(),
563-
write_queue_size)).FromJust();
564-
return write_queue_size;
565-
}
566-
567557

568558
int TLSWrap::ReadStart() {
569559
if (stream_ != nullptr)
@@ -610,9 +600,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
610600
// However, if there is any data that should be written to the socket,
611601
// the callback should not be invoked immediately
612602
if (BIO_pending(enc_out_) == 0) {
613-
// net.js expects writeQueueSize to be > 0 if the write isn't
614-
// immediately flushed
615-
UpdateWriteQueueSize(1);
616603
return stream_->DoWrite(w, bufs, count, send_handle);
617604
}
618605
}
@@ -665,7 +652,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
665652

666653
// Try writing data immediately
667654
EncOut();
668-
UpdateWriteQueueSize();
669655

670656
return 0;
671657
}
@@ -937,12 +923,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
937923
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
938924

939925

940-
void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
926+
void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo<Value>& info) {
941927
TLSWrap* wrap;
942-
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
928+
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
943929

944-
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
945-
args.GetReturnValue().Set(write_queue_size);
930+
if (wrap->clear_in_ == nullptr) {
931+
info.GetReturnValue().Set(0);
932+
return;
933+
}
934+
935+
uint32_t write_queue_size = BIO_pending(wrap->enc_out_);
936+
info.GetReturnValue().Set(write_queue_size);
946937
}
947938

948939

@@ -965,14 +956,24 @@ void TLSWrap::Initialize(Local<Object> target,
965956
t->InstanceTemplate()->SetInternalFieldCount(1);
966957
t->SetClassName(tlsWrapString);
967958

959+
Local<FunctionTemplate> get_write_queue_size =
960+
FunctionTemplate::New(env->isolate(),
961+
GetWriteQueueSize,
962+
env->as_external(),
963+
Signature::New(env->isolate(), t));
964+
t->PrototypeTemplate()->SetAccessorProperty(
965+
env->write_queue_size_string(),
966+
get_write_queue_size,
967+
Local<FunctionTemplate>(),
968+
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
969+
968970
AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
969971
env->SetProtoMethod(t, "receive", Receive);
970972
env->SetProtoMethod(t, "start", Start);
971973
env->SetProtoMethod(t, "setVerifyMode", SetVerifyMode);
972974
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
973975
env->SetProtoMethod(t, "destroySSL", DestroySSL);
974976
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
975-
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
976977

977978
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
978979
SSLWrap<TLSWrap>::AddMethods(env, t);

0 commit comments

Comments
 (0)