Skip to content

Commit 6bf297c

Browse files
committed
Add cancel! for cancelling tasks
1 parent 453797e commit 6bf297c

File tree

7 files changed

+199
-4
lines changed

7 files changed

+199
-4
lines changed

Diff for: docs/src/task-spawning.md

+28
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,34 @@ but can be enabled by setting the scheduler/thunk option ([Scheduler and Thunk o
141141
non-dynamic usecases, since any thunk failure will propagate down to the output
142142
thunk regardless of where it occurs.
143143

144+
## Cancellation
145+
146+
Sometimes a task runs longer than expected (maybe it's hanging due to a bug),
147+
or the user decides that they don't want to wait on a task to run to
148+
completion. In these cases, Dagger provides the `Dagger.cancel!` function,
149+
which allows for stopping a task while it's running, or terminating it before
150+
it gets the chance to start running.
151+
152+
```julia
153+
t = Dagger.@spawn sleep(1000)
154+
# We're bored, let's cancel `t`
155+
Dagger.cancel!(t)
156+
```
157+
158+
`Dagger.cancel!` is generally safe to call, as it will not actually *force* a
159+
task to stop; instead, Dagger will simply "abandon" the task and allow it to
160+
finish on its own in the background, and it will not block the execution of
161+
other `DTask`s that are queued to run. It is possible to force-cancel a task by
162+
doing `Dagger.cancel!(t; force=true)`, but this is generally discouraged, as it
163+
can cause memory leaks, hangs, and segfaults.
164+
165+
To cancel all tasks that are scheduled or running at once, one can call
166+
`Dagger.cancel!()`, and all tasks will be abandoned (or force-cancelled, if
167+
specified). Additionally, if Dagger's scheduler needs to be restarted for any
168+
reason, one can call `Dagger.cancel!(;halt_sch=true)` to stop the scheduler and
169+
all tasks. The scheduler will be automatically restarted on the next
170+
`@spawn`/`spawn` call.
171+
144172
## Lazy API
145173

146174
Alongside the modern eager API, Dagger also has a legacy lazy API, accessible

Diff for: src/Dagger.jl

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ include("thunk.jl")
5555
include("submission.jl")
5656
include("chunks.jl")
5757
include("memory-spaces.jl")
58+
include("cancellation.jl")
5859

5960
# Task scheduling
6061
include("compute.jl")

Diff for: src/cancellation.jl

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""
2+
cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
3+
4+
Cancels `task` at any point in its lifecycle, causing the scheduler to abandon
5+
it. If `force` is `true`, the task will be interrupted with an
6+
`InterruptException` (not recommended, this is unsafe). If `halt_sch` is
7+
`true`, the scheduler will be halted after the task is cancelled (it will
8+
restart automatically upon the next `@spawn`/`spawn` call).
9+
10+
As an example, the following code will cancel task `t` before it finishes
11+
executing:
12+
13+
```julia
14+
t = Dagger.@spawn sleep(1000)
15+
# We're bored, let's cancel `t`
16+
Dagger.cancel!(t)
17+
```
18+
19+
Cancellation allows the scheduler to free up execution resources for other
20+
tasks which are waiting to run. Using `cancel!` is generally a much safer
21+
alternative to Ctrl+C, as it cooperates with the scheduler and runtime and
22+
avoids unintended side effects.
23+
"""
24+
function cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
25+
tid = lock(Dagger.Sch.EAGER_ID_MAP) do id_map
26+
id_map[task.uid]
27+
end
28+
cancel!(tid; force, halt_sch)
29+
end
30+
# Cancel the task with scheduler ID `tid`, or every task when `tid` is
# `nothing`. The work is performed on worker 1, where `Sch.EAGER_STATE` is read.
function cancel!(tid::Union{Int,Nothing}=nothing;
                 force::Bool=false, halt_sch::Bool=false)
    return remotecall_fetch(1, tid, force, halt_sch) do target, forced, halt
        sch_state = Sch.EAGER_STATE[]
        if sch_state === nothing
            # No scheduler state is available, so there is nothing to cancel
            return
        end
        # `_cancel!` requires the scheduler lock to be held by the caller
        @lock sch_state.lock _cancel!(sch_state, target, forced, halt)
    end
end
38+
"""
    _cancel!(state, tid, force, halt_sch)

Scheduler-side implementation of `cancel!`. Must be called with `state.lock`
held.

Tasks in `state.ready` and `state.waiting` whose `id` equals `tid` (or all of
them, when `tid === nothing`) are marked as failed with an
`InterruptException` and removed from those collections; tasks that do not
match are left untouched. Matching running tasks are then cancelled on every
worker found in `state.running_on`: with `force`, an `InterruptException` is
thrown into the task; otherwise the task is abandoned and an
`InterruptException` result is posted to the processor's return queue on its
behalf.

If `halt_sch` is `true`, the scheduler is then asked to halt; `state.lock` is
released while waiting for it to stop and re-acquired before returning.
"""
function _cancel!(state, tid, force, halt_sch)
    @assert islocked(state.lock)

    # Get the scheduler uid
    sch_uid = state.uid

    # `tid === nothing` selects every task, otherwise only the task with that ID
    is_target = task -> tid === nothing || task.id == tid

    # Cancel ready tasks
    # (work on a snapshot, in case `Sch.set_failed!` modifies `state.ready`)
    ready_targets = [task for task in state.ready if is_target(task)]
    for task in ready_targets
        @dagdebug tid :cancel "Cancelling ready task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    if tid === nothing
        empty!(state.ready)
    else
        # Only drop the cancelled task; every other ready task must still run
        filter!(task -> task.id != tid, state.ready)
    end

    # Cancel waiting tasks
    # (work on a snapshot, in case `Sch.set_failed!` modifies `state.waiting`)
    waiting_targets = [task for task in keys(state.waiting) if is_target(task)]
    for task in waiting_targets
        @dagdebug tid :cancel "Cancelling waiting task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    if tid === nothing
        empty!(state.waiting)
    else
        # Only drop the cancelled task; every other waiting task must be kept
        for task in waiting_targets
            delete!(state.waiting, task)
        end
    end

    # Cancel running tasks at the processor level
    wids = unique(map(root_worker_id, values(state.running_on)))
    for wid in wids
        remotecall_fetch(wid, tid, sch_uid, force) do target_tid, sch_uid, force
            Dagger.Sch.proc_states(sch_uid) do states
                for (_, proc_state) in states
                    istate = proc_state.state
                    any_cancelled = false
                    @lock istate.queue begin
                        for (task_id, task) in istate.tasks
                            target_tid !== nothing && task_id != target_tid && continue
                            task_spec = istate.task_specs[task_id]
                            # NOTE(review): index 6 of the task spec is used as the
                            # type of the task's function -- confirm against the
                            # code that builds the spec
                            Tf = task_spec[6]
                            Tf === typeof(Sch.eager_thunk) && continue
                            istaskdone(task) && continue
                            # An abandoned task stays in `istate.tasks` until it
                            # finishes. Abandoning it a second time would subtract
                            # its occupancy and time pressure again (both counters
                            # are unsigned) and post a duplicate result.
                            !force && task_id in istate.cancelled && continue
                            any_cancelled = true
                            @dagdebug task_id :cancel "Cancelling running task ($Tf)"
                            if force
                                @dagdebug task_id :cancel "Interrupting running task ($Tf)"
                                Threads.@spawn Base.throwto(task, InterruptException())
                            else
                                # Tell the processor to just drop this task
                                task_occupancy = task_spec[4]
                                time_util = task_spec[2]
                                istate.proc_occupancy[] -= task_occupancy
                                istate.time_pressure[] -= time_util
                                push!(istate.cancelled, task_id)
                                to_proc = istate.proc
                                # Report the cancellation as this task's result
                                put!(istate.return_queue,
                                     (myid(), to_proc, task_id,
                                      (InterruptException(), nothing)))
                            end
                        end
                    end
                    if any_cancelled
                        notify(istate.reschedule)
                    end
                end
            end
            return
        end
    end

    if halt_sch
        # Release the scheduler lock while waiting for the scheduler to stop;
        # it is re-acquired in the `finally` block so the caller's unlock
        # stays balanced
        unlock(state.lock)
        try
            # Give tasks a moment to be processed
            sleep(0.5)

            # Halt the scheduler
            @dagdebug nothing :cancel "Halting the scheduler"
            notify(state.halt)
            put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))

            # Wait for the scheduler to halt
            @dagdebug nothing :cancel "Waiting for scheduler to halt"
            while Sch.EAGER_INIT[]
                sleep(0.1)
            end
            @dagdebug nothing :cancel "Scheduler halted"
        finally
            lock(state.lock)
        end
    end

    return
end

Diff for: src/sch/Sch.jl

+24-3
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,8 @@ function scheduler_exit(ctx, state::ComputeState, options)
652652
lock(ctx.proc_notify) do
653653
notify(ctx.proc_notify)
654654
end
655+
656+
@dagdebug nothing :global "Tore down scheduler" uid=state.uid
655657
end
656658

657659
function procs_to_use(ctx, options=ctx.options)
@@ -1161,11 +1163,14 @@ Base.hash(key::TaskSpecKey, h::UInt) = hash(key.task_id, hash(TaskSpecKey, h))
11611163
struct ProcessorInternalState
11621164
ctx::Context
11631165
proc::Processor
1166+
return_queue::RemoteChannel # task results are posted here as (worker id, processor, thunk id, result) tuples
11641167
queue::LockedObject{PriorityQueue{TaskSpecKey, UInt32, Base.Order.ForwardOrdering}}
11651168
reschedule::Doorbell # notified when a task finishes or is cancelled
11661169
tasks::Dict{Int,Task} # thunk id => Julia task executing it; entry removed when the task finishes
1170+
task_specs::Dict{Int,Vector{Any}} # thunk id => task spec it was launched from; entry removed when the task finishes
11671171
proc_occupancy::Base.RefValue{UInt32} # raised when a task is launched, lowered when it finishes or is cancelled
11681172
time_pressure::Base.RefValue{UInt64} # raised when a task is launched, lowered when it finishes or is cancelled
1173+
cancelled::Set{Int} # thunk ids abandoned by cancellation; entry removed when the abandoned task finishes
11691174
done::Base.RefValue{Bool}
11701175
end
11711176
struct ProcessorState
@@ -1314,6 +1319,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13141319

13151320
# Execute the task and return its result
13161321
t = @task begin
1322+
was_cancelled = false
13171323
result = try
13181324
do_task(to_proc, task)
13191325
catch err
@@ -1322,11 +1328,23 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13221328
finally
13231329
lock(istate.queue) do _
13241330
delete!(tasks, thunk_id)
1325-
proc_occupancy[] -= task_occupancy
1326-
time_pressure[] -= time_util
1331+
delete!(istate.task_specs, thunk_id)
1332+
if !(thunk_id in istate.cancelled)
1333+
proc_occupancy[] -= task_occupancy
1334+
time_pressure[] -= time_util
1335+
else
1336+
# Task was cancelled, so occupancy and pressure are
1337+
# already reduced
1338+
pop!(istate.cancelled, thunk_id)
1339+
was_cancelled = true
1340+
end
13271341
end
13281342
notify(istate.reschedule)
13291343
end
1344+
if was_cancelled
1345+
# A result was already posted to the return queue
1346+
return
1347+
end
13301348
try
13311349
put!(return_queue, (myid(), to_proc, thunk_id, result))
13321350
catch err
@@ -1345,6 +1363,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13451363
t.sticky = false
13461364
end
13471365
tasks[thunk_id] = errormonitor_tracked("thunk $thunk_id", schedule(t))
1366+
istate.task_specs[thunk_id] = task
13481367
proc_occupancy[] += task_occupancy
13491368
time_pressure[] += time_util
13501369
end
@@ -1377,10 +1396,12 @@ function do_tasks(to_proc, return_queue, tasks)
13771396
queue = PriorityQueue{TaskSpecKey, UInt32}()
13781397
queue_locked = LockedObject(queue)
13791398
reschedule = Doorbell()
1380-
istate = ProcessorInternalState(ctx, to_proc,
1399+
istate = ProcessorInternalState(ctx, to_proc, return_queue,
13811400
queue_locked, reschedule,
13821401
Dict{Int,Task}(),
1402+
Dict{Int,Vector{Any}}(),
13831403
Ref(UInt32(0)), Ref(UInt64(0)),
1404+
Set{Int}(),
13841405
Ref(false))
13851406
runner = start_processor_runner!(istate, uid, return_queue)
13861407
@static if VERSION < v"1.9"

Diff for: src/threadproc.jl

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @n
2626
fetch(task)
2727
return result[]
2828
catch err
29+
if err isa InterruptException
30+
if !istaskdone(task)
31+
# Propagate cancellation signal
32+
Threads.@spawn Base.throwto(task, InterruptException())
33+
end
34+
end
2935
err, frames = Base.current_exceptions(task)[1]
3036
rethrow(CapturedException(err, frames))
3137
end

Diff for: src/utils/dagdebug.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ function istask end
22
function task_id end
33

44
const DAGDEBUG_CATEGORIES = Symbol[:global, :submit, :schedule, :scope,
5-
:take, :execute, :move, :processor]
5+
:take, :execute, :move, :processor, :cancel]
66
macro dagdebug(thunk, category, msg, args...)
77
cat_sym = category.value
88
@gensym id

Diff for: test/scheduler.jl

+11
Original file line numberDiff line numberDiff line change
@@ -535,3 +535,14 @@ end
535535
end
536536
end
537537
end
538+
539+
@testset "Cancellation" begin
540+
t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) sleep(100) # long-running task pinned to worker 1, thread 1
541+
start_time = time_ns()
542+
Dagger.cancel!(t)
543+
@test_throws_unwrap Dagger.DTaskFailedException fetch(t) # fetching a cancelled task must fail
544+
t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) yield() # short task in the same scope as the cancelled one
545+
fetch(t)
546+
finish_time = time_ns()
547+
@test (finish_time - start_time) * 1e-9 < 100 # must finish before the cancelled task's 100 s sleep would have
548+
end

0 commit comments

Comments
 (0)