Skip to content

Commit 779ad54

Browse files
indutnytargos
authored andcommittedJun 11, 2021
node-api: faster threadsafe_function
Invoke threadsafe_function during the same tick and avoid marshalling costs between threads and/or churning event loop if either: 1. There's a queued call already 2. `Push()` is called while the main thread was running threadsafe_function PR-URL: #38506 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Rich Trott <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent dbb0d26 commit 779ad54

File tree

3 files changed

+66
-35
lines changed

3 files changed

+66
-35
lines changed
 

‎src/node_api.cc

+55-33
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "tracing/traced_value.h"
1313
#include "util-inl.h"
1414

15+
#include <atomic>
1516
#include <memory>
1617

1718
struct node_napi_env__ : public napi_env__ {
@@ -136,6 +137,7 @@ class ThreadSafeFunction : public node::AsyncResource {
136137
*v8::String::Utf8Value(env_->isolate, name)),
137138
thread_count(thread_count_),
138139
is_closing(false),
140+
dispatch_state(kDispatchIdle),
139141
context(context_),
140142
max_queue_size(max_queue_size_),
141143
env(env_),
@@ -175,10 +177,8 @@ class ThreadSafeFunction : public node::AsyncResource {
175177
return napi_closing;
176178
}
177179
} else {
178-
if (uv_async_send(&async) != 0) {
179-
return napi_generic_failure;
180-
}
181180
queue.push(data);
181+
Send();
182182
return napi_ok;
183183
}
184184
}
@@ -210,9 +210,7 @@ class ThreadSafeFunction : public node::AsyncResource {
210210
if (is_closing && max_queue_size > 0) {
211211
cond->Signal(lock);
212212
}
213-
if (uv_async_send(&async) != 0) {
214-
return napi_generic_failure;
215-
}
213+
Send();
216214
}
217215
}
218216

@@ -237,7 +235,6 @@ class ThreadSafeFunction : public node::AsyncResource {
237235
cond = std::make_unique<node::ConditionVariable>();
238236
}
239237
if (max_queue_size == 0 || cond) {
240-
CHECK_EQ(0, uv_idle_init(loop, &idle));
241238
return napi_ok;
242239
}
243240

@@ -262,21 +259,46 @@ class ThreadSafeFunction : public node::AsyncResource {
262259

263260
napi_status Unref() {
264261
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
265-
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
266262

267263
return napi_ok;
268264
}
269265

270266
napi_status Ref() {
271267
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
272-
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
273268

274269
return napi_ok;
275270
}
276271

277-
void DispatchOne() {
272+
inline void* Context() {
273+
return context;
274+
}
275+
276+
protected:
277+
void Dispatch() {
278+
bool has_more = true;
279+
280+
// Limit maximum synchronous iteration count to prevent event loop
281+
// starvation. See `src/node_messaging.cc` for an inspiration.
282+
unsigned int iterations_left = kMaxIterationCount;
283+
while (has_more && --iterations_left != 0) {
284+
dispatch_state = kDispatchRunning;
285+
has_more = DispatchOne();
286+
287+
// Send() was called while we were executing the JS function
288+
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
289+
has_more = true;
290+
}
291+
}
292+
293+
if (has_more) {
294+
Send();
295+
}
296+
}
297+
298+
bool DispatchOne() {
278299
void* data = nullptr;
279300
bool popped_value = false;
301+
bool has_more = false;
280302

281303
{
282304
node::Mutex::ScopedLock lock(this->mutex);
@@ -301,9 +323,9 @@ class ThreadSafeFunction : public node::AsyncResource {
301323
cond->Signal(lock);
302324
}
303325
CloseHandlesAndMaybeDelete();
304-
} else {
305-
CHECK_EQ(0, uv_idle_stop(&idle));
306326
}
327+
} else {
328+
has_more = true;
307329
}
308330
}
309331
}
@@ -321,6 +343,8 @@ class ThreadSafeFunction : public node::AsyncResource {
321343
call_js_cb(env, js_callback, context, data);
322344
});
323345
}
346+
347+
return has_more;
324348
}
325349

