From 453797e9a8272e4563b82fc1fef86e9715123596 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Wed, 10 Apr 2024 10:29:46 -0700 Subject: [PATCH 1/3] precompile: Reset thunk ID counter to 1 --- src/precompile.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/precompile.jl b/src/precompile.jl index 0ce69f852..ed5e9c4bf 100644 --- a/src/precompile.jl +++ b/src/precompile.jl @@ -49,4 +49,5 @@ yield() @assert isempty(Sch.WORKER_MONITOR_CHANS) @assert isempty(Sch.WORKER_MONITOR_TASKS) + ID_COUNTER[] = 1 end From 6bf297c9e2c0531e6e12a96a8ba3d6a9d813b2b6 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Wed, 10 Apr 2024 10:32:53 -0700 Subject: [PATCH 2/3] Add cancel! for cancelling tasks --- docs/src/task-spawning.md | 28 +++++++++ src/Dagger.jl | 1 + src/cancellation.jl | 128 ++++++++++++++++++++++++++++++++++++++ src/sch/Sch.jl | 27 +++++++- src/threadproc.jl | 6 ++ src/utils/dagdebug.jl | 2 +- test/scheduler.jl | 11 ++++ 7 files changed, 199 insertions(+), 4 deletions(-) create mode 100644 src/cancellation.jl diff --git a/docs/src/task-spawning.md b/docs/src/task-spawning.md index e54e9f162..85348f2a6 100644 --- a/docs/src/task-spawning.md +++ b/docs/src/task-spawning.md @@ -141,6 +141,34 @@ but can be enabled by setting the scheduler/thunk option ([Scheduler and Thunk o non-dynamic usecases, since any thunk failure will propagate down to the output thunk regardless of where it occurs. +## Cancellation + +Sometimes a task runs longer than expected (maybe it's hanging due to a bug), +or the user decides that they don't want to wait on a task to run to +completion. In these cases, Dagger provides the `Dagger.cancel!` function, +which allows for stopping a task while it's running, or terminating it before +it gets the chance to start running. 
+ +```julia +t = Dagger.@spawn sleep(1000) +# We're bored, let's cancel `t` +Dagger.cancel!(t) +``` + +`Dagger.cancel!` is generally safe to call, as it will not actually *force* a +task to stop; instead, Dagger will simply "abandon" the task and allow it to +finish on its own in the background, and it will not block the execution of +other `DTask`s that are queued to run. It is possible to force-cancel a task by +doing `Dagger.cancel!(t; force=true)`, but this is generally discouraged, as it +can cause memory leaks, hangs, and segfaults. + +If it's desired to cancel all tasks that are scheduled or running, one can call +`Dagger.cancel!()`, and all tasks will be abandoned (or force-cancelled, if +specified). Additionally, if Dagger's scheduler needs to be restarted for any +reason, one can call `Dagger.cancel!(;halt_sch=true)` to stop the scheduler and +all tasks. The scheduler will be automatically restarted on the next +`@spawn`/`spawn` call. + ## Lazy API Alongside the modern eager API, Dagger also has a legacy lazy API, accessible diff --git a/src/Dagger.jl b/src/Dagger.jl index aabb4012a..d39f2f5e8 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -55,6 +55,7 @@ include("thunk.jl") include("submission.jl") include("chunks.jl") include("memory-spaces.jl") +include("cancellation.jl") # Task scheduling include("compute.jl") diff --git a/src/cancellation.jl b/src/cancellation.jl new file mode 100644 index 000000000..f6b75413b --- /dev/null +++ b/src/cancellation.jl @@ -0,0 +1,128 @@ +""" + cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false) + +Cancels `task` at any point in its lifecycle, causing the scheduler to abandon +it. If `force` is `true`, the task will be interrupted with an +`InterruptException` (not recommended, this is unsafe). If `halt_sch` is +`true`, the scheduler will be halted after the task is cancelled (it will +restart automatically upon the next `@spawn`/`spawn` call). 
+
+As an example, the following code will cancel task `t` before it finishes
+executing:
+
+```julia
+t = Dagger.@spawn sleep(1000)
+# We're bored, let's cancel `t`
+Dagger.cancel!(t)
+```
+
+Cancellation allows the scheduler to free up execution resources for other
+tasks which are waiting to run. Using `cancel!` is generally a much safer
+alternative to Ctrl+C, as it cooperates with the scheduler and runtime and
+avoids unintended side effects.
+"""
+function cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
+    # The ID map is emptied when the eager scheduler task exits, so `task` may
+    # be unknown; return early, as a `nothing` ID would cancel *every* task
+    tid = lock(id_map -> get(id_map, task.uid, nothing), Dagger.Sch.EAGER_ID_MAP)
+    tid === nothing && return
+    return cancel!(tid; force, halt_sch)
+end
+# Cancel the task with thunk ID `tid`, or every task when `tid === nothing`.
+# Runs on worker 1, and does nothing if no eager scheduler state exists there.
+function cancel!(tid::Union{Int,Nothing}=nothing; force::Bool=false, halt_sch::Bool=false)
+    remotecall_fetch(1, tid, force, halt_sch) do tid, force, halt_sch
+        state = Sch.EAGER_STATE[]
+        state === nothing && return
+        @lock state.lock _cancel!(state, tid, force, halt_sch)
+    end
+end
+# Internal implementation of `cancel!`; the caller must hold `state.lock`.
+# Fails matching ready/waiting tasks, has each worker drop (or, if `force`,
+# interrupt) matching running tasks, then optionally halts the scheduler.
+function _cancel!(state, tid, force, halt_sch)
+    @assert islocked(state.lock)
+
+    # Cancel ready tasks. Only the selected tasks leave the queue (everything
+    # when `tid === nothing`), so unrelated queued tasks can still run. A
+    # snapshot is iterated in case `Sch.set_failed!` mutates the queue.
+    selected(task) = tid === nothing || task.id == tid
+    for task in filter(selected, state.ready)
+        @dagdebug tid :cancel "Cancelling ready task"
+        state.cache[task] = InterruptException()
+        state.errored[task] = true
+        Sch.set_failed!(state, task)
+    end
+    filter!(!selected, state.ready)
+
+    # Cancel waiting tasks
+    for task in filter(selected, collect(keys(state.waiting)))
+        @dagdebug tid :cancel "Cancelling waiting task"
+        state.cache[task] = InterruptException()
+        state.errored[task] = true
+        Sch.set_failed!(state, task)
+        delete!(state.waiting, task)
+    end
+
+    # Cancel running tasks at the processor level
+    wids = unique(map(root_worker_id, values(state.running_on)))
+    for wid in wids
+        remotecall_fetch(wid, tid, state.uid, force) do _tid, sch_uid, force
+            Dagger.Sch.proc_states(sch_uid) do states
+                for (proc, proc_state) in states
+                    istate = proc_state.state
+                    any_cancelled = false
+                    @lock istate.queue begin
+                        for (task_id, task) in istate.tasks
+                            _tid !== nothing && task_id != _tid && continue
+                            task_spec = istate.task_specs[task_id]
+                            Tf = task_spec[6]
+                            Tf === typeof(Sch.eager_thunk) && continue
+                            istaskdone(task) && continue
+                            any_cancelled = true
+                            @dagdebug task_id :cancel "Cancelling running task ($Tf)"
+                            if force
+                                @dagdebug task_id :cancel "Interrupting running task ($Tf)"
+                                Threads.@spawn Base.throwto(task, InterruptException())
+                            else
+                                # Tell the processor to just drop this task
+                                task_occupancy = task_spec[4]
+                                time_util = task_spec[2]
+                                istate.proc_occupancy[] -= task_occupancy
+                                istate.time_pressure[] -= time_util
+                                push!(istate.cancelled, task_id)
+                                put!(istate.return_queue, (myid(), istate.proc, task_id, (InterruptException(), nothing)))
+                            end
+                        end
+                    end
+                    any_cancelled && notify(istate.reschedule)
+                end
+            end
+            return
+        end
+    end
+
+    if halt_sch
+        unlock(state.lock)
+        try
+            # Give tasks a moment to be processed
+            sleep(0.5)
+
+            # Halt the scheduler
+            @dagdebug nothing :cancel "Halting the scheduler"
+            notify(state.halt)
+            put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))
+
+            # Wait for the scheduler to halt
+            @dagdebug nothing :cancel "Waiting for scheduler to halt"
+            while Sch.EAGER_INIT[]
+                sleep(0.1)
+            end
+            @dagdebug nothing :cancel "Scheduler halted"
+        finally
+            lock(state.lock)
+        end
+    end
+
+    return
+end
diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index b60987cd4..3cd16be12 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -652,6 +652,8 @@ function scheduler_exit(ctx, state::ComputeState, options) lock(ctx.proc_notify) do notify(ctx.proc_notify) end + + @dagdebug nothing :global "Tore down scheduler" uid=state.uid end function procs_to_use(ctx, options=ctx.options) @@ -1161,11 +1163,14 @@ Base.hash(key::TaskSpecKey, h::UInt) = hash(key.task_id, hash(TaskSpecKey, h)) struct ProcessorInternalState ctx::Context proc::Processor + return_queue::RemoteChannel queue::LockedObject{PriorityQueue{TaskSpecKey, UInt32,
Base.Order.ForwardOrdering}} reschedule::Doorbell tasks::Dict{Int,Task} + task_specs::Dict{Int,Vector{Any}} proc_occupancy::Base.RefValue{UInt32} time_pressure::Base.RefValue{UInt64} + cancelled::Set{Int} done::Base.RefValue{Bool} end struct ProcessorState @@ -1314,6 +1319,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re # Execute the task and return its result t = @task begin + was_cancelled = false result = try do_task(to_proc, task) catch err @@ -1322,11 +1328,23 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re finally lock(istate.queue) do _ delete!(tasks, thunk_id) - proc_occupancy[] -= task_occupancy - time_pressure[] -= time_util + delete!(istate.task_specs, thunk_id) + if !(thunk_id in istate.cancelled) + proc_occupancy[] -= task_occupancy + time_pressure[] -= time_util + else + # Task was cancelled, so occupancy and pressure are + # already reduced + pop!(istate.cancelled, thunk_id) + was_cancelled = true + end end notify(istate.reschedule) end + if was_cancelled + # A result was already posted to the return queue + return + end try put!(return_queue, (myid(), to_proc, thunk_id, result)) catch err @@ -1345,6 +1363,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re t.sticky = false end tasks[thunk_id] = errormonitor_tracked("thunk $thunk_id", schedule(t)) + istate.task_specs[thunk_id] = task proc_occupancy[] += task_occupancy time_pressure[] += time_util end @@ -1377,10 +1396,12 @@ function do_tasks(to_proc, return_queue, tasks) queue = PriorityQueue{TaskSpecKey, UInt32}() queue_locked = LockedObject(queue) reschedule = Doorbell() - istate = ProcessorInternalState(ctx, to_proc, + istate = ProcessorInternalState(ctx, to_proc, return_queue, queue_locked, reschedule, Dict{Int,Task}(), + Dict{Int,Vector{Any}}(), Ref(UInt32(0)), Ref(UInt64(0)), + Set{Int}(), Ref(false)) runner = start_processor_runner!(istate, uid, return_queue) @static if VERSION < v"1.9" 
diff --git a/src/threadproc.jl b/src/threadproc.jl index e4f25363f..09099889a 100644 --- a/src/threadproc.jl +++ b/src/threadproc.jl @@ -26,6 +26,12 @@ function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @n fetch(task) return result[] catch err + if err isa InterruptException + if !istaskdone(task) + # Propagate cancellation signal + Threads.@spawn Base.throwto(task, InterruptException()) + end + end err, frames = Base.current_exceptions(task)[1] rethrow(CapturedException(err, frames)) end diff --git a/src/utils/dagdebug.jl b/src/utils/dagdebug.jl index 83c97d67b..9a9d24167 100644 --- a/src/utils/dagdebug.jl +++ b/src/utils/dagdebug.jl @@ -2,7 +2,7 @@ function istask end function task_id end const DAGDEBUG_CATEGORIES = Symbol[:global, :submit, :schedule, :scope, - :take, :execute, :move, :processor] + :take, :execute, :move, :processor, :cancel] macro dagdebug(thunk, category, msg, args...) cat_sym = category.value @gensym id diff --git a/test/scheduler.jl b/test/scheduler.jl index 0e7bf046f..b9fe01872 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -535,3 +535,14 @@ end end end end + +@testset "Cancellation" begin + t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) sleep(100) + start_time = time_ns() + Dagger.cancel!(t) + @test_throws_unwrap Dagger.DTaskFailedException fetch(t) + t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) yield() + fetch(t) + finish_time = time_ns() + @test (finish_time - start_time) * 1e-9 < 100 +end From 58ee8669fe1ca1159051293f064c16d6b3f4a855 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Tue, 13 Aug 2024 13:27:11 -0500 Subject: [PATCH 3/3] Sch: Exit properly on scheduler failure --- src/sch/eager.jl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/sch/eager.jl b/src/sch/eager.jl index 1ee72829f..87a109788 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -17,6 +17,9 @@ function init_eager() end if Threads.atomic_xchg!(EAGER_INIT, true) 
wait(EAGER_READY) + if EAGER_STATE[] === nothing + throw(ConcurrencyViolationError("Eager scheduler failed to start")) + end return end ctx = eager_context() @@ -38,14 +41,19 @@ function init_eager() seek(iob.io, 0) write(stderr, iob) finally - reset(EAGER_READY) + # N.B. Sequence order matters to ensure that observers can see that we failed to start EAGER_STATE[] = nothing + notify(EAGER_READY) + reset(EAGER_READY) lock(EAGER_ID_MAP) do id_map empty!(id_map) end Threads.atomic_xchg!(EAGER_INIT, false) end) wait(EAGER_READY) + if EAGER_STATE[] === nothing + throw(ConcurrencyViolationError("Eager scheduler failed to start")) + end end function eager_thunk() exec!(Dagger.sch_handle()) do ctx, state, task, tid, _