Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 96213c8

Browse files
Gabriel Schulhoftargos
Gabriel Schulhof
authored andcommittedSep 3, 2018
n-api: clean up thread-safe function
* Move class `TsFn` to name space `v8impl` and rename it to `ThreadSafeFunction` * Remove `NAPI_EXTERN` from API declarations, because it's only needed in the header file. PR-URL: #22259 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Kyle Farnung <[email protected]> Reviewed-By: Michael Dawson <[email protected]>
1 parent 5c2a6d8 commit 96213c8

File tree

1 file changed

+439
-434
lines changed

1 file changed

+439
-434
lines changed
 

‎src/node_api.cc

+439-434
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,336 @@ napi_status ConcludeDeferred(napi_env env,
827827
return GET_RETURN_STATUS(env);
828828
}
829829

830+
class ThreadSafeFunction : public node::AsyncResource {
831+
public:
832+
ThreadSafeFunction(v8::Local<v8::Function> func,
833+
v8::Local<v8::Object> resource,
834+
v8::Local<v8::String> name,
835+
size_t thread_count_,
836+
void* context_,
837+
size_t max_queue_size_,
838+
napi_env env_,
839+
void* finalize_data_,
840+
napi_finalize finalize_cb_,
841+
napi_threadsafe_function_call_js call_js_cb_):
842+
AsyncResource(env_->isolate,
843+
resource,
844+
*v8::String::Utf8Value(env_->isolate, name)),
845+
thread_count(thread_count_),
846+
is_closing(false),
847+
context(context_),
848+
max_queue_size(max_queue_size_),
849+
env(env_),
850+
finalize_data(finalize_data_),
851+
finalize_cb(finalize_cb_),
852+
call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
853+
handles_closing(false) {
854+
ref.Reset(env->isolate, func);
855+
node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
856+
}
857+
858+
~ThreadSafeFunction() {
859+
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
860+
}
861+
862+
// These methods can be called from any thread.
863+
864+
napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
865+
node::Mutex::ScopedLock lock(this->mutex);
866+
867+
while (queue.size() >= max_queue_size &&
868+
max_queue_size > 0 &&
869+
!is_closing) {
870+
if (mode == napi_tsfn_nonblocking) {
871+
return napi_queue_full;
872+
}
873+
cond->Wait(lock);
874+
}
875+
876+
if (is_closing) {
877+
if (thread_count == 0) {
878+
return napi_invalid_arg;
879+
} else {
880+
thread_count--;
881+
return napi_closing;
882+
}
883+
} else {
884+
if (uv_async_send(&async) != 0) {
885+
return napi_generic_failure;
886+
}
887+
queue.push(data);
888+
return napi_ok;
889+
}
890+
}
891+
892+
napi_status Acquire() {
893+
node::Mutex::ScopedLock lock(this->mutex);
894+
895+
if (is_closing) {
896+
return napi_closing;
897+
}
898+
899+
thread_count++;
900+
901+
return napi_ok;
902+
}
903+
904+
napi_status Release(napi_threadsafe_function_release_mode mode) {
905+
node::Mutex::ScopedLock lock(this->mutex);
906+
907+
if (thread_count == 0) {
908+
return napi_invalid_arg;
909+
}
910+
911+
thread_count--;
912+
913+
if (thread_count == 0 || mode == napi_tsfn_abort) {
914+
if (!is_closing) {
915+
is_closing = (mode == napi_tsfn_abort);
916+
if (is_closing && max_queue_size > 0) {
917+
cond->Signal(lock);
918+
}
919+
if (uv_async_send(&async) != 0) {
920+
return napi_generic_failure;
921+
}
922+
}
923+
}
924+
925+
return napi_ok;
926+
}
927+
928+
void EmptyQueueAndDelete() {
929+
for (; !queue.empty() ; queue.pop()) {
930+
call_js_cb(nullptr, nullptr, context, queue.front());
931+
}
932+
delete this;
933+
}
934+
935+
// These methods must only be called from the loop thread.
936+
937+
napi_status Init() {
938+
ThreadSafeFunction* ts_fn = this;
939+
940+
if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
941+
if (max_queue_size > 0) {
942+
cond.reset(new node::ConditionVariable);
943+
}
944+
if ((max_queue_size == 0 || cond.get() != nullptr) &&
945+
uv_idle_init(env->loop, &idle) == 0) {
946+
return napi_ok;
947+
}
948+
949+
node::Environment::GetCurrent(env->isolate)->CloseHandle(
950+
reinterpret_cast<uv_handle_t*>(&async),
951+
[](uv_handle_t* handle) -> void {
952+
ThreadSafeFunction* ts_fn =
953+
node::ContainerOf(&ThreadSafeFunction::async,
954+
reinterpret_cast<uv_async_t*>(handle));
955+
delete ts_fn;
956+
});
957+
958+
// Prevent the thread-safe function from being deleted here, because
959+
// the callback above will delete it.
960+
ts_fn = nullptr;
961+
}
962+
963+
delete ts_fn;
964+
965+
return napi_generic_failure;
966+
}
967+
968+
napi_status Unref() {
969+
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
970+
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
971+
972+
return napi_ok;
973+
}
974+
975+
napi_status Ref() {
976+
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
977+
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
978+
979+
return napi_ok;
980+
}
981+
982+
void DispatchOne() {
983+
void* data = nullptr;
984+
bool popped_value = false;
985+
bool idle_stop_failed = false;
986+
987+
{
988+
node::Mutex::ScopedLock lock(this->mutex);
989+
if (is_closing) {
990+
CloseHandlesAndMaybeDelete();
991+
} else {
992+
size_t size = queue.size();
993+
if (size > 0) {
994+
data = queue.front();
995+
queue.pop();
996+
popped_value = true;
997+
if (size == max_queue_size && max_queue_size > 0) {
998+
cond->Signal(lock);
999+
}
1000+
size--;
1001+
}
1002+
1003+
if (size == 0) {
1004+
if (thread_count == 0) {
1005+
is_closing = true;
1006+
if (max_queue_size > 0) {
1007+
cond->Signal(lock);
1008+
}
1009+
CloseHandlesAndMaybeDelete();
1010+
} else {
1011+
if (uv_idle_stop(&idle) != 0) {
1012+
idle_stop_failed = true;
1013+
}
1014+
}
1015+
}
1016+
}
1017+
}
1018+
1019+
if (popped_value || idle_stop_failed) {
1020+
v8::HandleScope scope(env->isolate);
1021+
CallbackScope cb_scope(this);
1022+
1023+
if (idle_stop_failed) {
1024+
CHECK(napi_throw_error(env,
1025+
"ERR_NAPI_TSFN_STOP_IDLE_LOOP",
1026+
"Failed to stop the idle loop") == napi_ok);
1027+
} else {
1028+
v8::Local<v8::Function> js_cb =
1029+
v8::Local<v8::Function>::New(env->isolate, ref);
1030+
call_js_cb(env,
1031+
v8impl::JsValueFromV8LocalValue(js_cb),
1032+
context,
1033+
data);
1034+
}
1035+
}
1036+
}
1037+
1038+
node::Environment* NodeEnv() {
1039+
// For some reason grabbing the Node.js environment requires a handle scope.
1040+
v8::HandleScope scope(env->isolate);
1041+
return node::Environment::GetCurrent(env->isolate);
1042+
}
1043+
1044+
void MaybeStartIdle() {
1045+
if (uv_idle_start(&idle, IdleCb) != 0) {
1046+
v8::HandleScope scope(env->isolate);
1047+
CallbackScope cb_scope(this);
1048+
CHECK(napi_throw_error(env,
1049+
"ERR_NAPI_TSFN_START_IDLE_LOOP",
1050+
"Failed to start the idle loop") == napi_ok);
1051+
}
1052+
}
1053+
1054+
void Finalize() {
1055+
v8::HandleScope scope(env->isolate);
1056+
if (finalize_cb) {
1057+
CallbackScope cb_scope(this);
1058+
finalize_cb(env, finalize_data, context);
1059+
}
1060+
EmptyQueueAndDelete();
1061+
}
1062+
1063+
inline void* Context() {
1064+
return context;
1065+
}
1066+
1067+
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
1068+
if (set_closing) {
1069+
node::Mutex::ScopedLock lock(this->mutex);
1070+
is_closing = true;
1071+
if (max_queue_size > 0) {
1072+
cond->Signal(lock);
1073+
}
1074+
}
1075+
if (handles_closing) {
1076+
return;
1077+
}
1078+
handles_closing = true;
1079+
NodeEnv()->CloseHandle(
1080+
reinterpret_cast<uv_handle_t*>(&async),
1081+
[](uv_handle_t* handle) -> void {
1082+
ThreadSafeFunction* ts_fn =
1083+
node::ContainerOf(&ThreadSafeFunction::async,
1084+
reinterpret_cast<uv_async_t*>(handle));
1085+
ts_fn->NodeEnv()->CloseHandle(
1086+
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
1087+
[](uv_handle_t* handle) -> void {
1088+
ThreadSafeFunction* ts_fn =
1089+
node::ContainerOf(&ThreadSafeFunction::idle,
1090+
reinterpret_cast<uv_idle_t*>(handle));
1091+
ts_fn->Finalize();
1092+
});
1093+
});
1094+
}
1095+
1096+
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
1097+
// without a call_js_cb_.
1098+
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
1099+
if (!(env == nullptr || cb == nullptr)) {
1100+
napi_value recv;
1101+
napi_status status;
1102+
1103+
status = napi_get_undefined(env, &recv);
1104+
if (status != napi_ok) {
1105+
napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
1106+
"Failed to retrieve undefined value");
1107+
return;
1108+
}
1109+
1110+
status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
1111+
if (status != napi_ok && status != napi_pending_exception) {
1112+
napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
1113+
"Failed to call JS callback");
1114+
return;
1115+
}
1116+
}
1117+
}
1118+
1119+
static void IdleCb(uv_idle_t* idle) {
1120+
ThreadSafeFunction* ts_fn =
1121+
node::ContainerOf(&ThreadSafeFunction::idle, idle);
1122+
ts_fn->DispatchOne();
1123+
}
1124+
1125+
static void AsyncCb(uv_async_t* async) {
1126+
ThreadSafeFunction* ts_fn =
1127+
node::ContainerOf(&ThreadSafeFunction::async, async);
1128+
ts_fn->MaybeStartIdle();
1129+
}
1130+
1131+
static void Cleanup(void* data) {
1132+
reinterpret_cast<ThreadSafeFunction*>(data)
1133+
->CloseHandlesAndMaybeDelete(true);
1134+
}
1135+
1136+
private:
1137+
// These are variables protected by the mutex.
1138+
node::Mutex mutex;
1139+
std::unique_ptr<node::ConditionVariable> cond;
1140+
std::queue<void*> queue;
1141+
uv_async_t async;
1142+
uv_idle_t idle;
1143+
size_t thread_count;
1144+
bool is_closing;
1145+
1146+
// These are variables set once, upon creation, and then never again, which
1147+
// means we don't need the mutex to read them.
1148+
void* context;
1149+
size_t max_queue_size;
1150+
1151+
// These are variables accessed only from the loop thread.
1152+
node::Persistent<v8::Function> ref;
1153+
napi_env env;
1154+
void* finalize_data;
1155+
napi_finalize finalize_cb;
1156+
napi_threadsafe_function_call_js call_js_cb;
1157+
bool handles_closing;
1158+
};
1159+
8301160
} // end of namespace v8impl
8311161