326350
void Finalize() {
@@ -334,10 +358,6 @@ class ThreadSafeFunction : public node::AsyncResource {
334358
EmptyQueueAndDelete();
335359
}
336360

337-
inline void* Context() {
338-
return context;
339-
}
340-
341361
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
342362
v8::HandleScope scope(env->isolate);
343363
if (set_closing) {
@@ -357,18 +377,20 @@ class ThreadSafeFunction : public node::AsyncResource {
357377
ThreadSafeFunction* ts_fn =
358378
node::ContainerOf(&ThreadSafeFunction::async,
359379
reinterpret_cast<uv_async_t*>(handle));
360-
v8::HandleScope scope(ts_fn->env->isolate);
361-
ts_fn->env->node_env()->CloseHandle(
362-
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
363-
[](uv_handle_t* handle) -> void {
364-
ThreadSafeFunction* ts_fn =
365-
node::ContainerOf(&ThreadSafeFunction::idle,
366-
reinterpret_cast<uv_idle_t*>(handle));
367-
ts_fn->Finalize();
368-
});
380+
ts_fn->Finalize();
369381
});
370382
}
371383

384+
void Send() {
385+
// Ask currently running Dispatch() to make one more iteration
386+
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
387+
if ((current_state & kDispatchRunning) == kDispatchRunning) {
388+
return;
389+
}
390+
391+
CHECK_EQ(0, uv_async_send(&async));
392+
}
393+
372394
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
373395
// without a call_js_cb_.
374396
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
@@ -392,16 +414,10 @@ class ThreadSafeFunction : public node::AsyncResource {
392414
}
393415
}
394416

395-
static void IdleCb(uv_idle_t* idle) {
396-
ThreadSafeFunction* ts_fn =
397-
node::ContainerOf(&ThreadSafeFunction::idle, idle);
398-
ts_fn->DispatchOne();
399-
}
400-
401417
static void AsyncCb(uv_async_t* async) {
402418
ThreadSafeFunction* ts_fn =
403419
node::ContainerOf(&ThreadSafeFunction::async, async);
404-
CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
420+
ts_fn->Dispatch();
405421
}
406422

407423
static void Cleanup(void* data) {
@@ -410,14 +426,20 @@ class ThreadSafeFunction : public node::AsyncResource {
410426
}
411427

412428
private:
429+
static const unsigned char kDispatchIdle = 0;
430+
static const unsigned char kDispatchRunning = 1 << 0;
431+
static const unsigned char kDispatchPending = 1 << 1;
432+
433+
static const unsigned int kMaxIterationCount = 1000;
434+
413435
// These are variables protected by the mutex.
414436
node::Mutex mutex;
415437
std::unique_ptr<node::ConditionVariable> cond;
416438
std::queue<void*> queue;
417439
uv_async_t async;
418-
uv_idle_t idle;
419440
size_t thread_count;
420441
bool is_closing;
442+
std::atomic_uchar dispatch_state;
421443

422444
// These are variables set once, upon creation, and then never again, which
423445
// means we don't need the mutex to read them.

‎test/node-api/test_threadsafe_function/binding.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <node_api.h>
88
#include "../../js-native-api/common.h"
99

10-
#define ARRAY_LENGTH 10
10+
#define ARRAY_LENGTH 10000
1111
#define MAX_QUEUE_SIZE 2
1212

1313
static uv_thread_t uv_threads[2];
@@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
7272
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
7373
status = napi_call_threadsafe_function(ts_fn, &ints[index],
7474
ts_fn_info->block_on_full);
75-
if (ts_fn_info->max_queue_size == 0) {
75+
if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
7676
// Let's make this thread really busy for 200 ms to give the main thread a
7777
// chance to abort.
7878
uint64_t start = uv_hrtime();

‎test/node-api/test_threadsafe_function/test.js

+9
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
210210
}))
211211
.then((result) => assert.strictEqual(result.indexOf(0), -1))
212212

213+
// Make sure that threadsafe function isn't stalled when we hit
214+
// `kMaxIterationCount` in `src/node_api.cc`
215+
.then(() => testWithJSMarshaller({
216+
threadStarter: 'StartThreadNonblocking',
217+
maxQueueSize: binding.ARRAY_LENGTH >>> 1,
218+
quitAfter: binding.ARRAY_LENGTH
219+
}))
220+
.then((result) => assert.deepStrictEqual(result, expectedArray))
221+
213222
// Start a child process to test rapid teardown
214223
.then(() => testUnref(binding.MAX_QUEUE_SIZE))
215224

0 commit comments

Comments
 (0)
Please sign in to comment.