Skip to content

Commit f9a17f5

Browse files
apapirovskiaddaleax
authored andcommitted
net,src: refactor writeQueueSize tracking
Currently, writeQueueSize is never used in C++ and barely used within JS. Instead of constantly updating the value on the JS object, create a getter that will retrieve the most up-to-date value from C++. For the vast majority of cases though, create a new prop on Socket.prototype[kLastWriteQueueSize] using a Symbol. Use this to track the current write size, entirely in JS land. PR-URL: nodejs#17650
1 parent 71a7ede commit f9a17f5

14 files changed

+126
-185
lines changed

lib/_tls_wrap.js

-5
Original file line numberDiff line numberDiff line change
@@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) {
460460
var options = this._tlsOptions;
461461
var ssl = this._handle;
462462

463-
// lib/net.js expect this value to be non-zero if write hasn't been flushed
464-
// immediately. After the handshake is done this will represent the actual
465-
// write queue size
466-
ssl.writeQueueSize = 1;
467-
468463
this.server = options.server;
469464

470465
// For clients, we will always have either a given ca list or be using

lib/net.js

+21-12
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick');
4848
const errors = require('internal/errors');
4949
const dns = require('dns');
5050

51+
const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
52+
5153
// `cluster` is only used by `listenInCluster` so for startup performance
5254
// reasons it's lazy loaded.
5355
var cluster = null;
@@ -198,6 +200,7 @@ function Socket(options) {
198200
this._handle = null;
199201
this._parent = null;
200202
this._host = null;
203+
this[kLastWriteQueueSize] = 0;
201204

202205
if (typeof options === 'number')
203206
options = { fd: options }; // Legacy interface.
@@ -398,12 +401,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
398401

399402

400403
Socket.prototype._onTimeout = function() {
401-
if (this._handle) {
402-
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
404+
const handle = this._handle;
405+
const lastWriteQueueSize = this[kLastWriteQueueSize];
406+
if (lastWriteQueueSize > 0 && handle) {
407+
// `lastWriteQueueSize !== writeQueueSize` means there is
403408
// an active write in progress, so we suppress the timeout.
404-
const prevWriteQueueSize = this._handle.writeQueueSize;
405-
if (prevWriteQueueSize > 0 &&
406-
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
409+
const writeQueueSize = handle.writeQueueSize;
410+
if (lastWriteQueueSize !== writeQueueSize) {
411+
this[kLastWriteQueueSize] = writeQueueSize;
407412
this._unrefTimer();
408413
return;
409414
}
@@ -473,7 +478,7 @@ Object.defineProperty(Socket.prototype, 'readyState', {
473478
Object.defineProperty(Socket.prototype, 'bufferSize', {
474479
get: function() {
475480
if (this._handle) {
476-
return this._handle.writeQueueSize + this.writableLength;
481+
return this[kLastWriteQueueSize] + this.writableLength;
477482
}
478483
}
479484
});
@@ -764,12 +769,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
764769

765770
this._bytesDispatched += req.bytes;
766771

767-
// If it was entirely flushed, we can write some more right now.
768-
// However, if more is left in the queue, then wait until that clears.
769-
if (req.async && this._handle.writeQueueSize !== 0)
770-
req.cb = cb;
771-
else
772+
if (!req.async) {
772773
cb();
774+
return;
775+
}
776+
777+
req.cb = cb;
778+
this[kLastWriteQueueSize] = req.bytes;
773779
};
774780

775781

@@ -853,6 +859,9 @@ function afterWrite(status, handle, req, err) {
853859
if (self !== process.stderr && self !== process.stdout)
854860
debug('afterWrite', status);
855861

862+
if (req.async)
863+
self[kLastWriteQueueSize] = 0;
864+
856865
// callback may come after call to destroy.
857866
if (self.destroyed) {
858867
debug('afterWrite destroyed');
@@ -872,7 +881,7 @@ function afterWrite(status, handle, req, err) {
872881
debug('afterWrite call cb');
873882

874883
if (req.cb)
875-
req.cb.call(self);
884+
req.cb.call(undefined);
876885
}
877886

878887

src/pipe_wrap.cc

-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ PipeWrap::PipeWrap(Environment* env,
165165
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
166166
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
167167
// Suggestion: uv_pipe_init() returns void.
168-
UpdateWriteQueueSize();
169168
}
170169

171170

src/stream_base.cc

+10-3
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
193193
}
194194

195195
err = DoWrite(req_wrap, buf_list, count, nullptr);
196-
req_wrap_obj->Set(env->async(), True(env->isolate()));
196+
if (HasWriteQueue())
197+
req_wrap_obj->Set(env->async(), True(env->isolate()));
197198

198199
if (err)
199200
req_wrap->Dispose();
@@ -249,7 +250,8 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
249250
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
250251

251252
err = DoWrite(req_wrap, bufs, count, nullptr);
252-
req_wrap_obj->Set(env->async(), True(env->isolate()));
253+
if (HasWriteQueue())
254+
req_wrap_obj->Set(env->async(), True(env->isolate()));
253255
req_wrap_obj->Set(env->buffer_string(), args[1]);
254256

255257
if (err)
@@ -373,7 +375,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
373375
reinterpret_cast<uv_stream_t*>(send_handle));
374376
}
375377

376-
req_wrap_obj->Set(env->async(), True(env->isolate()));
378+
if (HasWriteQueue())
379+
req_wrap_obj->Set(env->async(), True(env->isolate()));
377380

378381
if (err)
379382
req_wrap->Dispose();
@@ -467,6 +470,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
467470
return 0;
468471
}
469472

473+
bool StreamResource::HasWriteQueue() {
474+
return true;
475+
}
476+
470477

471478
const char* StreamResource::Error() const {
472479
return nullptr;

src/stream_base.h

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ class StreamResource {
162162
uv_buf_t* bufs,
163163
size_t count,
164164
uv_stream_t* send_handle) = 0;
165+
virtual bool HasWriteQueue();
165166
virtual const char* Error() const;
166167
virtual void ClearError();
167168

src/stream_wrap.cc

+28-20
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@
4040
namespace node {
4141

4242
using v8::Context;
43+
using v8::DontDelete;
4344
using v8::EscapableHandleScope;
4445
using v8::FunctionCallbackInfo;
4546
using v8::FunctionTemplate;
4647
using v8::HandleScope;
47-
using v8::Integer;
4848
using v8::Local;
4949
using v8::Object;
50+
using v8::ReadOnly;
51+
using v8::Signature;
5052
using v8::Value;
5153

5254

@@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
99101
void LibuvStreamWrap::AddMethods(Environment* env,
100102
v8::Local<v8::FunctionTemplate> target,
101103
int flags) {
102-
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
104+
Local<FunctionTemplate> get_write_queue_size =
105+
FunctionTemplate::New(env->isolate(),
106+
GetWriteQueueSize,
107+
env->as_external(),
108+
Signature::New(env->isolate(), target));
109+
target->PrototypeTemplate()->SetAccessorProperty(
110+
env->write_queue_size_string(),
111+
get_write_queue_size,
112+
Local<FunctionTemplate>(),
113+
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
103114
env->SetProtoMethod(target, "setBlocking", SetBlocking);
104115
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
105116
}
@@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() {
135146
}
136147

137148

138-
uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
139-
HandleScope scope(env()->isolate());
140-
uint32_t write_queue_size = stream()->write_queue_size;
141-
object()->Set(env()->context(),
142-
env()->write_queue_size_string(),
143-
Integer::NewFromUnsigned(env()->isolate(),
144-
write_queue_size)).FromJust();
145-
return write_queue_size;
146-
}
147-
148-
149149
int LibuvStreamWrap::ReadStart() {
150150
return uv_read_start(stream(), OnAlloc, OnRead);
151151
}
@@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
267267
}
268268

269269

270-
void LibuvStreamWrap::UpdateWriteQueueSize(
271-
const FunctionCallbackInfo<Value>& args) {
270+
void LibuvStreamWrap::GetWriteQueueSize(
271+
const FunctionCallbackInfo<Value>& info) {
272272
LibuvStreamWrap* wrap;
273-
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
273+
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
274+
275+
if (wrap->stream() == nullptr) {
276+
info.GetReturnValue().Set(0);
277+
return;
278+
}
274279

275-
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
276-
args.GetReturnValue().Set(write_queue_size);
280+
uint32_t write_queue_size = wrap->stream()->write_queue_size;
281+
info.GetReturnValue().Set(write_queue_size);
277282
}
278283

279284

@@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
370375
}
371376

372377
w->Dispatched();
373-
UpdateWriteQueueSize();
374378

375379
return r;
376380
}
377381

378382

383+
bool LibuvStreamWrap::HasWriteQueue() {
384+
return stream()->write_queue_size > 0;
385+
}
386+
387+
379388
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
380389
WriteWrap* req_wrap = WriteWrap::from_req(req);
381390
CHECK_NE(req_wrap, nullptr);
@@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
387396

388397
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
389398
StreamBase::AfterWrite(w, status);
390-
UpdateWriteQueueSize();
391399
}
392400

393401
} // namespace node

src/stream_wrap.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
5555
uv_buf_t* bufs,
5656
size_t count,
5757
uv_stream_t* send_handle) override;
58+
bool HasWriteQueue() override;
5859

5960
inline uv_stream_t* stream() const {
6061
return stream_;
@@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
8384
}
8485

8586
AsyncWrap* GetAsyncWrap() override;
86-
uint32_t UpdateWriteQueueSize();
8787

8888
static void AddMethods(Environment* env,
8989
v8::Local<v8::FunctionTemplate> target,
9090
int flags = StreamBase::kFlagNone);
9191

9292
private:
93-
static void UpdateWriteQueueSize(
94-
const v8::FunctionCallbackInfo<v8::Value>& args);
93+
static void GetWriteQueueSize(
94+
const v8::FunctionCallbackInfo<v8::Value>& info);
9595
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
9696

9797
// Callbacks for libuv

src/tcp_wrap.cc

-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
169169
int r = uv_tcp_init(env->event_loop(), &handle_);
170170
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
171171
// Suggestion: uv_tcp_init() returns void.
172-
UpdateWriteQueueSize();
173172
}
174173

175174

src/tls_wrap.cc

+23-22
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@ namespace node {
3535
using crypto::SecureContext;
3636
using crypto::SSLWrap;
3737
using v8::Context;
38+
using v8::DontDelete;
3839
using v8::EscapableHandleScope;
3940
using v8::Exception;
4041
using v8::Function;
4142
using v8::FunctionCallbackInfo;
4243
using v8::FunctionTemplate;
43-
using v8::Integer;
4444
using v8::Local;
4545
using v8::Object;
46+
using v8::ReadOnly;
47+
using v8::Signature;
4648
using v8::String;
4749
using v8::Value;
4850

@@ -309,7 +311,6 @@ void TLSWrap::EncOut() {
309311

310312
// No data to write
311313
if (BIO_pending(enc_out_) == 0) {
312-
UpdateWriteQueueSize();
313314
if (clear_in_->Length() == 0)
314315
InvokeQueued(0);
315316
return;
@@ -555,17 +556,6 @@ bool TLSWrap::IsClosing() {
555556
}
556557

557558

558-
uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
559-
HandleScope scope(env()->isolate());
560-
if (write_queue_size == 0)
561-
write_queue_size = BIO_pending(enc_out_);
562-
object()->Set(env()->context(),
563-
env()->write_queue_size_string(),
564-
Integer::NewFromUnsigned(env()->isolate(),
565-
write_queue_size)).FromJust();
566-
return write_queue_size;
567-
}
568-
569559

570560
int TLSWrap::ReadStart() {
571561
if (stream_ != nullptr)
@@ -612,9 +602,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
612602
// However, if there is any data that should be written to the socket,
613603
// the callback should not be invoked immediately
614604
if (BIO_pending(enc_out_) == 0) {
615-
// net.js expects writeQueueSize to be > 0 if the write isn't
616-
// immediately flushed
617-
UpdateWriteQueueSize(1);
618605
return stream_->DoWrite(w, bufs, count, send_handle);
619606
}
620607
}
@@ -666,7 +653,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
666653

667654
// Try writing data immediately
668655
EncOut();
669-
UpdateWriteQueueSize();
670656

671657
return 0;
672658
}
@@ -938,12 +924,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
938924
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
939925

940926

941-
void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
927+
void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo<Value>& info) {
942928
TLSWrap* wrap;
943-
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
929+
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
944930

945-
uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
946-
args.GetReturnValue().Set(write_queue_size);
931+
if (wrap->clear_in_ == nullptr) {
932+
info.GetReturnValue().Set(0);
933+
return;
934+
}
935+
936+
uint32_t write_queue_size = BIO_pending(wrap->enc_out_);
937+
info.GetReturnValue().Set(write_queue_size);
947938
}
948939

949940

@@ -966,14 +957,24 @@ void TLSWrap::Initialize(Local<Object> target,
966957
t->InstanceTemplate()->SetInternalFieldCount(1);
967958
t->SetClassName(tlsWrapString);
968959

960+
Local<FunctionTemplate> get_write_queue_size =
961+
FunctionTemplate::New(env->isolate(),
962+
GetWriteQueueSize,
963+
env->as_external(),
964+
Signature::New(env->isolate(), t));
965+
t->PrototypeTemplate()->SetAccessorProperty(
966+
env->write_queue_size_string(),
967+
get_write_queue_size,
968+
Local<FunctionTemplate>(),
969+
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
970+
969971
AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
970972
env->SetProtoMethod(t, "receive", Receive);
971973
env->SetProtoMethod(t, "start", Start);
972974
env->SetProtoMethod(t, "setVerifyMode", SetVerifyMode);
973975
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
974976
env->SetProtoMethod(t, "destroySSL", DestroySSL);
975977
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
976-
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
977978

978979
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
979980
SSLWrap<TLSWrap>::AddMethods(env, t);

0 commit comments

Comments
 (0)