Skip to content

Commit 0107b31

Browse files
authored
Merge pull request #370 from JuliaParallel/jps/task-in-cache-bug
Fix incorrect assertion in schedule!
2 parents 66d5147 + 2ebb51b commit 0107b31

File tree

4 files changed

+74
-12
lines changed

4 files changed

+74
-12
lines changed

Diff for: src/sch/Sch.jl

+16-1
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,22 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
610610
end
611611
task = pop!(state.ready)
612612
timespan_start(ctx, :schedule, task.id, (;thunk_id=task.id))
613-
@assert !haskey(state.cache, task)
613+
if haskey(state.cache, task)
614+
if haskey(state.errored, task)
615+
# An error was eagerly propagated to this task
616+
finish_failed!(state, task)
617+
else
618+
# This shouldn't have happened
619+
iob = IOBuffer()
620+
println(iob, "Scheduling inconsistency: Task being scheduled is already cached!")
621+
println(iob, " Task: $(task.id)")
622+
println(iob, " Cache Entry: $(typeof(state.cache[task]))")
623+
ex = SchedulingException(String(take!(iob)))
624+
state.cache[task] = ex
625+
state.errored[task] = true
626+
end
627+
@goto pop_task
628+
end
614629
opts = merge(ctx.options, task.options)
615630
sig = signature(task, state)
616631

Diff for: src/sch/util.jl

+4-1
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,17 @@ function set_failed!(state, origin, thunk=origin)
145145
filter!(x->x!==thunk, state.ready)
146146
state.cache[thunk] = ThunkFailedException(thunk, origin, state.cache[origin])
147147
state.errored[thunk] = true
148+
finish_failed!(state, thunk, origin)
149+
end
150+
function finish_failed!(state, thunk, origin=nothing)
148151
fill_registered_futures!(state, thunk, true)
149152
if haskey(state.waiting_data, thunk)
150153
for dep in state.waiting_data[thunk]
151154
haskey(state.waiting, dep) &&
152155
delete!(state.waiting, dep)
153156
haskey(state.errored, dep) &&
154157
continue
155-
set_failed!(state, origin, dep)
158+
origin !== nothing && set_failed!(state, origin, dep)
156159
end
157160
delete!(state.waiting_data, thunk)
158161
end

Diff for: src/thunk.jl

+51-7
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,59 @@ ThunkFailedException(thunk, origin, ex::E) where E =
193193
ThunkFailedException{E}(convert(WeakThunk, thunk), convert(WeakThunk, origin), ex)
194194
function Base.showerror(io::IO, ex::ThunkFailedException)
195195
t = unwrap_weak(ex.thunk)
196-
o = unwrap_weak(ex.origin)
197-
t_str = t !== nothing ? "$t" : "?"
198-
o_str = o !== nothing ? "$o" : "?"
196+
197+
# Find root-cause thunk
198+
last_tfex = ex
199+
failed_tasks = Union{Thunk,Nothing}[]
200+
while last_tfex.ex isa ThunkFailedException && unwrap_weak(last_tfex.ex.origin) !== nothing
201+
push!(failed_tasks, unwrap_weak(last_tfex.thunk))
202+
last_tfex = last_tfex.ex
203+
end
204+
o = unwrap_weak(last_tfex.origin)
205+
root_ex = last_tfex.ex
206+
207+
# Render a short, human-readable description of thunk `t` for error messages.
#
# Returns "Thunk(?)" when `t` is `nothing` (the thunk is no longer available);
# otherwise returns "Thunk(id=<id>, <f>(<inputs>))". When there are more than
# four inputs, the argument list is abbreviated to "<N> inputs..." so the
# message stays on one line.
function thunk_string(t)
    if t === nothing
        return "Thunk(?)"
    end
    Tinputs = Any[]
    for input in t.inputs
        # NOTE(review): presumably resolves weak thunk references to the
        # underlying value; confirm against `unwrap_weak`'s definition.
        input = unwrap_weak(input)
        if istask(input)
            # Show upstream thunks by id only, to avoid recursive printing.
            push!(Tinputs, "Thunk(id=$(input.id))")
        else
            push!(Tinputs, input)
        end
    end
    # Each branch closes only its own call parenthesis; the enclosing
    # "Thunk(" is closed once in the return statement, so both branches
    # produce balanced output.
    t_sig = if length(Tinputs) <= 4
        "$(t.f)($(join(Tinputs, ", ")))"
    else
        "$(t.f)($(length(Tinputs)) inputs...)"
    end
    return "Thunk(id=$(t.id), $t_sig)"
end
227+
t_str = thunk_string(t)
228+
o_str = thunk_string(o)
199229
t_id = t !== nothing ? t.id : '?'
200230
o_id = o !== nothing ? o.id : '?'
201-
println(io, "ThunkFailedException ($t failure",
202-
(o !== nothing && t != o) ? " due to a failure in $o)" : ")",
203-
":")
204-
Base.showerror(io, ex.ex)
231+
println(io, "ThunkFailedException:")
232+
println(io, " Root Exception Type: $(typeof(root_ex))")
233+
println(io, " Root Exception:")
234+
Base.showerror(io, root_ex); println(io)
235+
if t !== o
236+
println(io, " Root Thunk: $o_str")
237+
if length(failed_tasks) <= 4
238+
for i in failed_tasks
239+
i_str = thunk_string(i)
240+
println(io, " Inner Thunk: $i_str")
241+
end
242+
else
243+
println(io, " ...")
244+
println(io, " $(length(failed_tasks)) Inner Thunks...")
245+
println(io, " ...")
246+
end
247+
end
248+
print(io, " This Thunk: $t_str")
205249
end
206250

207251
"""

Diff for: test/thunk.jl

+3-3
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ end
9595
end
9696
ex = Dagger.Sch.unwrap_nested_exception(ex)
9797
ex_str = sprint(io->Base.showerror(io,ex))
98-
@test occursin(r"^ThunkFailedException \(Thunk.*failure\):", ex_str)
98+
@test occursin(r"^ThunkFailedException:", ex_str)
9999
@test occursin("Test", ex_str)
100-
@test !occursin("due to a failure in", ex_str)
100+
@test !occursin("Root Thunk", ex_str)
101101

102102
ex = try
103103
fetch(b)
@@ -106,8 +106,8 @@ end
106106
end
107107
ex = Dagger.Sch.unwrap_nested_exception(ex)
108108
ex_str = sprint(io->Base.showerror(io,ex))
109-
@test occursin(r"Thunk.*failure due to a failure in", ex_str)
110109
@test occursin("Test", ex_str)
110+
@test occursin("Root Thunk", ex_str)
111111
end
112112
@testset "single dependent" begin
113113
a = @spawn error("Test")

0 commit comments

Comments
 (0)