-
-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathcancellation.jl
110 lines (98 loc) · 4.03 KB
/
cancellation.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
    cancel!(tid=nothing; sch_uid=nothing, force=false, halt_sch=false)

Ask the eager scheduler on process 1 to cancel thunks.

The request is silently ignored when no eager scheduler state exists, or when
the scheduler is already stopping or has stopped.

# Arguments
- `tid::Union{Int,Nothing}`: ID of the thunk to cancel; `nothing` does not
  restrict the request to a single thunk.

# Keywords
- `sch_uid::Union{UInt64,Nothing}`: scheduler UID used to look up processor
  states; `nothing` means "use the UID stored in the scheduler state".
- `force::Bool`: when `true`, running tasks are interrupted by throwing an
  `InterruptException` into them instead of being told to drop their work.
- `halt_sch::Bool`: when `true`, the scheduler itself is halted once the
  cancellation pass has finished.

# Returns
`nothing`.
"""
function cancel!(tid::Union{Int,Nothing}=nothing;
                 sch_uid::Union{UInt64,Nothing}=nothing,
                 force::Bool=false, halt_sch::Bool=false)
    # Run the request on process 1, against that process's eager scheduler state.
    # Everything the closure needs is passed as an argument, so it captures nothing.
    remotecall_fetch(1, tid, sch_uid, force, halt_sch) do target, uid, forced, halt
        sched = Sch.EAGER_STATE[]
        # No scheduler state at all: nothing to cancel
        sched === nothing && return nothing
        # The scheduler is stopping or has already stopped
        sched.halt.set && return nothing
        @lock sched.lock _cancel!(sched, target, uid, forced, halt)
    end
end
"""
    _cancel!(state, tid, sch_uid, force, halt_sch)

Cancel the ready, waiting, and running tasks tracked by scheduler `state`.

The caller must hold `state.lock`. The lock is held again when this function
returns or throws, although it is temporarily released while halting the
scheduler (`halt_sch == true`).

# Arguments
- `state`: the scheduler state to operate on.
- `tid`: ID of the single thunk to cancel, or `nothing` to cancel every task.
- `sch_uid`: scheduler UID used to look up processor states on each worker;
  `nothing` falls back to `state.uid`.
- `force`: when `true`, running tasks get an `InterruptException` thrown into
  them; otherwise they are recorded as cancelled and an `InterruptException`
  result is pushed onto the processor's return queue.
- `halt_sch`: when `true`, halt the scheduler after cancelling and wait until
  `Sch.EAGER_INIT[]` becomes `false`.

# Returns
`nothing`.
"""
function _cancel!(state, tid, sch_uid, force, halt_sch)
    @assert islocked(state.lock)

    # Get the scheduler uid
    if sch_uid === nothing
        sch_uid = state.uid
    end

    # Cancel ready tasks
    for task in state.ready
        # When a specific thunk was requested, leave every other task alone
        tid !== nothing && task.id != tid && continue
        @dagdebug tid :cancel "Cancelling ready task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    # NOTE(review): this clears the whole ready queue even when only `tid` was
    # cancelled -- confirm that dropping the other ready tasks is intended.
    empty!(state.ready)

    # Cancel waiting tasks
    for task in keys(state.waiting)
        # When a specific thunk was requested, leave every other task alone
        tid !== nothing && task.id != tid && continue
        @dagdebug tid :cancel "Cancelling waiting task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    # NOTE(review): same concern as for `state.ready` above -- confirm.
    empty!(state.waiting)

    # Cancel running tasks at the processor level
    wids = unique(map(root_worker_id, values(state.running_on)))
    for wid in wids
        # Everything the remote closure needs is passed as an argument; inside it
        # `tid` is the loop variable for each running task and `_tid` is the
        # requested thunk ID.
        remotecall_fetch(wid, tid, sch_uid, force) do _tid, sch_uid, force
            Dagger.Sch.proc_states(sch_uid) do states
                for (proc, proc_state) in states
                    istate = proc_state.state
                    any_cancelled = false
                    @lock istate.queue begin
                        for (tid, task) in istate.tasks
                            # Only touch the requested thunk (or all, if none given)
                            _tid !== nothing && tid != _tid && continue
                            task_spec = istate.task_specs[tid]
                            Tf = task_spec[6]
                            # Never cancel the eager scheduler's own thunk
                            Tf === typeof(Sch.eager_thunk) && continue
                            istaskdone(task) && continue
                            any_cancelled = true
                            @dagdebug tid :cancel "Cancelling running task ($Tf)"
                            if force
                                @dagdebug tid :cancel "Interrupting running task ($Tf)"
                                Threads.@spawn Base.throwto(task, InterruptException())
                            else
                                # Tell the processor to just drop this task
                                task_occupancy = task_spec[4]
                                time_util = task_spec[2]
                                # Give back the resources accounted to this task
                                istate.proc_occupancy[] -= task_occupancy
                                istate.time_pressure[] -= time_util
                                push!(istate.cancelled, tid)
                                to_proc = istate.proc
                                put!(istate.return_queue, (myid(), to_proc, tid, (InterruptException(), nothing)))
                            end
                        end
                    end
                    if any_cancelled
                        notify(istate.reschedule)
                    end
                end
            end
            return
        end
    end

    if halt_sch
        # Release the lock while halting; presumably the scheduler needs it to
        # process the cancellations and the halt message.
        unlock(state.lock)
        try
            # Give tasks a moment to be processed
            sleep(0.5)

            # Halt the scheduler
            @dagdebug nothing :cancel "Halting the scheduler"
            notify(state.halt)
            put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))

            # Wait for the scheduler to halt
            @dagdebug nothing :cancel "Waiting for scheduler to halt"
            while Sch.EAGER_INIT[]
                sleep(0.1)
            end
            @dagdebug nothing :cancel "Scheduler halted"
        finally
            # The caller still expects to hold the lock, even if halting failed;
            # otherwise its own unlock would throw and mask the real error.
            lock(state.lock)
        end
    end

    return
end