Skip to content

Commit d76ac5d

Browse files
committed
remove threadedregion and move jl_threading_run to julia
1 parent 53beb34 commit d76ac5d

File tree

5 files changed

+23
-115
lines changed

5 files changed

+23
-115
lines changed

base/threadingconstructs.jl

+18-23
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,17 @@ on `threadid()`.
1818
"""
1919
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
2020

21-
# Only read/written by the main thread
22-
const in_threaded_loop = Ref(false)
23-
2421
function _threadsfor(iter,lbody)
2522
lidx = iter.args[1] # index
2623
range = iter.args[2]
2724
quote
28-
local threadsfor_fun
2925
let range = $(esc(range))
30-
function threadsfor_fun(onethread=false)
26+
function threadsfor_fun(tid)
3127
r = range # Load into local variable
3228
lenr = length(r)
33-
# divide loop iterations among threads
34-
if onethread
35-
tid = 1
36-
len, rem = lenr, 0
37-
else
38-
tid = threadid()
39-
len, rem = divrem(lenr, nthreads())
40-
end
29+
# divide loop iterations among tasks
30+
ngrains = min(nthreads(), lenr)
31+
len, rem = divrem(lenr, ngrains)
4132
# not enough iterations for all the threads?
4233
if len == 0
4334
if tid > rem
@@ -64,21 +55,25 @@ function _threadsfor(iter,lbody)
6455
$(esc(lbody))
6556
end
6657
end
67-
end
68-
# Hack to make nested threaded loops kinda work
69-
if threadid() != 1 || in_threaded_loop[]
70-
# We are in a nested threaded loop
71-
Base.invokelatest(threadsfor_fun, true)
72-
else
73-
in_threaded_loop[] = true
74-
# the ccall is not expected to throw
75-
ccall(:jl_threading_run, Cvoid, (Any,), threadsfor_fun)
76-
in_threaded_loop[] = false
58+
threading_run(threadsfor_fun, length(range))
7759
end
7860
nothing
7961
end
8062
end
8163

64+
function threading_run(func, len)
65+
ngrains = min(nthreads(), len)
66+
tasks = Vector{Task}(undef, ngrains)
67+
for tid = 1:ngrains
68+
t = Task(()->func(tid))
69+
t.sticky = false
70+
tasks[tid] = t
71+
schedule(t)
72+
end
73+
Base.sync_end(tasks)
74+
return nothing
75+
end
76+
8277
"""
8378
Threads.@threads
8479

src/jl_uv.c

+2-4
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,10 @@ JL_DLLEXPORT void jl_uv_req_set_data(uv_req_t *req, void *data) { req->data = da
201201
JL_DLLEXPORT void *jl_uv_handle_data(uv_handle_t *handle) { return handle->data; }
202202
JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; }
203203

