Skip to content

Commit 2347ce8

Browse files
committed
src: unify thread pool work
Instead of using the libuv mechanism directly, provide an internal `ThreadPoolWork` wrapper that takes care of increasing/decreasing the waiting request counter. PR-URL: #19377 Reviewed-By: Ben Noordhuis <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 7153bec commit 2347ce8

File tree

4 files changed

+108
-128
lines changed

4 files changed

+108
-128
lines changed

src/node_api.cc

+28-45
Original file line numberDiff line numberDiff line change
@@ -3338,7 +3338,7 @@ static napi_status ConvertUVErrorCode(int code) {
33383338
}
33393339

33403340
// Wrapper around uv_work_t which calls user-provided callbacks.
3341-
class Work : public node::AsyncResource {
3341+
class Work : public node::AsyncResource, public node::ThreadPoolWork {
33423342
private:
33433343
explicit Work(napi_env env,
33443344
v8::Local<v8::Object> async_resource,
@@ -3349,15 +3349,14 @@ class Work : public node::AsyncResource {
33493349
: AsyncResource(env->isolate,
33503350
async_resource,
33513351
*v8::String::Utf8Value(env->isolate, async_resource_name)),
3352-
_env(env),
3353-
_data(data),
3354-
_execute(execute),
3355-
_complete(complete) {
3356-
memset(&_request, 0, sizeof(_request));
3357-
_request.data = this;
3352+
ThreadPoolWork(node::Environment::GetCurrent(env->isolate)),
3353+
_env(env),
3354+
_data(data),
3355+
_execute(execute),
3356+
_complete(complete) {
33583357
}
33593358

3360-
~Work() { }
3359+
virtual ~Work() { }
33613360

33623361
public:
33633362
static Work* New(napi_env env,
@@ -3374,47 +3373,36 @@ class Work : public node::AsyncResource {
33743373
delete work;
33753374
}
33763375

3377-
static void ExecuteCallback(uv_work_t* req) {
3378-
Work* work = static_cast<Work*>(req->data);
3379-
work->_execute(work->_env, work->_data);
3376+
void DoThreadPoolWork() override {
3377+
_execute(_env, _data);
33803378
}
33813379

3382-
static void CompleteCallback(uv_work_t* req, int status) {
3383-
Work* work = static_cast<Work*>(req->data);
3380+
void AfterThreadPoolWork(int status) {
3381+
if (_complete == nullptr)
3382+
return;
33843383

3385-
if (work->_complete != nullptr) {
3386-
napi_env env = work->_env;
3384+
// Establish a handle scope here so that every callback doesn't have to.
3385+
// Also it is needed for the exception-handling below.
3386+
v8::HandleScope scope(_env->isolate);
33873387

3388-
// Establish a handle scope here so that every callback doesn't have to.
3389-
// Also it is needed for the exception-handling below.
3390-
v8::HandleScope scope(env->isolate);
3391-
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
3392-
env_->DecreaseWaitingRequestCounter();
3388+
CallbackScope callback_scope(this);
33933389

3394-
CallbackScope callback_scope(work);
3390+
NAPI_CALL_INTO_MODULE(_env,
3391+
_complete(_env, ConvertUVErrorCode(status), _data),
3392+
[this] (v8::Local<v8::Value> local_err) {
3393+
// If there was an unhandled exception in the complete callback,
3394+
// report it as a fatal exception. (There is no JavaScript on the
3395+
// callstack that can possibly handle it.)
3396+
v8impl::trigger_fatal_exception(_env, local_err);
3397+
});
33953398

3396-
NAPI_CALL_INTO_MODULE(env,
3397-
work->_complete(env, ConvertUVErrorCode(status), work->_data),
3398-
[env] (v8::Local<v8::Value> local_err) {
3399-
// If there was an unhandled exception in the complete callback,
3400-
// report it as a fatal exception. (There is no JavaScript on the
3401-
// callstack that can possibly handle it.)
3402-
v8impl::trigger_fatal_exception(env, local_err);
3403-
});
3404-
3405-
// Note: Don't access `work` after this point because it was
3406-
// likely deleted by the complete callback.
3407-
}
3408-
}
3409-
3410-
uv_work_t* Request() {
3411-
return &_request;
3399+
// Note: Don't access `work` after this point because it was
3400+
// likely deleted by the complete callback.
34123401
}
34133402

34143403
private:
34153404
napi_env _env;
34163405
void* _data;
3417-
uv_work_t _request;
34183406
napi_async_execute_callback _execute;
34193407
napi_async_complete_callback _complete;
34203408
};
@@ -3491,12 +3479,7 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
34913479

34923480
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
34933481

3494-
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
3495-
env_->IncreaseWaitingRequestCounter();
3496-
CALL_UV(env, uv_queue_work(event_loop,
3497-
w->Request(),
3498-
uvimpl::Work::ExecuteCallback,
3499-
uvimpl::Work::CompleteCallback));
3482+
w->ScheduleWork();
35003483

35013484
return napi_clear_last_error(env);
35023485
}
@@ -3507,7 +3490,7 @@ napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
35073490

35083491
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
35093492

3510-
CALL_UV(env, uv_cancel(reinterpret_cast<uv_req_t*>(w->Request())));
3493+
CALL_UV(env, w->CancelWork());
35113494

35123495
return napi_clear_last_error(env);
35133496
}

src/node_crypto.cc

+30-69
Original file line numberDiff line numberDiff line change
@@ -4562,7 +4562,7 @@ bool ECDH::IsKeyPairValid() {
45624562
}
45634563

45644564

4565-
class PBKDF2Request : public AsyncWrap {
4565+
class PBKDF2Request : public AsyncWrap, public ThreadPoolWork {
45664566
public:
45674567
PBKDF2Request(Environment* env,
45684568
Local<Object> object,
@@ -4572,6 +4572,7 @@ class PBKDF2Request : public AsyncWrap {
45724572
int keylen,
45734573
int iteration_count)
45744574
: AsyncWrap(env, object, AsyncWrap::PROVIDER_PBKDF2REQUEST),
4575+
ThreadPoolWork(env),
45754576
digest_(digest),
45764577
success_(false),
45774578
pass_(std::move(pass)),
@@ -4580,21 +4581,14 @@ class PBKDF2Request : public AsyncWrap {
45804581
iteration_count_(iteration_count) {
45814582
}
45824583

4583-
uv_work_t* work_req() {
4584-
return &work_req_;
4585-
}
4586-
45874584
size_t self_size() const override { return sizeof(*this); }
45884585

4589-
static void Work(uv_work_t* work_req);
4590-
void Work();
4586+
void DoThreadPoolWork() override;
4587+
void AfterThreadPoolWork(int status) override;
45914588

4592-
static void After(uv_work_t* work_req, int status);
45934589
void After(Local<Value> (*argv)[2]);
4594-
void After();
45954590

45964591
private:
4597-
uv_work_t work_req_;
45984592
const EVP_MD* digest_;
45994593
bool success_;
46004594
MallocedBuffer<char> pass_;
@@ -4604,7 +4598,7 @@ class PBKDF2Request : public AsyncWrap {
46044598
};
46054599

46064600

4607-
void PBKDF2Request::Work() {
4601+
void PBKDF2Request::DoThreadPoolWork() {
46084602
success_ =
46094603
PKCS5_PBKDF2_HMAC(
46104604
pass_.data, pass_.size,
@@ -4617,12 +4611,6 @@ void PBKDF2Request::Work() {
46174611
}
46184612

46194613

4620-
void PBKDF2Request::Work(uv_work_t* work_req) {
4621-
PBKDF2Request* req = ContainerOf(&PBKDF2Request::work_req_, work_req);
4622-
req->Work();
4623-
}
4624-
4625-
46264614
void PBKDF2Request::After(Local<Value> (*argv)[2]) {
46274615
if (success_) {
46284616
(*argv)[0] = Null(env()->isolate());
@@ -4635,7 +4623,12 @@ void PBKDF2Request::After(Local<Value> (*argv)[2]) {
46354623
}
46364624

46374625

4638-
void PBKDF2Request::After() {
4626+
void PBKDF2Request::AfterThreadPoolWork(int status) {
4627+
std::unique_ptr<PBKDF2Request> req(this);
4628+
if (status == UV_ECANCELED)
4629+
return;
4630+
CHECK_EQ(status, 0);
4631+
46394632
HandleScope handle_scope(env()->isolate());
46404633
Context::Scope context_scope(env()->context());
46414634
Local<Value> argv[2];
@@ -4644,17 +4637,6 @@ void PBKDF2Request::After() {
46444637
}
46454638

46464639

4647-
void PBKDF2Request::After(uv_work_t* work_req, int status) {
4648-
std::unique_ptr<PBKDF2Request> req(
4649-
ContainerOf(&PBKDF2Request::work_req_, work_req));
4650-
req->env()->DecreaseWaitingRequestCounter();
4651-
if (status == UV_ECANCELED)
4652-
return;
4653-
CHECK_EQ(status, 0);
4654-
req->After();
4655-
}
4656-
4657-
46584640
void PBKDF2(const FunctionCallbackInfo<Value>& args) {
46594641
Environment* env = Environment::GetCurrent(args);
46604642

@@ -4701,14 +4683,10 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
47014683
if (args[5]->IsFunction()) {
47024684
obj->Set(env->context(), env->ondone_string(), args[5]).FromJust();
47034685

4704-
env->IncreaseWaitingRequestCounter();
4705-
uv_queue_work(env->event_loop(),
4706-
req.release()->work_req(),
4707-
PBKDF2Request::Work,
4708-
PBKDF2Request::After);
4686+
req.release()->ScheduleWork();
47094687
} else {
47104688
env->PrintSyncTrace();
4711-
req->Work();
4689+
req->DoThreadPoolWork();
47124690
Local<Value> argv[2];
47134691
req->After(&argv);
47144692

@@ -4721,7 +4699,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
47214699

47224700

47234701
// Only instantiate within a valid HandleScope.
4724-
class RandomBytesRequest : public AsyncWrap {
4702+
class RandomBytesRequest : public AsyncWrap, public ThreadPoolWork {
47254703
public:
47264704
enum FreeMode { FREE_DATA, DONT_FREE_DATA };
47274705

@@ -4731,16 +4709,13 @@ class RandomBytesRequest : public AsyncWrap {
47314709
char* data,
47324710
FreeMode free_mode)
47334711
: AsyncWrap(env, object, AsyncWrap::PROVIDER_RANDOMBYTESREQUEST),
4712+
ThreadPoolWork(env),
47344713
error_(0),
47354714
size_(size),
47364715
data_(data),
47374716
free_mode_(free_mode) {
47384717
}
47394718

4740-
uv_work_t* work_req() {
4741-
return &work_req_;
4742-
}
4743-
47444719
inline size_t size() const {
47454720
return size_;
47464721
}
@@ -4778,7 +4753,8 @@ class RandomBytesRequest : public AsyncWrap {
47784753

47794754
size_t self_size() const override { return sizeof(*this); }
47804755

4781-
uv_work_t work_req_;
4756+
void DoThreadPoolWork() override;
4757+
void AfterThreadPoolWork(int status) override;
47824758

47834759
private:
47844760
unsigned long error_; // NOLINT(runtime/int)
@@ -4788,21 +4764,17 @@ class RandomBytesRequest : public AsyncWrap {
47884764
};
47894765

47904766

4791-
void RandomBytesWork(uv_work_t* work_req) {
4792-
RandomBytesRequest* req =
4793-
ContainerOf(&RandomBytesRequest::work_req_, work_req);
4794-
4767+
void RandomBytesRequest::DoThreadPoolWork() {
47954768
// Ensure that OpenSSL's PRNG is properly seeded.
47964769
CheckEntropy();
47974770

4798-
const int r = RAND_bytes(reinterpret_cast<unsigned char*>(req->data()),
4799-
req->size());
4771+
const int r = RAND_bytes(reinterpret_cast<unsigned char*>(data_), size_);
48004772

48014773
// RAND_bytes() returns 0 on error.
48024774
if (r == 0) {
4803-
req->set_error(ERR_get_error()); // NOLINT(runtime/int)
4775+
set_error(ERR_get_error()); // NOLINT(runtime/int)
48044776
} else if (r == -1) {
4805-
req->set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
4777+
set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
48064778
}
48074779
}
48084780

@@ -4840,27 +4812,24 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {
48404812
}
48414813

48424814

4843-
void RandomBytesAfter(uv_work_t* work_req, int status) {
4844-
std::unique_ptr<RandomBytesRequest> req(
4845-
ContainerOf(&RandomBytesRequest::work_req_, work_req));
4846-
Environment* env = req->env();
4847-
env->DecreaseWaitingRequestCounter();
4815+
void RandomBytesRequest::AfterThreadPoolWork(int status) {
4816+
std::unique_ptr<RandomBytesRequest> req(this);
48484817
if (status == UV_ECANCELED)
48494818
return;
48504819
CHECK_EQ(status, 0);
4851-
HandleScope handle_scope(env->isolate());
4852-
Context::Scope context_scope(env->context());
4820+
HandleScope handle_scope(env()->isolate());
4821+
Context::Scope context_scope(env()->context());
48534822
Local<Value> argv[2];
4854-
RandomBytesCheck(req.get(), &argv);
4855-
req->MakeCallback(env->ondone_string(), arraysize(argv), argv);
4823+
RandomBytesCheck(this, &argv);
4824+
MakeCallback(env()->ondone_string(), arraysize(argv), argv);
48564825
}
48574826

48584827

48594828
void RandomBytesProcessSync(Environment* env,
48604829
std::unique_ptr<RandomBytesRequest> req,
48614830
Local<Value> (*argv)[2]) {
48624831
env->PrintSyncTrace();
4863-
RandomBytesWork(req->work_req());
4832+
req->DoThreadPoolWork();
48644833
RandomBytesCheck(req.get(), argv);
48654834

48664835
if (!(*argv)[0]->IsNull())
@@ -4887,11 +4856,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
48874856
if (args[1]->IsFunction()) {
48884857
obj->Set(env->context(), env->ondone_string(), args[1]).FromJust();
48894858

4890-
env->IncreaseWaitingRequestCounter();
4891-
uv_queue_work(env->event_loop(),
4892-
req.release()->work_req(),
4893-
RandomBytesWork,
4894-
RandomBytesAfter);
4859+
req.release()->ScheduleWork();
48954860
args.GetReturnValue().Set(obj);
48964861
} else {
48974862
Local<Value> argv[2];
@@ -4927,11 +4892,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
49274892
if (args[3]->IsFunction()) {
49284893
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();
49294894

4930-
env->IncreaseWaitingRequestCounter();
4931-
uv_queue_work(env->event_loop(),
4932-
req.release()->work_req(),
4933-
RandomBytesWork,
4934-
RandomBytesAfter);
4895+
req.release()->ScheduleWork();
49354896
args.GetReturnValue().Set(obj);
49364897
} else {
49374898
Local<Value> argv[2];

src/node_internals.h

+35
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,41 @@ class InternalCallbackScope {
508508
bool closed_ = false;
509509
};
510510

511+
class ThreadPoolWork {
512+
public:
513+
explicit inline ThreadPoolWork(Environment* env) : env_(env) {}
514+
inline void ScheduleWork();
515+
inline int CancelWork();
516+
517+
virtual void DoThreadPoolWork() = 0;
518+
virtual void AfterThreadPoolWork(int status) = 0;
519+
520+
private:
521+
Environment* env_;
522+
uv_work_t work_req_;
523+
};
524+
525+
void ThreadPoolWork::ScheduleWork() {
526+
env_->IncreaseWaitingRequestCounter();
527+
int status = uv_queue_work(
528+
env_->event_loop(),
529+
&work_req_,
530+
[](uv_work_t* req) {
531+
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
532+
self->DoThreadPoolWork();
533+
},
534+
[](uv_work_t* req, int status) {
535+
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
536+
self->env_->DecreaseWaitingRequestCounter();
537+
self->AfterThreadPoolWork(status);
538+
});
539+
CHECK_EQ(status, 0);
540+
}
541+
542+
int ThreadPoolWork::CancelWork() {
543+
return uv_cancel(reinterpret_cast<uv_req_t*>(&work_req_));
544+
}
545+
511546
static inline const char *errno_string(int errorno) {
512547
#define ERRNO_CASE(e) case e: return #e;
513548
switch (errorno) {

0 commit comments

Comments
 (0)