8321162
// Intercepts the Node-V8 module registration callback. Converts parameters
@@ -3591,448 +3921,121 @@ napi_status napi_create_async_work(napi_env env,
35913921
return napi_clear_last_error(env);
35923922
}
35933923

3594-
napi_status napi_delete_async_work(napi_env env, napi_async_work work) {
3595-
CHECK_ENV(env);
3596-
CHECK_ARG(env, work);
3597-
3598-
uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work));
3599-
3600-
return napi_clear_last_error(env);
3601-
}
3602-
3603-
napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) {
3604-
CHECK_ENV(env);
3605-
CHECK_ARG(env, loop);
3606-
*loop = env->loop;
3607-
return napi_clear_last_error(env);
3608-
}
3609-
3610-
napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
3611-
CHECK_ENV(env);
3612-
CHECK_ARG(env, work);
3613-
3614-
napi_status status;
3615-
uv_loop_t* event_loop = nullptr;
3616-
status = napi_get_uv_event_loop(env, &event_loop);
3617-
if (status != napi_ok)
3618-
return napi_set_last_error(env, status);
3619-
3620-
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
3621-
3622-
w->ScheduleWork();
3623-
3624-
return napi_clear_last_error(env);
3625-
}
3626-
3627-
napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
3628-
CHECK_ENV(env);
3629-
CHECK_ARG(env, work);
3630-
3631-
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
3632-
3633-
CALL_UV(env, w->CancelWork());
3634-
3635-
return napi_clear_last_error(env);
3636-
}
3637-
3638-
napi_status napi_create_promise(napi_env env,
3639-
napi_deferred* deferred,
3640-
napi_value* promise) {
3641-
NAPI_PREAMBLE(env);
3642-
CHECK_ARG(env, deferred);
3643-
CHECK_ARG(env, promise);
3644-
3645-
auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext());
3646-
CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure);
3647-
3648-
auto v8_resolver = maybe.ToLocalChecked();
3649-
auto v8_deferred = new node::Persistent<v8::Value>();
3650-
v8_deferred->Reset(env->isolate, v8_resolver);
3651-
3652-
*deferred = v8impl::JsDeferredFromNodePersistent(v8_deferred);
3653-
*promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise());
3654-
return GET_RETURN_STATUS(env);
3655-
}
3656-
3657-
napi_status napi_resolve_deferred(napi_env env,
3658-
napi_deferred deferred,
3659-
napi_value resolution) {
3660-
return v8impl::ConcludeDeferred(env, deferred, resolution, true);
3661-
}
3662-
3663-
napi_status napi_reject_deferred(napi_env env,
3664-
napi_deferred deferred,
3665-
napi_value resolution) {
3666-
return v8impl::ConcludeDeferred(env, deferred, resolution, false);
3667-
}
3668-
3669-
napi_status napi_is_promise(napi_env env,
3670-
napi_value promise,
3671-
bool* is_promise) {
3672-
CHECK_ENV(env);
3673-
CHECK_ARG(env, promise);
3674-
CHECK_ARG(env, is_promise);
3675-
3676-
*is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise();
3677-
3678-
return napi_clear_last_error(env);
3679-
}
3680-
3681-
napi_status napi_run_script(napi_env env,
3682-
napi_value script,
3683-
napi_value* result) {
3684-
NAPI_PREAMBLE(env);
3685-
CHECK_ARG(env, script);
3686-
CHECK_ARG(env, result);
3687-
3688-
v8::Local<v8::Value> v8_script = v8impl::V8LocalValueFromJsValue(script);
3689-
3690-
if (!v8_script->IsString()) {
3691-
return napi_set_last_error(env, napi_string_expected);
3692-
}
3693-
3694-
v8::Local<v8::Context> context = env->isolate->GetCurrentContext();
3695-
3696-
auto maybe_script = v8::Script::Compile(context,
3697-
v8::Local<v8::String>::Cast(v8_script));
3698-
CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure);
3699-
3700-
auto script_result =
3701-
maybe_script.ToLocalChecked()->Run(context);
3702-
CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure);
3703-
3704-
*result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked());
3705-
return GET_RETURN_STATUS(env);
3706-
}
3707-
3708-
class TsFn: public node::AsyncResource {
3709-
public:
3710-
TsFn(v8::Local<v8::Function> func,
3711-
v8::Local<v8::Object> resource,
3712-
v8::Local<v8::String> name,
3713-
size_t thread_count_,
3714-
void* context_,
3715-
size_t max_queue_size_,
3716-
napi_env env_,
3717-
void* finalize_data_,
3718-
napi_finalize finalize_cb_,
3719-
napi_threadsafe_function_call_js call_js_cb_):
3720-
AsyncResource(env_->isolate,
3721-
resource,
3722-
*v8::String::Utf8Value(env_->isolate, name)),
3723-
thread_count(thread_count_),
3724-
is_closing(false),
3725-
context(context_),
3726-
max_queue_size(max_queue_size_),
3727-
env(env_),
3728-
finalize_data(finalize_data_),
3729-
finalize_cb(finalize_cb_),
3730-
call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
3731-
handles_closing(false) {
3732-
ref.Reset(env->isolate, func);
3733-
node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
3734-
}
3735-
3736-
~TsFn() {
3737-
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
3738-
}
3739-
3740-
// These methods can be called from any thread.
3741-
3742-
napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
3743-
node::Mutex::ScopedLock lock(this->mutex);
3744-
3745-
while (queue.size() >= max_queue_size &&
3746-
max_queue_size > 0 &&
3747-
!is_closing) {
3748-
if (mode == napi_tsfn_nonblocking) {
3749-
return napi_queue_full;
3750-
}
3751-
cond->Wait(lock);
3752-
}
3753-
3754-
if (is_closing) {
3755-
if (thread_count == 0) {
3756-
return napi_invalid_arg;
3757-
} else {
3758-
thread_count--;
3759-
return napi_closing;
3760-
}
3761-
} else {
3762-
if (uv_async_send(&async) != 0) {
3763-
return napi_generic_failure;
3764-
}
3765-
queue.push(data);
3766-
return napi_ok;
3767-
}
3768-
}
3769-
3770-
napi_status Acquire() {
3771-
node::Mutex::ScopedLock lock(this->mutex);
3772-
3773-
if (is_closing) {
3774-
return napi_closing;
3775-
}
3776-
3777-
thread_count++;
3778-
3779-
return napi_ok;
3780-
}
3781-
3782-
napi_status Release(napi_threadsafe_function_release_mode mode) {
3783-
node::Mutex::ScopedLock lock(this->mutex);
3784-
3785-
if (thread_count == 0) {
3786-
return napi_invalid_arg;
3787-
}
3788-
3789-
thread_count--;
3790-
3791-
if (thread_count == 0 || mode == napi_tsfn_abort) {
3792-
if (!is_closing) {
3793-
is_closing = (mode == napi_tsfn_abort);
3794-
if (is_closing && max_queue_size > 0) {
3795-
cond->Signal(lock);
3796-
}
3797-
if (uv_async_send(&async) != 0) {
3798-
return napi_generic_failure;
3799-
}
3800-
}
3801-
}
3802-
3803-
return napi_ok;
3804-
}
3805-
3806-
void EmptyQueueAndDelete() {
3807-
for (; !queue.empty() ; queue.pop()) {
3808-
call_js_cb(nullptr, nullptr, context, queue.front());
3809-
}
3810-
delete this;
3811-
}
3812-
3813-
// These methods must only be called from the loop thread.
3814-
3815-
napi_status Init() {
3816-
TsFn* ts_fn = this;
3924+
napi_status napi_delete_async_work(napi_env env, napi_async_work work) {
3925+
CHECK_ENV(env);
3926+
CHECK_ARG(env, work);
38173927

3818-
if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
3819-
if (max_queue_size > 0) {
3820-
cond.reset(new node::ConditionVariable);
3821-
}
3822-
if ((max_queue_size == 0 || cond.get() != nullptr) &&
3823-
uv_idle_init(env->loop, &idle) == 0) {
3824-
return napi_ok;
3825-
}
3928+
uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work));
38263929

