Skip to content

Commit 008336c

Browse files
addaleaxMylesBorins
authored andcommittedJan 8, 2018
src: minor refactoring to StreamBase writes
Instead of having per-request callbacks, always call a callback on the `StreamBase` instance itself for `WriteWrap` and `ShutdownWrap`. This makes `WriteWrap` cleanup consistent for all stream classes, since the after-write callback is always the same now. If special handling is needed for writes that happen to a sub-class, `AfterWrite` can be overridden by that class, rather than that class providing its own callback (e.g. updating the write queue size for libuv streams). If special handling is needed for writes that happen on another stream instance, the existing `after_write_cb()` callback is used for that (e.g. custom code after writing to the transport from a TLS stream). As a nice bonus, this also makes `WriteWrap` and `ShutdownWrap` instances slightly smaller. PR-URL: #17564 Reviewed-By: Anatoli Papirovski <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 7ed9e5d commit 008336c

9 files changed

+73
-87
lines changed
 

‎src/js_stream.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
176176
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
177177
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
178178

179-
wrap->OnAfterWrite(w);
179+
w->Done(0);
180180
}
181181

182182

‎src/node_http2.cc

+1-4
Original file line numberDiff line numberDiff line change
@@ -975,9 +975,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
975975

976976
WriteWrap* Http2Session::AllocateSend() {
977977
HandleScope scope(env()->isolate());
978-
auto AfterWrite = [](WriteWrap* req, int status) {
979-
req->Dispose();
980-
};
981978
Local<Object> obj =
982979
env()->write_wrap_constructor_function()
983980
->NewInstance(env()->context()).ToLocalChecked();
@@ -987,7 +984,7 @@ WriteWrap* Http2Session::AllocateSend() {
987984
session(),
988985
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
989986
// Max frame size + 9 bytes for the header
990-
return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9);
987+
return WriteWrap::New(env(), obj, stream_, size + 9);
991988
}
992989

993990
void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {

‎src/stream_base-inl.h

+10-2
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
143143
}
144144

145145

146+
inline void ShutdownWrap::OnDone(int status) {
147+
stream()->AfterShutdown(this, status);
148+
}
149+
150+
146151
WriteWrap* WriteWrap::New(Environment* env,
147152
Local<Object> obj,
148153
StreamBase* wrap,
149-
DoneCb cb,
150154
size_t extra) {
151155
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
152156
char* storage = new char[storage_size];
153157

154-
return new(storage) WriteWrap(env, obj, wrap, cb, storage_size);
158+
return new(storage) WriteWrap(env, obj, wrap, storage_size);
155159
}
156160

157161

@@ -171,6 +175,10 @@ size_t WriteWrap::ExtraSize() const {
171175
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
172176
}
173177

178+
inline void WriteWrap::OnDone(int status) {
179+
stream()->AfterWrite(this, status);
180+
}
181+
174182
} // namespace node
175183

176184
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

‎src/stream_base.cc

+9-12
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
5555
env->set_init_trigger_async_id(wrap->get_async_id());
5656
ShutdownWrap* req_wrap = new ShutdownWrap(env,
5757
req_wrap_obj,
58-
this,
59-
AfterShutdown);
58+
this);
6059

6160
int err = DoShutdown(req_wrap);
6261
if (err)
@@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
6665

6766

6867
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
69-
StreamBase* wrap = req_wrap->wrap();
7068
Environment* env = req_wrap->env();
7169

7270
// The wrap and request objects should still be there.
@@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
7876
Local<Object> req_wrap_obj = req_wrap->object();
7977
Local<Value> argv[3] = {
8078
Integer::New(env->isolate(), status),
81-
wrap->GetObject(),
79+
GetObject(),
8280
req_wrap_obj
8381
};
8482

@@ -158,7 +156,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
158156
wrap = GetAsyncWrap();
159157
CHECK_NE(wrap, nullptr);
160158
env->set_init_trigger_async_id(wrap->get_async_id());
161-
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
159+
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
162160

