Skip to content

Commit 25ac64c

Browse files
committed
test: add test for effect of UV_THREADPOOL_SIZE
This (not particularly elegant) native addon tests the effect of UV_THREADPOOL_SIZE on node-api. The test fails if Node.js allows more than UV_THREADPOOL_SIZE async tasks to run concurrently, or if it limits the number of concurrent async tasks to anything less than UV_THREADPOOL_SIZE.
1 parent 634eb50 commit 25ac64c

File tree

3 files changed

+193
-0
lines changed

3 files changed

+193
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"targets": [
3+
{
4+
"target_name": "test_uv_threadpool_size",
5+
"sources": [ "test_uv_threadpool_size.c" ]
6+
}
7+
]
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
'use strict';
2+
const common = require('../../common');
3+
const { test } = require(`./build/${common.buildType}/test_uv_threadpool_size`);
4+
5+
const uvThreadpoolSize = parseInt(process.env.EXPECTED_UV_THREADPOOL_SIZE ||
6+
process.env.UV_THREADPOOL_SIZE, 10) || 4;
7+
test(uvThreadpoolSize);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
#undef NDEBUG
2+
#include <assert.h>
3+
#include <node_api.h>
4+
#include <stdlib.h>
5+
#include <uv.h>
6+
#include "../../js-native-api/common.h"
7+
8+
typedef struct {
9+
uv_mutex_t mutex;
10+
uint32_t threadpool_size;
11+
uint32_t n_tasks_started;
12+
uint32_t n_tasks_exited;
13+
uint32_t n_tasks_finalized;
14+
bool observed_saturation;
15+
} async_shared_data;
16+
17+
typedef struct {
18+
uint32_t task_id;
19+
async_shared_data* shared_data;
20+
napi_async_work request;
21+
} async_carrier;
22+
23+
static inline bool all_tasks_started(async_shared_data* d) {
24+
assert(d->n_tasks_started <= d->threadpool_size + 1);
25+
return d->n_tasks_started == d->threadpool_size + 1;
26+
}
27+
28+
static inline bool all_tasks_exited(async_shared_data* d) {
29+
assert(d->n_tasks_exited <= d->n_tasks_started);
30+
return all_tasks_started(d) && d->n_tasks_exited == d->n_tasks_started;
31+
}
32+
33+
static inline bool all_tasks_finalized(async_shared_data* d) {
34+
assert(d->n_tasks_finalized <= d->n_tasks_exited);
35+
return all_tasks_exited(d) && d->n_tasks_finalized == d->n_tasks_exited;
36+
}
37+
38+
static inline bool still_saturating(async_shared_data* d) {
39+
return d->n_tasks_started < d->threadpool_size;
40+
}
41+
42+
static inline bool threadpool_saturated(async_shared_data* d) {
43+
return d->n_tasks_started == d->threadpool_size && d->n_tasks_exited == 0;
44+
}
45+
46+
static inline bool threadpool_desaturating(async_shared_data* d) {
47+
return d->n_tasks_started >= d->threadpool_size && d->n_tasks_exited != 0;
48+
}
49+
50+
static inline void print_info(const char* label, async_carrier* c) {
51+
async_shared_data* d = c->shared_data;
52+
printf("%s task_id=%u n_tasks_started=%u n_tasks_exited=%u "
53+
"n_tasks_finalized=%u observed_saturation=%d\n",
54+
label, c->task_id, d->n_tasks_started, d->n_tasks_exited,
55+
d->n_tasks_finalized, d->observed_saturation);
56+
}
57+
58+
static void Execute(napi_env env, void* data) {
59+
async_carrier* c = (async_carrier*) data;
60+
async_shared_data* d = c->shared_data;
61+
62+
// As long as fewer than threadpool_size async tasks have been started, more
63+
// should be started (eventually). Only once that happens should scheduled
64+
// async tasks remain queued.
65+
uv_mutex_lock(&d->mutex);
66+
bool should_be_concurrent = still_saturating(d);
67+
d->n_tasks_started++;
68+
assert(d->n_tasks_started <= d->threadpool_size + 1);
69+
70+
print_info("start", c);
71+
72+
if (should_be_concurrent) {
73+
// Wait for the thread pool to be saturated. This is not an elegant way of
74+
// doing so, but it really does not matter much here.
75+
while (still_saturating(d)) {
76+
print_info("waiting", c);
77+
uv_mutex_unlock(&d->mutex);
78+
uv_sleep(100);
79+
uv_mutex_lock(&d->mutex);
80+
}
81+
82+
// One async task will observe that the threadpool is saturated, that is,
83+
// that threadpool_size tasks have been started and none have exited yet.
84+
// That task will be the first to exit.
85+
if (!d->observed_saturation) {
86+
assert(threadpool_saturated(d));
87+
d->observed_saturation = true;
88+
} else {
89+
assert(threadpool_saturated(d) || threadpool_desaturating(d));
90+
}
91+
} else {
92+
// If this task is not among the first threadpool_size tasks, it should not
93+
// have been started unless other tasks have already finished.
94+
assert(threadpool_desaturating(d));
95+
}
96+
97+
print_info("exit", c);
98+
99+
// Allow other tasks to access the shared data. If the thread pool is actually
100+
// larger than threadpool_size, this allows an extraneous task to start, which
101+
// will lead to an assertion error.
102+
uv_mutex_unlock(&d->mutex);
103+
uv_sleep(1000);
104+
uv_mutex_lock(&d->mutex);
105+
106+
d->n_tasks_exited++;
107+
uv_mutex_unlock(&d->mutex);
108+
}
109+
110+
static void Complete(napi_env env, napi_status status, void* data) {
111+
async_carrier* c = (async_carrier*) data;
112+
async_shared_data* d = c->shared_data;
113+
114+
if (status != napi_ok) {
115+
napi_throw_type_error(env, NULL, "Execute callback failed.");
116+
return;
117+
}
118+
119+
uv_mutex_lock(&d->mutex);
120+
assert(threadpool_desaturating(d));
121+
d->n_tasks_finalized++;
122+
print_info("finalize", c);
123+
if (all_tasks_finalized(d)) {
124+
uv_mutex_unlock(&d->mutex);
125+
uv_mutex_destroy(&d->mutex);
126+
free(d);
127+
} else {
128+
uv_mutex_unlock(&d->mutex);
129+
}
130+
131+
NODE_API_CALL_RETURN_VOID(env, napi_delete_async_work(env, c->request));
132+
free(c);
133+
}
134+
135+
static napi_value Test(napi_env env, napi_callback_info info) {
136+
size_t argc = 1;
137+
napi_value argv[1];
138+
napi_value this;
139+
void* data;
140+
NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, &this, &data));
141+
NODE_API_ASSERT(env, argc >= 1, "Not enough arguments, expected 1.");
142+
143+
async_shared_data* shared_data = calloc(1, sizeof(async_shared_data));
144+
assert(shared_data != NULL);
145+
int ret = uv_mutex_init(&shared_data->mutex);
146+
assert(ret == 0);
147+
148+
napi_valuetype t;
149+
NODE_API_CALL(env, napi_typeof(env, argv[0], &t));
150+
NODE_API_ASSERT(env, t == napi_number,
151+
"Wrong first argument, integer expected.");
152+
NODE_API_CALL(env,
153+
napi_get_value_uint32(env, argv[0], &shared_data->threadpool_size));
154+
155+
napi_value resource_name;
156+
NODE_API_CALL(env, napi_create_string_utf8(
157+
env, "TestResource", NAPI_AUTO_LENGTH, &resource_name));
158+
159+
for (uint32_t i = 0; i <= shared_data->threadpool_size; i++) {
160+
async_carrier* carrier = malloc(sizeof(async_carrier));
161+
assert(carrier != NULL);
162+
carrier->task_id = i;
163+
carrier->shared_data = shared_data;
164+
NODE_API_CALL(env, napi_create_async_work(env, NULL, resource_name,
165+
Execute, Complete, carrier, &carrier->request));
166+
NODE_API_CALL(env, napi_queue_async_work(env, carrier->request));
167+
}
168+
169+
return NULL;
170+
}
171+
172+
static napi_value Init(napi_env env, napi_value exports) {
173+
napi_property_descriptor desc = DECLARE_NODE_API_PROPERTY("test", Test);
174+
NODE_API_CALL(env, napi_define_properties(env, exports, 1, &desc));
175+
return exports;
176+
}
177+
178+
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)

0 commit comments

Comments
 (0)