3827-
node::Environment::GetCurrent(env->isolate)->CloseHandle(
3828-
reinterpret_cast<uv_handle_t*>(&async),
3829-
[] (uv_handle_t* handle) -> void {
3830-
TsFn* ts_fn =
3831-
node::ContainerOf(&TsFn::async,
3832-
reinterpret_cast<uv_async_t*>(handle));
3833-
delete ts_fn;
3834-
});
3930+
return napi_clear_last_error(env);
3931+
}
38353932

3836-
// Prevent the thread-safe function from being deleted here, because
3837-
// the callback above will delete it.
3838-
ts_fn = nullptr;
3839-
}
3933+
napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) {
3934+
CHECK_ENV(env);
3935+
CHECK_ARG(env, loop);
3936+
*loop = env->loop;
3937+
return napi_clear_last_error(env);
3938+
}
38403939

3841-
delete ts_fn;
3940+
napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
3941+
CHECK_ENV(env);
3942+
CHECK_ARG(env, work);
38423943

3843-
return napi_generic_failure;
3844-
}
3944+
napi_status status;
3945+
uv_loop_t* event_loop = nullptr;
3946+
status = napi_get_uv_event_loop(env, &event_loop);
3947+
if (status != napi_ok)
3948+
return napi_set_last_error(env, status);
38453949