163161
offset = 0;
164162
if (!all_buffers) {
@@ -248,7 +246,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
248246
if (wrap != nullptr)
249247
env->set_init_trigger_async_id(wrap->get_async_id());
250248
// Allocate, or write rest
251-
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
249+
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
252250

253251
err = DoWrite(req_wrap, bufs, count, nullptr);
254252
req_wrap_obj->Set(env->async(), True(env->isolate()));
@@ -332,7 +330,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
332330
wrap = GetAsyncWrap();
333331
if (wrap != nullptr)
334332
env->set_init_trigger_async_id(wrap->get_async_id());
335-
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
333+
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
336334

337335
data = req_wrap->Extra();
338336

@@ -393,7 +391,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
393391

394392

395393
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
396-
StreamBase* wrap = req_wrap->wrap();
397394
Environment* env = req_wrap->env();
398395

399396
HandleScope handle_scope(env->isolate());
@@ -405,19 +402,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
405402
// Unref handle property
406403
Local<Object> req_wrap_obj = req_wrap->object();
407404
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
408-
wrap->OnAfterWrite(req_wrap);
405+
OnAfterWrite(req_wrap, status);
409406

410407
Local<Value> argv[] = {
411408
Integer::New(env->isolate(), status),
412-
wrap->GetObject(),
409+
GetObject(),
413410
req_wrap_obj,
414411
Undefined(env->isolate())
415412
};
416413

417-
const char* msg = wrap->Error();
414+
const char* msg = Error();
418415
if (msg != nullptr) {
419416
argv[3] = OneByteString(env->isolate(), msg);
420-
wrap->ClearError();
417+
ClearError();
421418
}
422419

423420
if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())

‎src/stream_base.h

