Skip to content

Add precompilation via PrecompileTools #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

julia_version = "1.8.5"
manifest_format = "2.0"
project_hash = "c7130869591b4f985941e8a5c1d2a572f2f59175"
project_hash = "5333a6c200b6e6add81c46547527f66ddc0dc16c"

[[deps.Artifacts]]
uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33"
Expand Down Expand Up @@ -137,6 +137,18 @@ git-tree-sha1 = "2e73fe17cac3c62ad1aebe70d44c963c3cfdc3e3"
uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
version = "1.6.2"

[[deps.PrecompileTools]]
deps = ["Preferences"]
git-tree-sha1 = "03b4c25b43cb84cee5c90aa9b5ea0a78fd848d2f"
uuid = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
version = "1.2.0"

[[deps.Preferences]]
deps = ["TOML"]
git-tree-sha1 = "00805cd429dcb4870060ff49ef443486c262e38e"
uuid = "21216c6a-2e73-6563-6e65-726566657250"
version = "1.4.1"

[[deps.Printf]]
deps = ["Unicode"]
uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7"
Expand Down Expand Up @@ -201,6 +213,11 @@ git-tree-sha1 = "1d77abd07f617c4868c33d4f5b9e1dbb2643c9cf"
uuid = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
version = "0.34.2"

[[deps.TOML]]
deps = ["Dates"]
uuid = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
version = "1.0.0"

