Skip to content

Commit a69dbcd

Browse files
committed
Add cancel! for cancelling tasks
1 parent a419add commit a69dbcd

File tree

7 files changed

+176
-4
lines changed

7 files changed

+176
-4
lines changed

Diff for: docs/src/task-spawning.md

+28
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,34 @@ but can be enabled by setting the scheduler/thunk option ([Scheduler and Thunk o
141141
non-dynamic usecases, since any thunk failure will propagate down to the output
142142
thunk regardless of where it occurs.
143143

144+
## Cancellation
145+
146+
Sometimes a task runs longer than expected (maybe it's hanging due to a bug),
147+
or the user decides that they don't want to wait on a task to run to
148+
completion. In these cases, Dagger provides the `Dagger.cancel!` function,
149+
which allows for stopping a task while it's running, or terminating it before
150+
it gets the chance to start running.
151+
152+
```julia
153+
t = Dagger.@spawn sleep(1000)
154+
# We're bored, let's cancel `t`
155+
Dagger.cancel!(t)
156+
```
157+
158+
`Dagger.cancel!` is generally safe to call, as it will not actually *force* a
159+
task to stop; instead, Dagger will simply "abandon" the task and allow it to
160+
finish on its own in the background, and it will not block the execution of
161+
other `DTask`s that are queued to run. It is possible to force-cancel a task by
162+
doing `Dagger.cancel!(t; force=true)`, but this is generally discouraged, as it
163+
can cause memory leaks, hangs, and segfaults.
164+
165+
If it's desired to cancel all tasks that are scheduled or running, one can call
166+
`Dagger.cancel!()`, and all tasks will be abandoned (or force-cancelled, if
167+
specified). Additionally, if Dagger's scheduler needs to be restarted for any
168+
reason, one can call `Dagger.cancel!(;halt_sch=true)` to stop the scheduler and
169+
all tasks. The scheduler will be automatically restarted on the next
170+
`@spawn`/`spawn` call.
171+
144172
## Lazy API
145173

146174
Alongside the modern eager API, Dagger also has a legacy lazy API, accessible

Diff for: src/Dagger.jl

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ include("thunk.jl")
5555
include("submission.jl")
5656
include("chunks.jl")
5757
include("memory-spaces.jl")
58+
include("cancellation.jl")
5859

5960
# Task scheduling
6061
include("compute.jl")

Diff for: src/cancellation.jl

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
function cancel!(task::DTask; force::Bool=false)
2+
tid = lock(Dagger.Sch.EAGER_ID_MAP) do id_map
3+
id_map[task.uid]
4+
end
5+
cancel!(tid; force)
6+
end
7+
function cancel!(tid::Union{Int,Nothing}=nothing;
8+
force::Bool=false, halt_sch::Bool=false)
9+
remotecall_fetch(1, tid, force, halt_sch) do tid, force, halt_sch
10+
state = Sch.EAGER_STATE[]
11+
state === nothing && return
12+
@lock state.lock _cancel!(state, tid, force, halt_sch)
13+
end
14+
end
15+
function _cancel!(state, tid, force, halt_sch)
16+
@assert islocked(state.lock)
17+
18+
# Get the scheduler uid
19+
sch_uid = state.uid
20+
21+
# Cancel ready tasks
22+
for task in state.ready
23+
tid !== nothing && task.id != tid && continue
24+
@dagdebug tid :cancel "Cancelling ready task"
25+
state.cache[task] = InterruptException()
26+
state.errored[task] = true
27+
Sch.set_failed!(state, task)
28+
end
29+
empty!(state.ready)
30+
31+
# Cancel waiting tasks
32+
for task in keys(state.waiting)
33+
tid !== nothing && task.id != tid && continue
34+
@dagdebug tid :cancel "Cancelling waiting task"
35+
state.cache[task] = InterruptException()
36+
state.errored[task] = true
37+
Sch.set_failed!(state, task)
38+
end
39+
empty!(state.waiting)
40+
41+
# Cancel running tasks at the processor level
42+
wids = unique(map(root_worker_id, values(state.running_on)))
43+
for wid in wids
44+
remotecall_fetch(wid, tid, sch_uid, force) do _tid, sch_uid, force
45+
Dagger.Sch.proc_states(sch_uid) do states
46+
for (proc, state) in states
47+
istate = state.state
48+
any_cancelled = false
49+
@lock istate.queue begin
50+
for (tid, task) in istate.tasks
51+
_tid !== nothing && tid != _tid && continue
52+
task_spec = istate.task_specs[tid]
53+
Tf = task_spec[6]
54+
Tf === typeof(Sch.eager_thunk) && continue
55+
istaskdone(task) && continue
56+
any_cancelled = true
57+
@dagdebug tid :cancel "Cancelling running task ($Tf)"
58+
if force
59+
@dagdebug tid :cancel "Interrupting running task ($Tf)"
60+
Threads.@spawn Base.throwto(task, InterruptException())
61+
else
62+
# Tell the processor to just drop this task
63+
task_occupancy = task_spec[4]
64+
time_util = task_spec[2]
65+
istate.proc_occupancy[] -= task_occupancy
66+
istate.time_pressure[] -= time_util
67+
push!(istate.cancelled, tid)
68+
to_proc = istate.proc
69+
put!(istate.return_queue, (myid(), to_proc, tid, (InterruptException(), nothing)))
70+
end
71+
end
72+
end
73+
if any_cancelled
74+
notify(istate.reschedule)
75+
end
76+
end
77+
end
78+
return
79+
end
80+
end
81+
82+
if halt_sch
83+
unlock(state.lock)
84+
try
85+
# Give tasks a moment to be processed
86+
sleep(0.5)
87+
88+
# Halt the scheduler
89+
@dagdebug nothing :cancel "Halting the scheduler"
90+
notify(state.halt)
91+
put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))
92+
93+
# Wait for the scheduler to halt
94+
@dagdebug nothing :cancel "Waiting for scheduler to halt"
95+
while Sch.EAGER_INIT[]
96+
sleep(0.1)
97+
end
98+
@dagdebug nothing :cancel "Scheduler halted"
99+
finally
100+
lock(state.lock)
101+
end
102+
end
103+
104+
return
105+
end

Diff for: src/sch/Sch.jl

+24-3
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,8 @@ function scheduler_exit(ctx, state::ComputeState, options)
652652
lock(ctx.proc_notify) do
653653
notify(ctx.proc_notify)
654654
end
655+
656+
@dagdebug nothing :global "Tore down scheduler" uid=state.uid
655657
end
656658

657659
function procs_to_use(ctx, options=ctx.options)
@@ -1161,11 +1163,14 @@ Base.hash(key::TaskSpecKey, h::UInt) = hash(key.task_id, hash(TaskSpecKey, h))
11611163
struct ProcessorInternalState
11621164
ctx::Context
11631165
proc::Processor
1166+
return_queue::RemoteChannel
11641167
queue::LockedObject{PriorityQueue{TaskSpecKey, UInt32, Base.Order.ForwardOrdering}}
11651168
reschedule::Doorbell
11661169
tasks::Dict{Int,Task}
1170+
task_specs::Dict{Int,Vector{Any}}
11671171
proc_occupancy::Base.RefValue{UInt32}
11681172
time_pressure::Base.RefValue{UInt64}
1173+
cancelled::Set{Int}
11691174
done::Base.RefValue{Bool}
11701175
end
11711176
struct ProcessorState
@@ -1314,6 +1319,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13141319

13151320
# Execute the task and return its result
13161321
t = @task begin
1322+
was_cancelled = false
13171323
result = try
13181324
do_task(to_proc, task)
13191325
catch err
@@ -1322,11 +1328,23 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13221328
finally
13231329
lock(istate.queue) do _
13241330
delete!(tasks, thunk_id)
1325-
proc_occupancy[] -= task_occupancy
1326-
time_pressure[] -= time_util
1331+
delete!(istate.task_specs, thunk_id)
1332+
if !(thunk_id in istate.cancelled)
1333+
proc_occupancy[] -= task_occupancy
1334+
time_pressure[] -= time_util
1335+
else
1336+
# Task was cancelled, so occupancy and pressure are
1337+
# already reduced
1338+
pop!(istate.cancelled, thunk_id)
1339+
was_cancelled = true
1340+
end
13271341
end
13281342
notify(istate.reschedule)
13291343
end
1344+
if was_cancelled
1345+
# A result was already posted to the return queue
1346+
return
1347+
end
13301348
try
13311349
put!(return_queue, (myid(), to_proc, thunk_id, result))
13321350
catch err
@@ -1345,6 +1363,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
13451363
t.sticky = false
13461364
end
13471365
tasks[thunk_id] = errormonitor_tracked("thunk $thunk_id", schedule(t))
1366+
istate.task_specs[thunk_id] = task
13481367
proc_occupancy[] += task_occupancy
13491368
time_pressure[] += time_util
13501369
end
@@ -1377,10 +1396,12 @@ function do_tasks(to_proc, return_queue, tasks)
13771396
queue = PriorityQueue{TaskSpecKey, UInt32}()
13781397
queue_locked = LockedObject(queue)
13791398
reschedule = Doorbell()
1380-
istate = ProcessorInternalState(ctx, to_proc,
1399+
istate = ProcessorInternalState(ctx, to_proc, return_queue,
13811400
queue_locked, reschedule,
13821401
Dict{Int,Task}(),
1402+
Dict{Int,Vector{Any}}(),
13831403
Ref(UInt32(0)), Ref(UInt64(0)),
1404+
Set{Int}(),
13841405
Ref(false))
13851406
runner = start_processor_runner!(istate, uid, return_queue)
13861407
@static if VERSION < v"1.9"

Diff for: src/threadproc.jl

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @n
2626
fetch(task)
2727
return result[]
2828
catch err
29+
if err isa InterruptException
30+
if !istaskdone(task)
31+
# Propagate cancellation signal
32+
Threads.@spawn Base.throwto(task, InterruptException())
33+
end
34+
end
2935
err, frames = Base.current_exceptions(task)[1]
3036
rethrow(CapturedException(err, frames))
3137
end

Diff for: src/utils/dagdebug.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ function istask end
22
function task_id end
33

44
const DAGDEBUG_CATEGORIES = Symbol[:global, :submit, :schedule, :scope,
5-
:take, :execute, :move, :processor]
5+
:take, :execute, :move, :processor, :cancel]
66
macro dagdebug(thunk, category, msg, args...)
77
cat_sym = category.value
88
@gensym id

Diff for: test/scheduler.jl

+11
Original file line numberDiff line numberDiff line change
@@ -535,3 +535,14 @@ end
535535
end
536536
end
537537
end
538+
539+
@testset "Cancellation" begin
540+
t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) sleep(100)
541+
start_time = time_ns()
542+
Dagger.cancel!(t)
543+
@test_throws_unwrap Dagger.DTaskFailedException fetch(t)
544+
t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) yield()
545+
fetch(t)
546+
finish_time = time_ns()
547+
@test (finish_time - start_time) * 1e-9 < 100
548+
end

0 commit comments

Comments
 (0)