3846-
napi_status Unref() {
3847-
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
3848-
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
3950+
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
38493951

3850-
return napi_ok;
3851-
}
3952+
w->ScheduleWork();
38523953

3853-
napi_status Ref() {
3854-
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
3855-
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
3954+
return napi_clear_last_error(env);
3955+
}
38563956

3857-
return napi_ok;
3858-
}
3957+
napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
3958+
CHECK_ENV(env);
3959+
CHECK_ARG(env, work);
38593960

3860-
void DispatchOne() {
3861-
void* data = nullptr;
3862-
bool popped_value = false;
3863-
bool idle_stop_failed = false;
3961+
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
38643962

3865-
{
3866-
node::Mutex::ScopedLock lock(this->mutex);
3867-
if (is_closing) {
3868-
CloseHandlesAndMaybeDelete();
3869-
} else {
3870-
size_t size = queue.size();
3871-
if (size > 0) {
3872-
data = queue.front();
3873-
queue.pop();
3874-
popped_value = true;
3875-
if (size == max_queue_size && max_queue_size > 0) {
3876-
cond->Signal(lock);
3877-
}
3878-
size--;
3879-
}
3963+
CALL_UV(env, w->CancelWork());
38803964

3881-
if (size == 0) {
3882-
if (thread_count == 0) {
3883-
is_closing = true;
3884-
if (max_queue_size > 0) {
3885-
cond->Signal(lock);
3886-
}
3887-
CloseHandlesAndMaybeDelete();
3888-
} else {
3889-
if (uv_idle_stop(&idle) != 0) {
3890-
idle_stop_failed = true;
3891-
}
3892-
}
3893-
}
3894-
}
3895-
}
3965+
return napi_clear_last_error(env);
3966+
}
38963967

