Skip to content

Commit 0b72b77

Browse files
authored
Merge pull request #264 from JuliaParallel/jps/stalled-thunks
Fix task stalls due to deserialization errors
2 parents 5a6c939 + 2ed3dda commit 0b72b77

File tree

3 files changed

+51
-5
lines changed

3 files changed

+51
-5
lines changed

Diff for: Project.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Dagger"
22
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
3-
version = "0.12.4"
3+
version = "0.12.5"
44

55
[deps]
66
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"

Diff for: src/sch/Sch.jl

+30-4
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,9 @@ end
294294
"Process-local condition variable (and lock) indicating task completion."
295295
const TASK_SYNC = Threads.Condition()
296296

297+
"Process-local set of running task IDs."
298+
const TASKS_RUNNING = Set{Int}()
299+
297300
"Process-local dictionary tracking per-processor total utilization."
298301
const PROC_UTILIZATION = Dict{UInt64,Dict{Type,Ref{UInt64}}}()
299302

@@ -833,7 +836,21 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
833836
(log_sink=ctx.log_sink, profile=ctx.profile),
834837
sch_handle, state.uid))
835838
end
836-
remote_do(do_tasks, gproc.pid, proc, state.chan, to_send)
839+
try
840+
remotecall_wait(do_tasks, gproc.pid, proc, state.chan, to_send)
841+
catch
842+
# We might get a deserialization error due to something not being
843+
# defined on the worker; in this case, we re-fire one task at a time to
844+
# determine which task failed
845+
for ts in to_send
846+
try
847+
remotecall_wait(do_tasks, gproc.pid, proc, state.chan, [ts])
848+
catch err
849+
bt = catch_backtrace()
850+
put!(state.chan, (gproc.pid, proc, ts[2], (CapturedException(err, bt), nothing)))
851+
end
852+
end
853+
end
837854
end
838855

839856
"""
@@ -843,6 +860,16 @@ Executes a batch of tasks on `to_proc`.
843860
"""
844861
function do_tasks(to_proc, chan, tasks)
845862
for task in tasks
863+
should_launch = lock(TASK_SYNC) do
864+
# Launch only if this task ID is not already running; never re-launch a running task
865+
if !(task[2] in TASKS_RUNNING)
866+
push!(TASKS_RUNNING, task[2])
867+
true
868+
else
869+
false
870+
end
871+
end
872+
should_launch || continue
846873
@async begin
847874
try
848875
result = do_task(to_proc, task...)
@@ -944,11 +971,10 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
944971
@dbg timespan_end(ctx, :compute, thunk_id, (f, to_proc))
945972
lock(TASK_SYNC) do
946973
real_util[] -= extra_util
947-
end
948-
@debug "($(myid())) $f ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap"
949-
lock(TASK_SYNC) do
974+
pop!(TASKS_RUNNING, thunk_id)
950975
notify(TASK_SYNC)
951976
end
977+
@debug "($(myid())) $f ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap"
952978
metadata = (
953979
pressure=real_util[],
954980
loadavg=((Sys.loadavg()...,) ./ Sys.CPU_THREADS),

Diff for: test/thunk.jl

+20
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,24 @@ end
150150
# Mild stress-test
151151
@test dynamic_fib(10) == 55
152152
end
153+
@testset "undefined function" begin
154+
# Issues #254, #255
155+
156+
# evil_f is defined only on the head node (via @eval below), not on the workers
157+
@eval evil_f(x) = x
158+
159+
eager_thunks = map(1:10) do i
160+
single = isodd(i) ? 1 : first(workers())
161+
Dagger.@spawn single=single evil_f(i)
162+
end
163+
164+
errored(t) = try
165+
fetch(t)
166+
false
167+
catch
168+
true
169+
end
170+
@test any(t->errored(t), eager_thunks)
171+
@test any(t->!errored(t), eager_thunks)
172+
end
153173
end

0 commit comments

Comments
 (0)