204-
extern volatile unsigned _threadedregion;
205-
206204
JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
207205
{
208206
jl_ptls_t ptls = jl_get_ptls_states();
209-
if (loop && (_threadedregion || ptls->tid == 0)) {
207+
if (loop) {
210208
jl_gc_safepoint_(ptls);
211209
JL_UV_LOCK();
212210
loop->stop_flag = 0;
@@ -220,7 +218,7 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
220218
JL_DLLEXPORT int jl_process_events(uv_loop_t *loop)
221219
{
222220
jl_ptls_t ptls = jl_get_ptls_states();
223-
if (loop && (_threadedregion || ptls->tid == 0)) {
221+
if (loop) {
224222
jl_gc_safepoint_(ptls);
225223
if (jl_mutex_trylock(&jl_uv_mutex)) {
226224
loop->stop_flag = 0;

src/partr.c

+3-14
Original file line numberDiff line numberDiff line change
@@ -414,8 +414,6 @@ static int may_sleep(jl_ptls_t ptls)
414414
return jl_atomic_load(&sleep_check_state) == sleeping && jl_atomic_load(&ptls->sleep_check_state) == sleeping;
415415
}
416416

417-
extern volatile unsigned _threadedregion;
418-
419417
JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
420418
{
421419
jl_ptls_t ptls = jl_get_ptls_states();
@@ -437,7 +435,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
437435
#endif
438436

439437
jl_cpu_pause();
440-
if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) {
438+
if (sleep_check_after_threshold(&start_cycles)) {
441439
if (!sleep_check_now(ptls->tid))
442440
continue;
443441
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
@@ -449,14 +447,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
449447
// although none are allowed to create new ones
450448
// outside of threaded regions, all IO is permitted,
451449
// but only on thread 1
452-
int uvlock = 0;
453-
if (_threadedregion) {
454-
uvlock = jl_mutex_trylock(&jl_uv_mutex);
455-
}
456-
else if (ptls->tid == 0) {
457-
uvlock = 1;
458-
JL_UV_LOCK();
459-
}
450+
int uvlock = jl_mutex_trylock(&jl_uv_mutex);
460451
if (uvlock) {
461452
int active = 1;
462453
if (jl_atomic_load(&jl_uv_n_waiters) != 0) {
@@ -486,9 +477,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
486477
// to the last thread to do an explicit operation,
487478
// which may starve other threads of critical work
488479
}
489-
if (!_threadedregion && active && ptls->tid == 0) {
490-
// thread 0 is the only thread permitted to run the event loop
491-
// so it needs to stay alive
480+
if (active) {
492481
start_cycles = 0;
493482
continue;
494483
}

src/threading.c

-72
Original file line numberDiff line numberDiff line change
@@ -472,78 +472,6 @@ void jl_start_threads(void)
472472

473473
#endif
474474

475-
unsigned volatile _threadedregion; // HACK: keep track of whether it is safe to do IO
476-
477-
// simple fork/join mode code
478-
JL_DLLEXPORT void jl_threading_run(jl_value_t *func)
479-
{
480-
jl_ptls_t ptls = jl_get_ptls_states();
481-
int8_t gc_state = jl_gc_unsafe_enter(ptls);
482-
size_t world = jl_world_counter;
483-
jl_method_instance_t *mfunc = jl_lookup_generic(&func, 1, jl_int32hash_fast(jl_return_address()), world);
484-
// Ignore constant return value for now.
485-
jl_code_instance_t *fptr = jl_compile_method_internal(mfunc, world);
486-
if (fptr->invoke == jl_fptr_const_return)
487-
return;
488-
489-
size_t nthreads = jl_n_threads;
490-
jl_svec_t *ts = jl_alloc_svec(nthreads);
491-
JL_GC_PUSH1(&ts);
492-
jl_value_t *wait_func = jl_get_global(jl_base_module, jl_symbol("wait"));
493-
jl_value_t *schd_func = jl_get_global(jl_base_module, jl_symbol("schedule"));
494-
// create and schedule all tasks
495-
_threadedregion += 1;
496-
for (int i = 0; i < nthreads; i++) {
497-
jl_value_t *args2[2];
498-
args2[0] = (jl_value_t*)jl_task_type;
499-
args2[1] = func;
500-
jl_task_t *t = (jl_task_t*)jl_apply(args2, 2);
501-
jl_svecset(ts, i, t);
502-
t->sticky = 1;
503-
t->tid = i;
504-
args2[0] = schd_func;
505-
args2[1] = (jl_value_t*)t;
506-
jl_apply(args2, 2);
507-
#ifdef JULIA_ENABLE_THREADING
508-
if (i == 1) {
509-
// let threads know work is coming (optimistic)
510-
jl_wakeup_thread(-1);
511-
}
512-
#endif
513-
}
514-
#ifdef JULIA_ENABLE_THREADING
515-
if (nthreads > 2) {
516-
// let threads know work is ready (guaranteed)
517-
jl_wakeup_thread(-1);
518-
}
519-
#endif
520-
// join with all tasks
521-
JL_TRY {
522-
for (int i = 0; i < nthreads; i++) {
523-
jl_value_t *t = jl_svecref(ts, i);
524-
jl_value_t *args[2] = { wait_func, t };
525-
jl_apply(args, 2);
526-
}
527-
}
528-
JL_CATCH {
529-
_threadedregion -= 1;
530-
jl_wake_libuv();
531-
JL_UV_LOCK();
532-
JL_UV_UNLOCK();
533-
jl_rethrow();
534-
}
535-
// make sure no threads are sitting in the event loop
536-
_threadedregion -= 1;
537-
jl_wake_libuv();
538-
// make sure no more callbacks will run while user code continues
539-
// outside thread region and might touch an I/O object.
540-
JL_UV_LOCK();
541-
JL_UV_UNLOCK();
542-
JL_GC_POP();
543-
jl_gc_unsafe_leave(ptls, gc_state);
544-
}
545-
546-
547475
#ifndef JULIA_ENABLE_THREADING
548476

549477
void jl_init_threading(void)

test/threads_exec.jl

-2
Original file line numberDiff line numberDiff line change
@@ -394,11 +394,9 @@ for period in (0.06, Dates.Millisecond(60))
394394
t = Timer(period)
395395
wait(t)
396396
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
397-
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
398397
wait(c)
399398
sleep(period)
400399
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
401-
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
402400
end))
403401
wait(c)
404402
notify(c)

0 commit comments

Comments
 (0)