3897-
if (popped_value || idle_stop_failed) {
3898-
v8::HandleScope scope(env->isolate);
3899-
CallbackScope cb_scope(this);
3968+
napi_status napi_create_promise(napi_env env,
3969+
napi_deferred* deferred,
3970+
napi_value* promise) {
3971+
NAPI_PREAMBLE(env);
3972+
CHECK_ARG(env, deferred);
3973+
CHECK_ARG(env, promise);
39003974

3901-
if (idle_stop_failed) {
3902-
CHECK(napi_throw_error(env,
3903-
"ERR_NAPI_TSFN_STOP_IDLE_LOOP",
3904-
"Failed to stop the idle loop") == napi_ok);
3905-
} else {
3906-
v8::Local<v8::Function> js_cb =
3907-
v8::Local<v8::Function>::New(env->isolate, ref);
3908-
call_js_cb(env,
3909-
v8impl::JsValueFromV8LocalValue(js_cb),
3910-
context,
3911-
data);
3912-
}
3913-
}
3914-
}
3975+
auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext());
3976+
CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure);
39153977

3916-
node::Environment* NodeEnv() {
3917-
// For some reason grabbing the Node.js environment requires a handle scope.
3918-
v8::HandleScope scope(env->isolate);
3919-
return node::Environment::GetCurrent(env->isolate);
3920-
}
3978+
auto v8_resolver = maybe.ToLocalChecked();
3979+
auto v8_deferred = new node::Persistent<v8::Value>();
3980+
v8_deferred->Reset(env->isolate, v8_resolver);
39213981

