
Commit 71ad471

vianpl authored and stefanhaRH committed
util/event-loop-base: Introduce options to set the thread pool size
The thread pool regulates itself: when idle, it kills threads until empty; when in demand, it creates new threads until full. This behaviour doesn't play well with latency-sensitive workloads where the price of creating a new thread is too high. For example, when paired with qemu's '-mlock', or when using safety features like SafeStack, creating a new thread has been measured to take multiple milliseconds.

In order to mitigate this, let's introduce a new 'EventLoopBase' property to set the thread pool size. The threads will be created during the pool's initialization or upon updating the property's value, remain available during its lifetime regardless of demand, and be destroyed upon freeing it. A properly characterized workload will then be able to configure the pool to avoid any latency spikes.

Signed-off-by: Nicolas Saenz Julienne <[email protected]>
Reviewed-by: Stefan Hajnoczi <[email protected]>
Acked-by: Markus Armbruster <[email protected]>
Message-id: [email protected]
Signed-off-by: Stefan Hajnoczi <[email protected]>
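As a rough illustration of the new knob (not part of this commit), here is a minimal sketch of how code that owns an AioContext could pin the pool size using the aio_context_set_thread_pool_params() helper added below; the helper's name and signature come from this diff, while the wrapper function and the chosen values are hypothetical. At the object level, the same effect would presumably be reached by setting the new thread-pool-min/thread-pool-max properties on an iothread or main-loop object.

    #include "qemu/osdep.h"
    #include "block/aio.h"
    #include "qapi/error.h"

    /* Hypothetical caller: pin the pool at exactly eight workers so no request
     * ever pays the thread-creation cost (e.g. under -mlock or SafeStack). */
    static void pin_thread_pool_size(AioContext *ctx, Error **errp)
    {
        /* min == max pins the pool: workers are spawned up front (or as soon
         * as the pool exists) and are not reaped while the context is alive. */
        aio_context_set_thread_pool_params(ctx, 8, 8, errp);
    }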
1 parent 70ac26b · commit 71ad471

10 files changed (+133, -5 lines)

event-loop-base.c (+23)

@@ -14,16 +14,30 @@
 #include "qemu/osdep.h"
 #include "qom/object_interfaces.h"
 #include "qapi/error.h"
+#include "block/thread-pool.h"
 #include "sysemu/event-loop-base.h"
 
 typedef struct {
     const char *name;
     ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
 } EventLoopBaseParamInfo;
 
+static void event_loop_base_instance_init(Object *obj)
+{
+    EventLoopBase *base = EVENT_LOOP_BASE(obj);
+
+    base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+}
+
 static EventLoopBaseParamInfo aio_max_batch_info = {
     "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
 };
+static EventLoopBaseParamInfo thread_pool_min_info = {
+    "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
+};
+static EventLoopBaseParamInfo thread_pool_max_info = {
+    "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
+};
 
 static void event_loop_base_get_param(Object *obj, Visitor *v,
                                       const char *name, void *opaque, Error **errp)
@@ -95,12 +109,21 @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
                               event_loop_base_get_param,
                               event_loop_base_set_param,
                               NULL, &aio_max_batch_info);
+    object_class_property_add(klass, "thread-pool-min", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_min_info);
+    object_class_property_add(klass, "thread-pool-max", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_max_info);
 }
 
 static const TypeInfo event_loop_base_info = {
     .name = TYPE_EVENT_LOOP_BASE,
     .parent = TYPE_OBJECT,
     .instance_size = sizeof(EventLoopBase),
+    .instance_init = event_loop_base_instance_init,
     .class_size = sizeof(EventLoopBaseClass),
     .class_init = event_loop_base_class_init,
     .abstract = true,

include/block/aio.h (+10)

@@ -192,6 +192,8 @@ struct AioContext {
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
     QEMUBH *co_schedule_bh;
 
+    int thread_pool_min;
+    int thread_pool_max;
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                 Error **errp);
 
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: min number of threads to have readily available in the thread pool
+ * @max: max number of threads the thread pool can contain
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp);
 #endif

include/block/thread-pool.h (+3)

@@ -20,6 +20,8 @@
 
 #include "block/block.h"
 
+#define THREAD_POOL_MAX_THREADS_DEFAULT 64
+
 typedef int ThreadPoolFunc(void *opaque);
 
 typedef struct ThreadPool ThreadPool;
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
                                        ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
 
 #endif

include/sysemu/event-loop-base.h (+4)

@@ -33,5 +33,9 @@ struct EventLoopBase {
 
     /* AioContext AIO engine parameters */
     int64_t aio_max_batch;
+
+    /* AioContext thread pool parameters */
+    int64_t thread_pool_min;
+    int64_t thread_pool_max;
 };
 #endif

iothread.c (+3)

@@ -174,6 +174,9 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
     aio_context_set_aio_params(iothread->ctx,
                                iothread->parent_obj.aio_max_batch,
                                errp);
+
+    aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
 

qapi/qom.json (+9, -1)

@@ -508,10 +508,18 @@
 #                 0 means that the engine will use its default.
 #                 (default: 0)
 #
+# @thread-pool-min: minimum number of threads reserved in the thread pool
+#                   (default: 0)
+#
+# @thread-pool-max: maximum number of threads the thread pool can contain
+#                   (default: 64)
+#
 # Since: 7.1
 ##
 { 'struct': 'EventLoopBaseProperties',
-  'data': { '*aio-max-batch': 'int' } }
+  'data': { '*aio-max-batch': 'int',
+            '*thread-pool-min': 'int',
+            '*thread-pool-max': 'int' } }
 
 ##
 # @IothreadProperties:

util/aio-posix.c (+1)

@@ -15,6 +15,7 @@
 
 #include "qemu/osdep.h"
 #include "block/block.h"
+#include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 #include "qemu/rcu.h"
 #include "qemu/rcu_queue.h"

util/async.c (+20)

@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
 
     ctx->aio_max_batch = 0;
 
+    ctx->thread_pool_min = 0;
+    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+
     return ctx;
 fail:
     g_source_destroy(&ctx->source);
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
     assert(!get_my_aiocontext());
     set_my_aiocontext(ctx);
 }
+
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp)
+{
+
+    if (min > max || !max || min > INT_MAX || max > INT_MAX) {
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+        return;
+    }
+
+    ctx->thread_pool_min = min;
+    ctx->thread_pool_max = max;
+
+    if (ctx->thread_pool) {
+        thread_pool_update_params(ctx->thread_pool, ctx);
+    }
+}
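For clarity, a hedged sketch (not part of the commit) of the parameter validation above: assuming a live AioContext, these calls show which min/max combinations are accepted and which set an error.

    #include "qemu/osdep.h"
    #include "block/aio.h"
    #include "qapi/error.h"

    /* Illustrative only: exercise aio_context_set_thread_pool_params(). */
    static void thread_pool_params_examples(AioContext *ctx)
    {
        Error *err = NULL;

        aio_context_set_thread_pool_params(ctx, 0, 64, &err);  /* accepted: the defaults */
        assert(!err);

        aio_context_set_thread_pool_params(ctx, 16, 8, &err);  /* rejected: min > max */
        assert(err);
        error_free(err);
        err = NULL;

        aio_context_set_thread_pool_params(ctx, 0, 0, &err);   /* rejected: max == 0 */
        assert(err);
        error_free(err);
    }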

util/main-loop.c (+9)

@@ -30,6 +30,7 @@
 #include "sysemu/replay.h"
 #include "qemu/main-loop.h"
 #include "block/aio.h"
+#include "block/thread-pool.h"
 #include "qemu/error-report.h"
 #include "qemu/queue.h"
 #include "qemu/compiler.h"
@@ -187,12 +188,20 @@ int qemu_init_main_loop(Error **errp)
 
 static void main_loop_update_params(EventLoopBase *base, Error **errp)
 {
+    ERRP_GUARD();
+
     if (!qemu_aio_context) {
         error_setg(errp, "qemu aio context not ready");
         return;
     }
 
     aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
+    if (*errp) {
+        return;
+    }
+
+    aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
 MainLoop *mloop;

util/thread-pool.c (+51, -4)

@@ -58,7 +58,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    int min_threads;
+    int max_threads;
 };
 
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+    /*
+     * The semaphore timed out; we should exit the loop except when:
+     *  - There is work to do, we raced with the signal.
+     *  - The max threads threshold just changed, we raced with the signal.
+     *  - The thread pool forces a minimum number of readily available threads.
+     */
+    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+            pool->cur_threads > pool->max_threads ||
+            pool->cur_threads <= pool->min_threads)) {
+        return true;
+    }
+
+    return false;
+}
+
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1 || pool->stopping) {
+        } while (back_to_sleep(pool, ret));
+        if (ret == -1 || pool->stopping ||
+            pool->cur_threads > pool->max_threads) {
            break;
        }
 
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
     thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+    qemu_mutex_lock(&pool->lock);
+
+    pool->min_threads = ctx->thread_pool_min;
+    pool->max_threads = ctx->thread_pool_max;
+
+    /*
+     * We either have to:
+     *  - Increase the number of available threads until over the min_threads
+     *    threshold.
+     *  - Decrease the number of available threads until under the max_threads
+     *    threshold.
+     *  - Do nothing. The current number of threads falls in between the min
+     *    and max thresholds. We'll let the pool manage itself.
+     */
+    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+        spawn_thread(pool);
+    }
+
+    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+        qemu_sem_post(&pool->sem);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+}
+
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    thread_pool_update_params(pool, ctx);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)
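To make the resize behaviour of thread_pool_update_params() concrete, here is a small standalone sketch (not from the commit) of the arithmetic its two loops implement; pool_resize_delta() is a hypothetical helper used purely for illustration.

    /* Positive return: threads to spawn to reach min_threads.
     * Negative return: semaphore posts needed to shed threads above max_threads.
     * Zero: the pool is within bounds and keeps regulating itself. */
    static int pool_resize_delta(int cur_threads, int min_threads, int max_threads)
    {
        if (cur_threads < min_threads) {
            return min_threads - cur_threads;
        }
        if (cur_threads > max_threads) {
            return max_threads - cur_threads;
        }
        return 0;
    }

For example, moving from the old fixed maximum of 64 to min = max = 8 on a pool that currently has 2 workers would spawn 6 threads immediately, and later load spikes would never grow the pool past 8.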
