Skip to content

Fix unsafe usage of Base.Condition #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 17, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ on:
jobs:
reverse_CI_JuliaDB:
runs-on: ${{ matrix.os }}
continue-on-error: true
strategy:
fail-fast: false
matrix:
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "Dagger"
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
version = "0.11.4"
version = "0.11.5"

[deps]
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
6 changes: 6 additions & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -8,6 +8,12 @@ platform:
- x86 # 32-bit
- x64 # 64-bit

matrix:
allow_failures:
# Windows is too unreliable right now
- platform: x86
- platform: x64

branches:
only:
- master
25 changes: 13 additions & 12 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
@@ -302,12 +302,11 @@ function cleanup_proc(state, p)
end
end

"Process-local count of actively-executing Dagger tasks per processor type."
const ACTIVE_TASKS = Dict{UInt64,Dict{Type,Ref{UInt}}}()
const ACTIVE_TASKS_LOCK = ReentrantLock()
"Process-local condition variable (and lock) indicating task completion."
const TASK_SYNC = Threads.Condition()

"Process-local condition variable indicating task completion."
const TASK_SYNC = Condition()
"Process-local dictionary tracking per-processor total utilization."
const PROC_UTILIZATION = Dict{UInt64,Dict{Type,Ref{UInt}}}()

"""
MaxUtilization
@@ -876,19 +875,19 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
end

# Check if we'll go over capacity from running this thunk
real_util = lock(ACTIVE_TASKS_LOCK) do
AT = get!(()->Dict{Type,Ref{UInt}}(), ACTIVE_TASKS, uid)
real_util = lock(TASK_SYNC) do
AT = get!(()->Dict{Type,Ref{UInt}}(), PROC_UTILIZATION, uid)
get!(()->Ref{UInt}(UInt(0)), AT, typeof(to_proc))
end
cap = UInt(capacity(OSProc(), typeof(to_proc))) * UInt(1e9)
while true
lock(ACTIVE_TASKS_LOCK)
lock(TASK_SYNC)
if ((extra_util isa MaxUtilization) && (real_util[] > 0)) ||
((extra_util isa Real) && (extra_util + real_util[] > cap))
# Fully subscribed, wait and re-check
@debug "($(myid())) $f ($thunk_id) Waiting for free $(typeof(to_proc)): $extra_util | $(real_util[])/$cap"
unlock(ACTIVE_TASKS_LOCK)
wait(TASK_SYNC)
unlock(TASK_SYNC)
else
# Under-subscribed, calculate extra utilization and execute thunk
@debug "($(myid())) ($thunk_id) Using available $to_proc: $extra_util | $(real_util[])/$cap"
@@ -898,7 +897,7 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
extra_util
end
real_util[] += extra_util
unlock(ACTIVE_TASKS_LOCK)
unlock(TASK_SYNC)
break
end
end
@@ -926,11 +925,13 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
end
threadtime = cputhreadtime() - threadtime_start
@dbg timespan_end(ctx, :compute, thunk_id, (f, to_proc))
lock(ACTIVE_TASKS_LOCK) do
lock(TASK_SYNC) do
real_util[] -= extra_util
end
@debug "($(myid())) ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap"
notify(TASK_SYNC)
lock(TASK_SYNC) do
notify(TASK_SYNC)
end
metadata = (
pressure=real_util[],
loadavg=((Sys.loadavg()...,) ./ Sys.CPU_THREADS),
4 changes: 2 additions & 2 deletions src/sch/eager.jl
Original file line number Diff line number Diff line change
@@ -33,8 +33,8 @@ end
the specified amount."
function adjust_pressure!(h::SchedulerHandle, proctype::Type, pressure)
uid = Dagger.get_tls().sch_uid
lock(ACTIVE_TASKS_LOCK) do
ACTIVE_TASKS[uid][proctype][] += pressure
lock(TASK_SYNC) do
PROC_UTILIZATION[uid][proctype][] += pressure
notify(TASK_SYNC)
end
exec!(_adjust_pressure!, h, myid(), proctype, pressure)