3922-
void MaybeStartIdle() {
3923-
if (uv_idle_start(&idle, IdleCb) != 0) {
3924-
v8::HandleScope scope(env->isolate);
3925-
CallbackScope cb_scope(this);
3926-
CHECK(napi_throw_error(env,
3927-
"ERR_NAPI_TSFN_START_IDLE_LOOP",
3928-
"Failed to start the idle loop") == napi_ok);
3929-
}
3930-
}
3982+
*deferred = v8impl::JsDeferredFromNodePersistent(v8_deferred);
3983+
*promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise());
3984+
return GET_RETURN_STATUS(env);
3985+
}
39313986

3932-
void Finalize() {
3933-
v8::HandleScope scope(env->isolate);
3934-
if (finalize_cb) {
3935-
CallbackScope cb_scope(this);
3936-
finalize_cb(env, finalize_data, context);
3937-
}
3938-
EmptyQueueAndDelete();
3939-
}
3987+
napi_status napi_resolve_deferred(napi_env env,
3988+
napi_deferred deferred,
3989+
napi_value resolution) {
3990+
return v8impl::ConcludeDeferred(env, deferred, resolution, true);
3991+
}
39403992

3941-
inline void* Context() {
3942-
return context;
3943-
}
3993+
napi_status napi_reject_deferred(napi_env env,
3994+
napi_deferred deferred,
3995+
napi_value resolution) {
3996+
return v8impl::ConcludeDeferred(env, deferred, resolution, false);
3997+
}
39443998