[[deps.Test]]
deps = ["InteractiveUtils", "Logging", "Random", "Serialization"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Expand Down
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Requires = "ae029012-a4dd-5104-9daa-d747884805df"
Expand All @@ -24,6 +25,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
DataStructures = "0.18"
MacroTools = "0.5"
MemPool = "0.4.4"
PrecompileTools = "1.2"
Requires = "1"
ScopedValues = "1.1"
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
Expand Down
4 changes: 4 additions & 0 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ include("ui/gantt-text.jl")
# Logging
include("lib/logging-events.jl")

# Precompilation
using PrecompileTools
include("precompile.jl")

function __init__()
# Initialize system UUID
system_uuid()
Expand Down
32 changes: 32 additions & 0 deletions src/precompile.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Precompilation workload: starts the eager scheduler by running a couple of
# tiny tasks, then halts it and tears everything down again so that no
# tracked tasks or worker monitors remain once the workload finishes.
@compile_workload begin
# Same call that `__init__` makes to initialize the system UUID
system_uuid()
# Register a processor callback under the name "__cpu_thread_1__" whose body
# produces `ThreadProc(1, 1)`
add_processor_callback!("__cpu_thread_1__") do
ThreadProc(1, 1)
end
# Exercise both spawn forms; `t2` takes `t1` as an argument, and fetching `t2`
# waits for the whole chain
t1 = @spawn 1+1
t2 = spawn(+, 1, t1)
fetch(t2)
# Request a scheduler halt from inside a spawned task
spawn() do
Sch.halt!(sch_handle())
end
# Poll until the eager scheduler has cleared its "initialized" flag
while Sch.EAGER_INIT[]
sleep(0.1)
end
# Reset the eager context reference
Sch.EAGER_CONTEXT[] = nothing
GC.gc()
yield()
lock(Sch.ERRORMONITOR_TRACKED) do tracked
# If every tracked task has already finished, just clear the list
# NOTE(review): `istaskfailed(t)` looks redundant next to `istaskdone(t)`,
# since a failed task also counts as done -- confirm
if all(t->istaskdone(t) || istaskfailed(t), tracked)
empty!(tracked)
return
end
# Otherwise throw an InterruptException into each task that is still tracked
for t in tracked
Base.throwto(t, InterruptException())
end
end
# NOTE(review): presumably runs MemPool's shutdown cleanup -- confirm against
# MemPool
MemPool.exit_hook()
GC.gc()
yield()
# Sanity checks: no worker monitor channels or tasks should be left registered
@assert isempty(Sch.WORKER_MONITOR_CHANS)
@assert isempty(Sch.WORKER_MONITOR_TASKS)
end
62 changes: 42 additions & 20 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -339,30 +339,39 @@ function init_proc(state, p, log_sink)

state.worker_loadavg[p.pid] = (0.0, 0.0, 0.0)
end
lock(WORKER_MONITOR_LOCK) do
wid = p.pid
if !haskey(WORKER_MONITOR_TASKS, wid)
t = @async begin
try
# Wait until this connection is terminated
remotecall_fetch(sleep, wid, typemax(UInt64))
catch err
if err isa ProcessExitedException
if p.pid != 1
lock(WORKER_MONITOR_LOCK) do
wid = p.pid
if !haskey(WORKER_MONITOR_TASKS, wid)
t = @async begin
try
# Wait until this connection is terminated
remotecall_fetch(sleep, wid, typemax(UInt64))
catch err
# TODO: Report other kinds of errors? IOError, etc.
#if !(err isa ProcessExitedException)
#end
finally
lock(WORKER_MONITOR_LOCK) do
d = WORKER_MONITOR_CHANS[wid]
for uid in keys(d)
put!(d[uid], (wid, OSProc(wid), nothing, (ProcessExitedException(wid), nothing)))
try
put!(d[uid], (wid, OSProc(wid), nothing, (ProcessExitedException(wid), nothing)))
catch
end
end
empty!(d)
delete!(WORKER_MONITOR_CHANS, wid)
delete!(WORKER_MONITOR_TASKS, wid)
end
end
end
errormonitor_tracked(t)
WORKER_MONITOR_TASKS[wid] = t
WORKER_MONITOR_CHANS[wid] = Dict{UInt64,RemoteChannel}()
end
WORKER_MONITOR_TASKS[wid] = t
WORKER_MONITOR_CHANS[wid] = Dict{UInt64,RemoteChannel}()
WORKER_MONITOR_CHANS[wid][state.uid] = state.chan
end
WORKER_MONITOR_CHANS[wid][state.uid] = state.chan
end

# Setup worker-to-scheduler channels
Expand All @@ -379,18 +388,26 @@ function init_proc(state, p, log_sink)
end
# Cleanup for the scheduler identified by `uid` (invoked through `remote_do`
# from `cleanup_proc`): clears the chunk cache, marks every processor state
# registered for `uid` as done, wakes whatever waits on its `reschedule`, and
# finally drops the states. `log_sink` is accepted but not used here.
function _cleanup_proc(uid, log_sink)
    empty!(CHUNK_CACHE) # FIXME: Should be keyed on uid!
    proc_states(uid) do states
        for (_, pstate) in states
            internal = pstate.state
            # Set the done flag before notifying, so a woken waiter observes it
            internal.done[] = true
            notify(internal.reschedule)
        end
        empty!(states)
    end
end
function cleanup_proc(state, p, log_sink)
ctx = Context(Int[]; log_sink)
timespan_start(ctx, :cleanup_proc, p.pid, 0)
wid = p.pid
timespan_start(ctx, :cleanup_proc, wid, 0)
lock(WORKER_MONITOR_LOCK) do
wid = p.pid
if haskey(WORKER_MONITOR_CHANS, wid)
delete!(WORKER_MONITOR_CHANS[wid], state.uid)
remote_do(_cleanup_proc, wid, state.uid, log_sink)
end
end
timespan_finish(ctx, :cleanup_proc, p.pid, 0)
remote_do(_cleanup_proc, wid, state.uid, log_sink)
timespan_finish(ctx, :cleanup_proc, wid, 0)
end

"Process-local condition variable (and lock) indicating task completion."
Expand Down Expand Up @@ -1096,6 +1113,7 @@ struct ProcessorInternalState
tasks::Dict{Int,Task}
proc_occupancy::Base.RefValue{UInt32}
time_pressure::Base.RefValue{UInt64}
done::Base.RefValue{Bool}
end
struct ProcessorState
state::ProcessorInternalState
Expand Down Expand Up @@ -1144,6 +1162,9 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
reset(istate.reschedule)
end
timespan_finish(ctx, :proc_run_wait, to_proc, nothing)
if istate.done[]
return
end
end

# Fetch a new task to execute
Expand Down Expand Up @@ -1270,7 +1291,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
else
t.sticky = false
end
tasks[thunk_id] = errormonitor(schedule(t))
tasks[thunk_id] = errormonitor_tracked(schedule(t))
proc_occupancy[] += task_occupancy
time_pressure[] += time_util
end
Expand All @@ -1283,7 +1304,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
else
proc_run_task.sticky = false
end
return errormonitor(schedule(proc_run_task))
return errormonitor_tracked(schedule(proc_run_task))
end

"""
Expand All @@ -1307,7 +1328,8 @@ function do_tasks(to_proc, return_queue, tasks)
istate = ProcessorInternalState(ctx, to_proc,
queue_locked, reschedule,
Dict{Int,Task}(),
Ref(UInt32(0)), Ref(UInt64(0)))
Ref(UInt32(0)), Ref(UInt64(0)),
Ref(false))
runner = start_processor_runner!(istate, uid, return_queue)
@static if VERSION < v"1.9"
reschedule.waiter = runner
Expand Down
11 changes: 6 additions & 5 deletions src/sch/dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ function dynamic_listener!(ctx, state, wid)
task = current_task() # The scheduler's main task
inp_chan, out_chan = state.worker_chans[wid]
listener_task = @async begin
while isopen(inp_chan) && !state.halt.set
while !state.halt.set
tid, f, data = try
take!(inp_chan)
catch err
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
ProcessExitedException,
InvalidStateException})
iob = IOContext(IOBuffer(), :color=>true)
println(iob, "Error in sending dynamic request:")
println(iob, "Error in receiving dynamic request:")
Base.showerror(iob, err)
Base.show_backtrace(iob, catch_backtrace())
println(iob)
Expand Down Expand Up @@ -87,12 +87,13 @@ function dynamic_listener!(ctx, state, wid)
end
end
end
@async begin
errormonitor_tracked(listener_task)
errormonitor_tracked(@async begin
wait(state.halt)
# TODO: Not sure why we need the @async here, but otherwise we
# don't stop all the listener tasks
@async Base.throwto(listener_task, SchedulerHaltedException())
end
end)
end

## Worker-side methods for dynamic communication
Expand All @@ -113,7 +114,7 @@ end
halt!(h::SchedulerHandle) = exec!(_halt, h, nothing)
function _halt(ctx, state, task, tid, _)
notify(state.halt)
put!(state.chan, (1, nothing, SchedulerHaltedException(), nothing))
put!(state.chan, (1, nothing, nothing, (SchedulerHaltedException(), nothing)))
Base.throwto(task, SchedulerHaltedException())
end

Expand Down
18 changes: 11 additions & 7 deletions src/sch/eager.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const EAGER_INIT = Threads.Atomic{Bool}(false)
const EAGER_READY = Base.Event()
const EAGER_FORCE_KILL = Ref{Bool}(false)
const EAGER_ID_MAP = LockedObject(Dict{UInt64,Int}())
const EAGER_CONTEXT = Ref{Union{Context,Nothing}}(nothing)
const EAGER_STATE = Ref{Union{ComputeState,Nothing}}(nothing)
Expand All @@ -21,13 +20,16 @@ function init_eager()
return
end
ctx = eager_context()
Threads.@spawn try
errormonitor_tracked(Threads.@spawn try
sopts = SchedulerOptions(;allow_errors=true)
opts = Dagger.Options((;scope=Dagger.ExactScope(Dagger.ThreadProc(1, 1)),
occupancy=Dict(Dagger.ThreadProc=>0)))
Dagger.compute(ctx, Dagger.delayed(eager_thunk, opts)();
options=sopts)
catch err
# Scheduler halting is considered normal
err isa SchedulerHaltedException && return

iob = IOContext(IOBuffer(), :color=>true)
println(iob, "Error in eager scheduler:")
Base.showerror(iob, err)
Expand All @@ -37,9 +39,12 @@ function init_eager()
write(stderr, iob)
finally
reset(EAGER_READY)
EAGER_STATE[] = nothing
lock(EAGER_ID_MAP) do id_map
empty!(id_map)
end
Threads.atomic_xchg!(EAGER_INIT, false)
EAGER_FORCE_KILL[] = true
end
end)
wait(EAGER_READY)
end
function eager_thunk()
Expand All @@ -48,8 +53,7 @@ function eager_thunk()
return
end
notify(EAGER_READY)
sleep(typemax(UInt))
error("eager_thunk exited")
wait(Dagger.Sch.EAGER_STATE[].halt)
end

"""
Expand Down Expand Up @@ -97,7 +101,7 @@ function thunk_yield(f)
end

eager_cleanup(t::Dagger.EagerThunkFinalizer) =
Threads.@spawn eager_cleanup(EAGER_STATE[], t.uid)
errormonitor_tracked(Threads.@spawn eager_cleanup(EAGER_STATE[], t.uid))
function eager_cleanup(state, uid)
tid = nothing
lock(EAGER_ID_MAP) do id_map
Expand Down
22 changes: 22 additions & 0 deletions src/sch/util.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
"Like `errormonitor`, but tracks how many outstanding tasks are running."
function errormonitor_tracked(t::Task)
    # Report failures of `t` exactly like `errormonitor` does
    errormonitor(t)
    # Register `t` as outstanding
    lock(ERRORMONITOR_TRACKED) do tracked
        push!(tracked, t)
    end
    # Watcher task: unregisters `t` once it has finished, whether it succeeded
    # or failed
    errormonitor(Threads.@spawn begin
        try
            wait(t)
        catch err
            # A failure of `t` is already reported by `errormonitor(t)` above;
            # letting it escape here would print the same error a second time.
            err isa TaskFailedException || rethrow()
        finally
            lock(ERRORMONITOR_TRACKED) do tracked
                idx = findfirst(o->o===t, tracked)
                # N.B. This may be nothing if precompile emptied these
                if idx !== nothing
                    deleteat!(tracked, idx)
                end
            end
        end
    end)
    # Like `errormonitor`, hand back the task that was passed in (not the
    # watcher), so callers that store or wait on the result get the real task
    return t
end
# Tasks registered through `errormonitor_tracked` that have not been
# unregistered yet
const ERRORMONITOR_TRACKED = LockedObject(Task[])

"""
unwrap_nested_exception(err::Exception) -> Bool

Expand Down