+27-39
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,37 @@ namespace node {
1616
// Forward declarations
1717
class StreamBase;
1818

19-
template <class Req>
19+
template<typename Base>
2020
class StreamReq {
2121
public:
22-
typedef void (*DoneCb)(Req* req, int status);
23-
24-
explicit StreamReq(DoneCb cb) : cb_(cb) {
22+
explicit StreamReq(StreamBase* stream) : stream_(stream) {
2523
}
2624

2725
inline void Done(int status, const char* error_str = nullptr) {
28-
Req* req = static_cast<Req*>(this);
26+
Base* req = static_cast<Base*>(this);
2927
Environment* env = req->env();
3028
if (error_str != nullptr) {
3129
req->object()->Set(env->error_string(),
3230
OneByteString(env->isolate(), error_str));
3331
}
3432

35-
cb_(req, status);
33+
req->OnDone(status);
3634
}
3735

36+
inline StreamBase* stream() const { return stream_; }
37+
3838
private:
39-
DoneCb cb_;
39+
StreamBase* const stream_;
4040
};
4141

4242
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
4343
public StreamReq<ShutdownWrap> {
4444
public:
4545
ShutdownWrap(Environment* env,
4646
v8::Local<v8::Object> req_wrap_obj,
47-
StreamBase* wrap,
48-
DoneCb cb)
47+
StreamBase* stream)
4948
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
50-
StreamReq<ShutdownWrap>(cb),
51-
wrap_(wrap) {
49+
StreamReq<ShutdownWrap>(stream) {
5250
Wrap(req_wrap_obj, this);
5351
}
5452

@@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
6058
return ContainerOf(&ShutdownWrap::req_, req);
6159
}
6260

63-
inline StreamBase* wrap() const { return wrap_; }
6461
size_t self_size() const override { return sizeof(*this); }
6562

66-
private:
67-
StreamBase* const wrap_;
63+
inline void OnDone(int status); // Just calls stream()->AfterShutdown()
6864
};
6965

70-
class WriteWrap: public ReqWrap<uv_write_t>,
71-
public StreamReq<WriteWrap> {
66+
class WriteWrap : public ReqWrap<uv_write_t>,
67+
public StreamReq<WriteWrap> {
7268
public:
7369
static inline WriteWrap* New(Environment* env,
7470
v8::Local<v8::Object> obj,
75-
StreamBase* wrap,
76-
DoneCb cb,
71+
StreamBase* stream,
7772
size_t extra = 0);
7873
inline void Dispose();
7974
inline char* Extra(size_t offset = 0);
8075
inline size_t ExtraSize() const;
8176

82-
inline StreamBase* wrap() const { return wrap_; }
83-
8477
size_t self_size() const override { return storage_size_; }
8578

8679
static WriteWrap* from_req(uv_write_t* req) {
@@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap<uv_write_t>,
9184

9285
WriteWrap(Environment* env,
9386
v8::Local<v8::Object> obj,
94-
StreamBase* wrap,
95-
DoneCb cb)
87+
StreamBase* stream)
9688
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
97-
StreamReq<WriteWrap>(cb),
98-
wrap_(wrap),
89+
StreamReq<WriteWrap>(stream),
9990
storage_size_(0) {
10091
Wrap(obj, this);
10192
}
10293

94+
inline void OnDone(int status); // Just calls stream()->AfterWrite()
95+
10396
protected:
10497
WriteWrap(Environment* env,
10598
v8::Local<v8::Object> obj,
106-
StreamBase* wrap,
107-
DoneCb cb,
99+
StreamBase* stream,
108100
size_t storage_size)
109101
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
110-
StreamReq<WriteWrap>(cb),
111-
wrap_(wrap),
102+
StreamReq<WriteWrap>(stream),
112103
storage_size_(storage_size) {
113104
Wrap(obj, this);
114105
}
@@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap<uv_write_t>,
129120
// WriteWrap. Ensure this never happens.
130121
void operator delete(void* ptr) { UNREACHABLE(); }
131122

132-
StreamBase* const wrap_;
133123
const size_t storage_size_;
134124
};
135125

@@ -151,7 +141,7 @@ class StreamResource {
151141
void* ctx;
152142
};
153143

154-
typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
144+
typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
155145
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
156146
typedef void (*ReadCb)(ssize_t nread,
157147
const uv_buf_t* buf,
@@ -176,9 +166,9 @@ class StreamResource {
176166
virtual void ClearError();
177167

178168
// Events
179-
inline void OnAfterWrite(WriteWrap* w) {
169+
inline void OnAfterWrite(WriteWrap* w, int status) {
180170
if (!after_write_cb_.is_empty())
181-
after_write_cb_.fn(w, after_write_cb_.ctx);
171+
after_write_cb_.fn(w, status, after_write_cb_.ctx);
182172
}
183173

184174
inline void OnAlloc(size_t size, uv_buf_t* buf) {
@@ -208,14 +198,12 @@ class StreamResource {
208198
inline Callback<ReadCb> read_cb() { return read_cb_; }
209199
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
210200

211-
private:
201+
protected:
212202
Callback<AfterWriteCb> after_write_cb_;
213203
Callback<AllocCb> alloc_cb_;
214204
Callback<ReadCb> read_cb_;
215205
Callback<DestructCb> destruct_cb_;
216206
uint64_t bytes_read_;
217-
218-
friend class StreamBase;
219207
};
220208

221209
class StreamBase : public StreamResource {
@@ -253,6 +241,10 @@ class StreamBase : public StreamResource {
253241
v8::Local<v8::Object> buf,
254242
v8::Local<v8::Object> handle);
255243

244+
// These are called by the respective {Write,Shutdown}Wrap class.
245+
virtual void AfterShutdown(ShutdownWrap* req, int status);
246+
virtual void AfterWrite(WriteWrap* req, int status);
247+
256248
protected:
257249
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
258250
}
@@ -263,10 +255,6 @@ class StreamBase : public StreamResource {
263255
virtual AsyncWrap* GetAsyncWrap() = 0;
264256
virtual v8::Local<v8::Object> GetObject();
265257

266-
// Libuv callbacks
267-
static void AfterShutdown(ShutdownWrap* req, int status);
268-
static void AfterWrite(WriteWrap* req, int status);
269-
270258
// JS Methods
271259
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
272260
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);

‎src/stream_wrap.cc

+8-9
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
9191
provider),
9292
StreamBase(env),
9393
stream_(stream) {
94-
set_after_write_cb({ OnAfterWriteImpl, this });
9594
set_alloc_cb({ OnAllocImpl, this });
9695
set_read_cb({ OnReadImpl, this });
9796
}
@@ -293,13 +292,13 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
293292

294293
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
295294
int err;
296-
err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown);
295+
err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
297296
req_wrap->Dispatched();
298297
return err;
299298
}
300299

301300

302-
void LibuvStreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
301+
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
303302
ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
304303
CHECK_NE(req_wrap, nullptr);
305304
HandleScope scope(req_wrap->env()->isolate());
@@ -354,9 +353,9 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
354353
uv_stream_t* send_handle) {
355354
int r;
356355
if (send_handle == nullptr) {
357-
r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
356+
r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
358357
} else {
359-
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
358+
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite);
360359
}
361360

362361
if (!r) {
@@ -377,7 +376,7 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
377376
}
378377

379378

380-
void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
379+
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
381380
WriteWrap* req_wrap = WriteWrap::from_req(req);
382381
CHECK_NE(req_wrap, nullptr);
383382
HandleScope scope(req_wrap->env()->isolate());
@@ -386,9 +385,9 @@ void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
386385
}
387386

388387

389-
void LibuvStreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
390-
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx);
391-
wrap->UpdateWriteQueueSize();
388+
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
389+
StreamBase::AfterWrite(w, status);
390+
UpdateWriteQueueSize();
392391
}
393392