3945-
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
3946-
if (set_closing) {
3947-
node::Mutex::ScopedLock lock(this->mutex);
3948-
is_closing = true;
3949-
if (max_queue_size > 0) {
3950-
cond->Signal(lock);
3951-
}
3952-
}
3953-
if (handles_closing) {
3954-
return;
3955-
}
3956-
handles_closing = true;
3957-
NodeEnv()->CloseHandle(
3958-
reinterpret_cast<uv_handle_t*>(&async),
3959-
[] (uv_handle_t* handle) -> void {
3960-
TsFn* ts_fn = node::ContainerOf(&TsFn::async,
3961-
reinterpret_cast<uv_async_t*>(handle));
3962-
ts_fn->NodeEnv()->CloseHandle(
3963-
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
3964-
[] (uv_handle_t* handle) -> void {
3965-
TsFn* ts_fn = node::ContainerOf(&TsFn::idle,
3966-
reinterpret_cast<uv_idle_t*>(handle));
3967-
ts_fn->Finalize();
3968-
});
3969-
});
3970-
}
3999+
napi_status napi_is_promise(napi_env env,
4000+
napi_value promise,
4001+
bool* is_promise) {
4002+
CHECK_ENV(env);
4003+
CHECK_ARG(env, promise);
4004+
CHECK_ARG(env, is_promise);
39714005

3972-
// Default way of calling into JavaScript. Used when TsFn is constructed
3973-
// without a call_js_cb_.
3974-
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
3975-
if (!(env == nullptr || cb == nullptr)) {
3976-
napi_value recv;
3977-
napi_status status;
4006+
*is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise();
39784007

3979-
status = napi_get_undefined(env, &recv);
3980-
if (status != napi_ok) {
3981-
napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
3982-
"Failed to retrieve undefined value");
3983-
return;
3984-
}
4008+
return napi_clear_last_error(env);
4009+
}
39854010

3986-
status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
3987-
if (status != napi_ok && status != napi_pending_exception) {
3988-
napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
3989-
"Failed to call JS callback");
3990-
return;
3991-
}
3992-
}
3993-
}
4011+
napi_status napi_run_script(napi_env env,
4012+
napi_value script,
4013+
napi_value* result) {
4014+
NAPI_PREAMBLE(env);
4015+
CHECK_ARG(env, script);
4016+
CHECK_ARG(env, result);
39944017

3995-
static void IdleCb(uv_idle_t* idle) {
3996-
TsFn* ts_fn =
3997-
node::ContainerOf(&TsFn::idle, idle);
3998-
ts_fn->DispatchOne();
3999-
}
4018+
v8::Local<v8::Value> v8_script = v8impl::V8LocalValueFromJsValue(script);
40004019

4001-
static void AsyncCb(uv_async_t* async) {
4002-
TsFn* ts_fn =
4003-
node::ContainerOf(&TsFn::async, async);
4004-
ts_fn->MaybeStartIdle();
4020+
if (!v8_script->IsString()) {
4021+
return napi_set_last_error(env, napi_string_expected);
40054022
}
40064023

4007-
static void Cleanup(void* data) {
4008-
reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true);
4009-
}
4024+
v8::Local<v8::Context> context = env->isolate->GetCurrentContext();
40104025

4011-
private:
4012-
// These are variables protected by the mutex.
4013-
node::Mutex mutex;
4014-
std::unique_ptr<node::ConditionVariable> cond;
4015-
std::queue<void*> queue;
4016-
uv_async_t async;
4017-
uv_idle_t idle;
4018-
size_t thread_count;
4019-
bool is_closing;
4026+
auto maybe_script = v8::Script::Compile(context,
4027+
v8::Local<v8::String>::Cast(v8_script));
4028+
CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure);
40204029

