Skip to content

Commit c4fa03a

Browse files
committed
test: fix intermittent TSFN crashes
PR-URL: nodejs/node-addon-api#974 Reviewed-By: Michael Dawson <[email protected]>
1 parent 64bc592 commit c4fa03a

File tree

3 files changed

+47
-21
lines changed

3 files changed

+47
-21
lines changed

test/binding.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ Object Init(Env env, Object exports) {
121121
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
122122
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
123123
exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env));
124-
exports.Set("threadsafe_function", InitTypedThreadSafeFunction(env));
124+
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
125125
exports.Set("typed_threadsafe_function_ctx",
126126
InitTypedThreadSafeFunctionCtx(env));
127127
exports.Set("typed_threadsafe_function_existing_tsfn",

test/threadsafe_function/threadsafe_function.cc

+18-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include <chrono>
2+
#include <condition_variable>
3+
#include <mutex>
24
#include <thread>
35
#include "napi.h"
46

@@ -22,6 +24,9 @@ struct ThreadSafeFunctionInfo {
2224
bool startSecondary;
2325
FunctionReference jsFinalizeCallback;
2426
uint32_t maxQueueSize;
27+
bool closeCalledFromJs;
28+
std::mutex protect;
29+
std::condition_variable signal;
2530
} tsfnInfo;
2631

2732
// Thread data to transmit to JS
@@ -65,12 +70,13 @@ static void DataSourceThread() {
6570
break;
6671
}
6772

68-
if (info->maxQueueSize == 0) {
69-
// Let's make this thread really busy for 200 ms to give the main thread a
70-
// chance to abort.
71-
auto start = std::chrono::high_resolution_clock::now();
72-
constexpr auto MS_200 = std::chrono::milliseconds(200);
73-
for (; std::chrono::high_resolution_clock::now() - start < MS_200;);
73+
if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
74+
// Let's make this thread really busy to give the main thread a chance to
75+
// abort / close.
76+
std::unique_lock<std::mutex> lk(info->protect);
77+
while (!info->closeCalledFromJs) {
78+
info->signal.wait(lk);
79+
}
7480
}
7581

7682
switch (status) {
@@ -112,6 +118,11 @@ static Value StopThread(const CallbackInfo& info) {
112118
} else {
113119
tsfn.Release();
114120
}
121+
{
122+
std::lock_guard<std::mutex> _(tsfnInfo.protect);
123+
tsfnInfo.closeCalledFromJs = true;
124+
tsfnInfo.signal.notify_one();
125+
}
115126
return Value();
116127
}
117128

@@ -134,6 +145,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
134145
tsfnInfo.abort = info[1].As<Boolean>();
135146
tsfnInfo.startSecondary = info[2].As<Boolean>();
136147
tsfnInfo.maxQueueSize = info[3].As<Number>().Uint32Value();
148+
tsfnInfo.closeCalledFromJs = false;
137149

138150
tsfn = ThreadSafeFunction::New(info.Env(), info[0].As<Function>(),
139151
"Test", tsfnInfo.maxQueueSize, 2, &tsfnInfo, JoinTheThreads, threads);

test/typed_threadsafe_function/typed_threadsafe_function.cc

+28-14
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include <chrono>
2+
#include <condition_variable>
3+
#include <mutex>
24
#include <thread>
35
#include "napi.h"
46

@@ -17,6 +19,9 @@ static struct ThreadSafeFunctionInfo {
1719
bool startSecondary;
1820
FunctionReference jsFinalizeCallback;
1921
uint32_t maxQueueSize;
22+
bool closeCalledFromJs;
23+
std::mutex protect;
24+
std::condition_variable signal;
2025
} tsfnInfo;
2126

2227
static void TSFNCallJS(Env env,
@@ -42,7 +47,7 @@ static int ints[ARRAY_LENGTH];
4247

4348
static void SecondaryThread() {
4449
if (tsfn.Release() != napi_ok) {
45-
Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed");
50+
Error::Fatal("TypedSecondaryThread", "ThreadSafeFunction.Release() failed");
4651
}
4752
}
4853

@@ -52,7 +57,8 @@ static void DataSourceThread() {
5257

5358
if (info->startSecondary) {
5459
if (tsfn.Acquire() != napi_ok) {
55-
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed");
60+
Error::Fatal("TypedDataSourceThread",
61+
"ThreadSafeFunction.Acquire() failed");
5662
}
5763

5864
threads[1] = std::thread(SecondaryThread);
@@ -75,13 +81,13 @@ static void DataSourceThread() {
7581
break;
7682
}
7783

78-
if (info->maxQueueSize == 0) {
79-
// Let's make this thread really busy for 200 ms to give the main thread a
80-
// chance to abort.
81-
auto start = std::chrono::high_resolution_clock::now();
82-
constexpr auto MS_200 = std::chrono::milliseconds(200);
83-
for (; std::chrono::high_resolution_clock::now() - start < MS_200;)
84-
;
84+
if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
85+
// Let's make this thread really busy to give the main thread a chance to
86+
// abort / close.
87+
std::unique_lock<std::mutex> lk(info->protect);
88+
while (!info->closeCalledFromJs) {
89+
info->signal.wait(lk);
90+
}
8591
}
8692

8793
switch (status) {
@@ -98,20 +104,22 @@ static void DataSourceThread() {
98104
break;
99105

100106
default:
101-
Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed");
107+
Error::Fatal("TypedDataSourceThread",
108+
"ThreadSafeFunction.*Call() failed");
102109
}
103110
}
104111

105112
if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) {
106-
Error::Fatal("DataSourceThread", "Queue was never full");
113+
Error::Fatal("TypedDataSourceThread", "Queue was never full");
107114
}
108115

109116
if (info->abort && !queueWasClosing) {
110-
Error::Fatal("DataSourceThread", "Queue was never closing");
117+
Error::Fatal("TypedDataSourceThread", "Queue was never closing");
111118
}
112119

113120
if (!queueWasClosing && tsfn.Release() != napi_ok) {
114-
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed");
121+
Error::Fatal("TypedDataSourceThread",
122+
"ThreadSafeFunction.Release() failed");
115123
}
116124
}
117125

@@ -123,6 +131,11 @@ static Value StopThread(const CallbackInfo& info) {
123131
} else {
124132
tsfn.Release();
125133
}
134+
{
135+
std::lock_guard<std::mutex> _(tsfnInfo.protect);
136+
tsfnInfo.closeCalledFromJs = true;
137+
tsfnInfo.signal.notify_one();
138+
}
126139
return Value();
127140
}
128141

@@ -145,6 +158,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
145158
tsfnInfo.abort = info[1].As<Boolean>();
146159
tsfnInfo.startSecondary = info[2].As<Boolean>();
147160
tsfnInfo.maxQueueSize = info[3].As<Number>().Uint32Value();
161+
tsfnInfo.closeCalledFromJs = false;
148162

149163
tsfn = TSFN::New(info.Env(),
150164
info[0].As<Function>(),
@@ -163,7 +177,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
163177

164178
static Value Release(const CallbackInfo& /* info */) {
165179
if (tsfn.Release() != napi_ok) {
166-
Error::Fatal("Release", "ThreadSafeFunction.Release() failed");
180+
Error::Fatal("Release", "TypedThreadSafeFunction.Release() failed");
167181
}
168182
return Value();
169183
}

0 commit comments

Comments
 (0)