394393
} // namespace node

‎src/stream_wrap.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,18 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
102102
static void OnRead(uv_stream_t* handle,
103103
ssize_t nread,
104104
const uv_buf_t* buf);
105-
static void AfterWrite(uv_write_t* req, int status);
106-
static void AfterShutdown(uv_shutdown_t* req, int status);
105+
static void AfterUvWrite(uv_write_t* req, int status);
106+
static void AfterUvShutdown(uv_shutdown_t* req, int status);
107107

108108
// Resource interface implementation
109-
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
110109
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
111110
static void OnReadImpl(ssize_t nread,
112111
const uv_buf_t* buf,
113112
uv_handle_type pending,
114113
void* ctx);
115114

115+
void AfterWrite(WriteWrap* req_wrap, int status) override;
116+
116117
uv_stream_t* const stream_;
117118
};
118119

‎src/tls_wrap.cc

+11-15
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,7 @@ void TLSWrap::EncOut() {
328328
->NewInstance(env()->context()).ToLocalChecked();
329329
WriteWrap* write_req = WriteWrap::New(env(),
330330
req_wrap_obj,
331-
this,
332-
EncOutCb);
331+
stream_);
333332

334333
uv_buf_t buf[arraysize(data)];
335334
for (size_t i = 0; i < count; i++)
@@ -346,34 +345,31 @@ void TLSWrap::EncOut() {
346345
}
347346

348347

349-
void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) {
350-
TLSWrap* wrap = static_cast<TLSWrap*>(req_wrap->wrap());
351-
req_wrap->Dispose();
352-
348+
void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) {
353349
// We should not be getting here after `DestroySSL`, because all queued writes
354350
// must be invoked with UV_ECANCELED
355-
CHECK_NE(wrap->ssl_, nullptr);
351+
CHECK_NE(ssl_, nullptr);
356352

357353
// Handle error
358354
if (status) {
359355
// Ignore errors after shutdown
360-
if (wrap->shutdown_)
356+
if (shutdown_)
361357
return;
362358

363359
// Notify about error
364-
wrap->InvokeQueued(status);
360+
InvokeQueued(status);
365361
return;
366362
}
367363

368364
// Commit
369-
crypto::NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_);
365+
crypto::NodeBIO::FromBIO(enc_out_)->Read(nullptr, write_size_);
370366

371367
// Ensure that the progress will be made and `InvokeQueued` will be called.
372-
wrap->ClearIn();
368+
ClearIn();
373369

374370
// Try writing more data
375-
wrap->write_size_ = 0;
376-
wrap->EncOut();
371+
write_size_ = 0;
372+
EncOut();
377373
}
378374

379375

@@ -676,9 +672,9 @@ int TLSWrap::DoWrite(WriteWrap* w,
676672
}
677673

678674

679-
void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
675+
void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
680676
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
681-
wrap->UpdateWriteQueueSize();
677+
wrap->EncOutAfterWrite(w, status);
682678
}
683679

684680

‎src/tls_wrap.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class TLSWrap : public AsyncWrap,
111111
static void SSLInfoCallback(const SSL* ssl_, int where, int ret);
112112
void InitSSL();
113113
void EncOut();
114-
static void EncOutCb(WriteWrap* req_wrap, int status);
114+
void EncOutAfterWrite(WriteWrap* req_wrap, int status);
115115
bool ClearIn();
116116
void ClearOut();
117117
void MakePending();
@@ -134,7 +134,7 @@ class TLSWrap : public AsyncWrap,
134134
uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);
135135

136136
// Resource implementation
137-
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
137+
static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
138138
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
139139
static void OnReadImpl(ssize_t nread,
140140
const uv_buf_t* buf,

0 commit comments

Comments
 (0)
Please sign in to comment.