4021-
// These are variables set once, upon creation, and then never again, which
4022-
// means we don't need the mutex to read them.
4023-
void* context;
4024-
size_t max_queue_size;
4030+
auto script_result =
4031+
maybe_script.ToLocalChecked()->Run(context);
4032+
CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure);
40254033

4026-
// These are variables accessed only from the loop thread.
4027-
node::Persistent<v8::Function> ref;
4028-
napi_env env;
4029-
void* finalize_data;
4030-
napi_finalize finalize_cb;
4031-
napi_threadsafe_function_call_js call_js_cb;
4032-
bool handles_closing;
4033-
};
4034+
*result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked());
4035+
return GET_RETURN_STATUS(env);
4036+
}
40344037

4035-
NAPI_EXTERN napi_status
4038+
napi_status
40364039
napi_create_threadsafe_function(napi_env env,
40374040
napi_value func,
40384041
napi_value async_resource,
@@ -4067,16 +4070,17 @@ napi_create_threadsafe_function(napi_env env,
40674070
v8::Local<v8::String> v8_name;
40684071
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
40694072

4070-
TsFn* ts_fn = new TsFn(v8_func,
4071-
v8_resource,
4072-
v8_name,
4073-
initial_thread_count,
4074-
context,
4075-
max_queue_size,
4076-
env,
4077-
thread_finalize_data,
4078-
thread_finalize_cb,
4079-
call_js_cb);
4073+
v8impl::ThreadSafeFunction* ts_fn =
4074+
new v8impl::ThreadSafeFunction(v8_func,
4075+
v8_resource,
4076+
v8_name,
4077+
initial_thread_count,
4078+
context,
4079+
max_queue_size,
4080+
env,
4081+
thread_finalize_data,
4082+
thread_finalize_cb,
4083+
call_js_cb);
40804084

40814085
if (ts_fn == nullptr) {
40824086
status = napi_generic_failure;
@@ -4091,45 +4095,46 @@ napi_create_threadsafe_function(napi_env env,
40914095
return napi_set_last_error(env, status);
40924096
}
40934097

4094-
NAPI_EXTERN napi_status
4098+
napi_status
40954099
napi_get_threadsafe_function_context(napi_threadsafe_function func,
40964100
void** result) {
40974101
CHECK(func != nullptr);
40984102
CHECK(result != nullptr);
40994103

4100-
*result = reinterpret_cast<TsFn*>(func)->Context();
4104+
*result = reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Context();
41014105
return napi_ok;
41024106
}
41034107

4104-
NAPI_EXTERN napi_status
4108+
napi_status
41054109
napi_call_threadsafe_function(napi_threadsafe_function func,
41064110
void* data,
41074111
napi_threadsafe_function_call_mode is_blocking) {
41084112
CHECK(func != nullptr);
4109-
return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking);
4113+
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Push(data,
4114+
is_blocking);
41104115
}
41114116

4112-
NAPI_EXTERN napi_status
4117+
napi_status
41134118
napi_acquire_threadsafe_function(napi_threadsafe_function func) {
41144119
CHECK(func != nullptr);
4115-
return reinterpret_cast<TsFn*>(func)->Acquire();
4120+
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Acquire();
41164121
}
41174122

4118-
NAPI_EXTERN napi_status
4123+
napi_status
41194124
napi_release_threadsafe_function(napi_threadsafe_function func,
41204125
napi_threadsafe_function_release_mode mode) {
41214126
CHECK(func != nullptr);
4122-
return reinterpret_cast<TsFn*>(func)->Release(mode);
4127+
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Release(mode);
41234128
}
41244129

4125-
NAPI_EXTERN napi_status
4130+
napi_status
41264131
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
41274132
CHECK(func != nullptr);
4128-
return reinterpret_cast<TsFn*>(func)->Unref();
4133+
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Unref();
41294134
}
41304135

4131-
NAPI_EXTERN napi_status
4136+
napi_status
41324137
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
41334138
CHECK(func != nullptr);
4134-
return reinterpret_cast<TsFn*>(func)->Ref();
4139+
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Ref();
41354140
}

0 commit comments

Comments
 (0)
Please sign in to comment.