Skip to content

Commit 5a6c939

Browse files
authored
Merge pull request #258 from JuliaParallel/jps/wavetoy-fix
Fix eager API thunk retention
2 parents e26b2ef + 03f7aa4 commit 5a6c939

File tree

7 files changed

+217
-148
lines changed

7 files changed

+217
-148
lines changed

Diff for: Project.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Dagger"
22
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
3-
version = "0.12.3"
3+
version = "0.12.4"
44

55
[deps]
66
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"

Diff for: src/sch/Sch.jl

+34-71
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
module Sch
22

33
using Distributed
4-
import MemPool: DRef
4+
import MemPool: DRef, poolset
55

66
import ..Dagger
7-
import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, AnyScope
8-
import ..Dagger: order, free!, dependents, noffspring, istask, inputs, unwrap_weak, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
7+
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, AnyScope
8+
import ..Dagger: order, free!, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
99

1010
const OneToMany = Dict{Thunk, Set{Thunk}}
1111

@@ -45,7 +45,7 @@ Fields:
4545
- `cache::WeakKeyDict{Thunk, Any}` - Maps from a finished `Thunk` to its cached result, often a DRef
4646
- `running::Set{Thunk}` - The set of currently-running `Thunk`s
4747
- `running_on::Dict{Thunk,OSProc}` - Map from `Thunk` to the OS process executing it
48-
- `thunk_dict::Dict{Int, Any}` - Maps from thunk IDs to a `Thunk`
48+
- `thunk_dict::Dict{Int, WeakThunk}` - Maps from thunk IDs to a `WeakThunk` wrapping the corresponding `Thunk`
4949
- `node_order::Any` - Function that returns the order of a thunk
5050
- `worker_pressure::Dict{Int,Dict{Type,UInt64}}` - Cache of worker pressure
5151
- `worker_capacity::Dict{Int,Dict{Type,UInt64}}` - Maps from worker ID to capacity
@@ -67,7 +67,7 @@ struct ComputeState
6767
cache::WeakKeyDict{Thunk, Any}
6868
running::Set{Thunk}
6969
running_on::Dict{Thunk,OSProc}
70-
thunk_dict::Dict{Int, Any}
70+
thunk_dict::Dict{Int, WeakThunk}
7171
node_order::Any
7272
worker_pressure::Dict{Int,Dict{Type,UInt64}}
7373
worker_capacity::Dict{Int,Dict{Type,UInt64}}
@@ -90,7 +90,7 @@ function start_state(deps::Dict, node_order, chan)
9090
Dict{Thunk, Any}(),
9191
Set{Thunk}(),
9292
Dict{Thunk,OSProc}(),
93-
Dict{Int, Thunk}(),
93+
Dict{Int, WeakThunk}(),
9494
node_order,
9595
Dict{Int,Dict{Type,UInt64}}(),
9696
Dict{Int,Dict{Type,UInt64}}(),
@@ -333,15 +333,22 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
333333

334334
# setup thunk_dict mappings
335335
for node in filter(istask, keys(deps))
336-
state.thunk_dict[node.id] = node
336+
state.thunk_dict[node.id] = WeakThunk(node)
337337
for dep in deps[node]
338-
state.thunk_dict[dep.id] = dep
338+
state.thunk_dict[dep.id] = WeakThunk(dep)
339339
end
340340
end
341341

342342
# Initialize procs, pressure, and capacity
343343
@sync for p in procs_to_use(ctx)
344-
@async init_proc(state, p)
344+
@async begin
345+
try
346+
init_proc(state, p)
347+
catch err
348+
@error "Error initializing worker $p" exception=(err,catch_backtrace())
349+
remove_dead_proc!(ctx, state, p)
350+
end
351+
end
345352
end
346353

347354
# setup dynamic listeners
@@ -357,6 +364,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
357364

358365
# Loop while we still have thunks to execute
359366
while !isempty(state.ready) || !isempty(state.running)
367+
procs_state = assign_new_procs!(ctx, state, procs_state)
360368
if !isempty(state.ready)
361369
# Thunks are ready, so schedule up to N thunks, 1 per N workers
362370
schedule!(ctx, state, procs_state)
@@ -407,14 +415,14 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
407415
end
408416
continue
409417
else
410-
if ctx.options.allow_errors || state.thunk_dict[thunk_id].options.allow_errors
418+
if ctx.options.allow_errors || unwrap_weak_checked(state.thunk_dict[thunk_id]).options.allow_errors
411419
thunk_failed = true
412420
else
413421
throw(res)
414422
end
415423
end
416424
end
417-
node = state.thunk_dict[thunk_id]
425+
node = unwrap_weak_checked(state.thunk_dict[thunk_id])
418426
if metadata !== nothing
419427
state.worker_pressure[pid][typeof(proc)] = metadata.pressure
420428
state.worker_loadavg[pid] = metadata.loadavg
@@ -572,7 +580,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
572580

573581
# Calculate scope
574582
scope = AnyScope()
575-
for input in unwrap_weak.(task.inputs)
583+
for input in unwrap_weak_checked.(task.inputs)
576584
chunk = if istask(input)
577585
state.cache[input]
578586
elseif input isa Chunk
@@ -741,69 +749,23 @@ function finish_task!(ctx, state, node, thunk_failed)
741749
if node.cache
742750
node.cache_ref = state.cache[node]
743751
end
744-
for dep in sort!(collect(get(()->Set{Thunk}(), state.waiting_data, node)), by=state.node_order)
745-
dep_isready = false
746-
if haskey(state.waiting, dep)
747-
set = state.waiting[dep]
748-
node in set && pop!(set, node)
749-
dep_isready = isempty(set)
750-
if dep_isready
751-
delete!(state.waiting, dep)
752-
end
753-
else
754-
dep_isready = true
755-
end
756-
if dep_isready
757-
if !thunk_failed
758-
push!(state.ready, dep)
759-
end
760-
end
761-
end
762-
if haskey(state.futures, node)
763-
# Notify any listening thunks
764-
for future in state.futures[node]
765-
put!(future, state.cache[node]; error=thunk_failed)
766-
end
767-
delete!(state.futures, node)
768-
end
752+
schedule_dependents!(state, node, thunk_failed)
753+
fill_registered_futures!(state, node, thunk_failed)
769754

770-
# Chunk clean-up
771-
to_evict = Set{Chunk}()
772-
for inp in filter(t->istask(t) || (t isa Chunk), unwrap_weak.(node.inputs))
773-
if inp in keys(state.waiting_data)
774-
w = state.waiting_data[inp]
775-
if node in w
776-
pop!(w, node)
777-
end
778-
if isempty(w)
779-
delete!(state.waiting_data, inp)
780-
if istask(inp) && haskey(state.cache, inp)
781-
_node = state.cache[inp]
782-
if _node isa Chunk
783-
push!(to_evict, _node)
784-
end
785-
GC.@preserve inp begin
786-
pop!(state.cache, inp)
787-
if haskey(state.errored, inp)
788-
pop!(state.errored, inp)
789-
end
790-
end
791-
elseif inp isa Chunk
792-
push!(to_evict, inp)
793-
end
794-
end
795-
end
796-
end
755+
to_evict = cleanup_inputs!(state, node)
797756
if haskey(state.waiting_data, node) && isempty(state.waiting_data[node])
798757
delete!(state.waiting_data, node)
799758
end
759+
evict_all_chunks!(ctx, to_evict)
760+
end
761+
762+
function evict_all_chunks!(ctx, to_evict)
800763
if !isempty(to_evict)
801764
@sync for w in map(p->p.pid, procs_to_use(ctx))
802765
@async remote_do(evict_chunks!, w, to_evict)
803766
end
804767
end
805768
end
806-
807769
function evict_chunks!(chunks::Set{Chunk})
808770
for chunk in chunks
809771
haskey(CHUNK_CACHE, chunk) && delete!(CHUNK_CACHE, chunk)
@@ -852,19 +814,20 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
852814
end
853815

854816
ids = map(enumerate(thunk.inputs)) do (idx,x)
855-
istask(x) ? unwrap_weak(x).id : -idx
817+
istask(x) ? unwrap_weak_checked(x).id : -idx
856818
end
857819

858820
data = map(thunk.inputs) do x
859-
istask(x) ? state.cache[unwrap_weak(x)] : x
821+
istask(x) ? state.cache[unwrap_weak_checked(x)] : x
860822
end
861823
toptions = thunk.options !== nothing ? thunk.options : ThunkOptions()
862824
options = merge(ctx.options, toptions)
863825
@assert (options.single == 0) || (gproc.pid == options.single)
864-
sch_handle = SchedulerHandle(ThunkID(thunk.id), state.worker_chans[gproc.pid]...)
826+
# TODO: Set `sch_handle.tid.ref` to the right `DRef`
827+
sch_handle = SchedulerHandle(ThunkID(thunk.id, nothing), state.worker_chans[gproc.pid]...)
865828
state.worker_pressure[gproc.pid][typeof(proc)] += util
866829

867-
# FIXME: De-dup common fields (log_sink, uid, etc.)
830+
# TODO: De-dup common fields (log_sink, uid, etc.)
868831
push!(to_send, (util, thunk.id, thunk.f, data, thunk.get_result,
869832
thunk.persist, thunk.cache, thunk.meta, options, ids,
870833
(log_sink=ctx.log_sink, profile=ctx.profile),
@@ -944,7 +907,7 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
944907
unlock(TASK_SYNC)
945908
else
946909
# Under-subscribed, calculate extra utilization and execute thunk
947-
@debug "($(myid())) ($thunk_id) Using available $to_proc: $extra_util | $(real_util[])/$cap"
910+
@debug "($(myid())) $f ($thunk_id) Using available $to_proc: $extra_util | $(real_util[])/$cap"
948911
extra_util = if extra_util isa MaxUtilization
949912
count(c->typeof(c)===typeof(to_proc), children(from_proc))
950913
else
@@ -982,7 +945,7 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
982945
lock(TASK_SYNC) do
983946
real_util[] -= extra_util
984947
end
985-
@debug "($(myid())) ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap"
948+
@debug "($(myid())) $f ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap"
986949
lock(TASK_SYNC) do
987950
notify(TASK_SYNC)
988951
end

Diff for: src/sch/dynamic.jl

+68-33
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
export SchedulerHaltedException
22
export sch_handle, halt!, exec!, get_dag_ids, add_thunk!
33

4-
"Identifies a thunk by its ID."
4+
"Identifies a thunk by its ID, and preserves the thunk in the scheduler."
55
struct ThunkID
66
id::Int
7+
ref::Union{DRef,Nothing}
78
end
9+
ThunkID(id::Int) = ThunkID(id, nothing)
810

911
"A handle to the scheduler, used by dynamic thunks."
1012
struct SchedulerHandle
@@ -52,7 +54,13 @@ function dynamic_listener!(ctx, state)
5254
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
5355
ProcessExitedException,
5456
InvalidStateException})
55-
@error exception=(err,catch_backtrace())
57+
iob = IOContext(IOBuffer(), :color=>true)
58+
println(iob, "Error in sending dynamic request:")
59+
Base.showerror(iob, err)
60+
Base.show_backtrace(iob, catch_backtrace())
61+
println(iob)
62+
seek(iob.io, 0)
63+
write(stderr, iob)
5664
end
5765
break
5866
end
@@ -69,7 +77,13 @@ function dynamic_listener!(ctx, state)
6977
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
7078
ProcessExitedException,
7179
InvalidStateException})
72-
@error exception=(err,catch_backtrace())
80+
iob = IOContext(IOBuffer(), :color=>true)
81+
println(iob, "Error in sending dynamic result:")
82+
Base.showerror(iob, err)
83+
Base.show_backtrace(iob, catch_backtrace())
84+
println(iob)
85+
seek(iob.io, 0)
86+
write(stderr, iob)
7387
end
7488
end
7589
end
@@ -110,25 +124,34 @@ end
110124
"Waits on a thunk to complete, and fetches its result."
111125
function Base.fetch(h::SchedulerHandle, id::ThunkID)
112126
future = ThunkFuture(Future(1))
113-
exec!(_register_future!, h, future, id.id)
127+
exec!(_register_future!, h, future, id)
114128
fetch(future; proc=thunk_processor())
115129
end
116130
"Waits on a thunk to complete, and fetches its result."
117-
function register_future!(h::SchedulerHandle, id::ThunkID, future::ThunkFuture)
118-
exec!(_register_future!, h, future, id.id)
119-
end
120-
function _register_future!(ctx, state, task, tid, (future, id))
121-
tid != id || throw(DynamicThunkException("Cannot fetch own result"))
122-
thunk = state.thunk_dict[id]
123-
ownthunk = state.thunk_dict[tid]
124-
dominates(target, t) = (t == target) || any(_t->dominates(target, _t), filter(istask, unwrap_weak.(t.inputs)))
125-
!dominates(ownthunk, thunk) || throw(DynamicThunkException("Cannot fetch result of dominated thunk"))
126-
# TODO: Assert that future will be fulfilled
127-
if haskey(state.cache, thunk)
128-
put!(future, state.cache[thunk]; error=state.errored[thunk])
129-
else
130-
futures = get!(()->ThunkFuture[], state.futures, thunk)
131-
push!(futures, future)
131+
register_future!(h::SchedulerHandle, id::ThunkID, future::ThunkFuture) =
132+
exec!(_register_future!, h, future, id)
133+
function _register_future!(ctx, state, task, tid, (future, id)::Tuple{ThunkFuture,ThunkID})
134+
tid != id.id || throw(DynamicThunkException("Cannot fetch own result"))
135+
GC.@preserve id begin
136+
thunk = unwrap_weak_checked(state.thunk_dict[id.id])
137+
ownthunk = unwrap_weak_checked(state.thunk_dict[tid])
138+
function dominates(target, t)
139+
t == target && return true
140+
# N.B. Skips expired tasks
141+
task_inputs = filter(istask, Dagger.unwrap_weak.(t.inputs))
142+
if any(_t->dominates(target, _t), task_inputs)
143+
return true
144+
end
145+
return false
146+
end
147+
!dominates(ownthunk, thunk) || throw(DynamicThunkException("Cannot fetch result of dominated thunk"))
148+
# TODO: Assert that future will be fulfilled
149+
if haskey(state.cache, thunk)
150+
put!(future, state.cache[thunk]; error=state.errored[thunk])
151+
else
152+
futures = get!(()->ThunkFuture[], state.futures, thunk)
153+
push!(futures, future)
154+
end
132155
end
133156
nothing
134157
end
@@ -146,27 +169,39 @@ get_dag_ids(h::SchedulerHandle) =
146169
function _get_dag_ids(ctx, state, task, tid, _)
147170
deps = Dict{ThunkID,Set{ThunkID}}()
148171
for (id,thunk) in state.thunk_dict
172+
thunk = unwrap_weak_checked(thunk)
173+
# TODO: Get at `thunk_ref` for `thunk_id.ref`
174+
thunk_id = ThunkID(id, nothing)
149175
if haskey(state.waiting_data, thunk)
150-
deps[ThunkID(id)] = Set(map(t->ThunkID(t.id), collect(state.waiting_data[thunk])))
176+
deps[thunk_id] = Set(map(t->ThunkID(t.id, nothing), collect(state.waiting_data[thunk])))
151177
else
152-
deps[ThunkID(id)] = Set{ThunkID}()
178+
deps[thunk_id] = Set{ThunkID}()
153179
end
154180
end
155181
deps
156182
end
157183

158184
"Adds a new Thunk to the DAG."
159-
add_thunk!(f, h::SchedulerHandle, args...; future=nothing, kwargs...) =
160-
ThunkID(exec!(_add_thunk!, h, f, args, kwargs, future))
161-
function _add_thunk!(ctx, state, task, tid, (f, args, kwargs, future))
162-
_args = map(arg->arg isa ThunkID ? Dagger.WeakThunk(state.thunk_dict[arg.id]) : arg, args)
163-
thunk = Thunk(f, _args...; kwargs...)
164-
state.thunk_dict[thunk.id] = thunk
165-
@assert reschedule_inputs!(state, thunk)
166-
if future !== nothing
167-
# Ensure we attach a future before the thunk is scheduled
168-
_register_future!(ctx, state, task, tid, (future, thunk.id))
185+
add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, kwargs...) =
186+
exec!(_add_thunk!, h, f, args, kwargs, future, ref)
187+
function _add_thunk!(ctx, state, task, tid, (f, args, kwargs, future, ref))
188+
_args = map(arg->arg isa ThunkID ? state.thunk_dict[arg.id] : arg, args)
189+
GC.@preserve _args begin
190+
thunk = Thunk(f, _args...; kwargs...)
191+
# Create a `DRef` to `thunk` so that the caller can preserve it
192+
thunk_ref = poolset(thunk)
193+
thunk_id = ThunkID(thunk.id, thunk_ref)
194+
state.thunk_dict[thunk.id] = WeakThunk(thunk)
195+
reschedule_inputs!(state, thunk)
196+
if future !== nothing
197+
# Ensure we attach a future before the thunk is scheduled
198+
_register_future!(ctx, state, task, tid, (future, thunk_id))
199+
end
200+
if ref !== nothing
201+
# Preserve the `EagerThunkFinalizer` through `thunk`
202+
thunk.eager_ref = ref
203+
end
204+
put!(state.chan, RescheduleSignal())
205+
return thunk_id
169206
end
170-
put!(state.chan, RescheduleSignal())
171-
return thunk.id::Int
172207
end

Diff for: src/sch/eager.jl

+4-4
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ function eager_thunk()
7474
adjust_pressure!(h, Dagger.ThreadProc, -util)
7575
while isopen(EAGER_THUNK_CHAN)
7676
try
77-
ev, future, uid, f, args, opts = take!(EAGER_THUNK_CHAN)
77+
added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN)
7878
# preserve inputs until they enter the scheduler
7979
tid = GC.@preserve args begin
80-
args = map(x->x isa Dagger.EagerThunk ? ThunkID(EAGER_ID_MAP[x.uid]) : x, args)
81-
add_thunk!(f, h, args...; future=future, opts...)
80+
_args = map(x->x isa Dagger.EagerThunk ? ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref) : x, args)
81+
add_thunk!(f, h, _args...; future=future, ref=ref, opts...)
8282
end
8383
EAGER_ID_MAP[uid] = tid.id
84-
notify(ev)
84+
put!(added_future, tid.ref)
8585
catch err
8686
iob = IOContext(IOBuffer(), :color=>true)
8787
println(iob, "Error in eager listener:")

0 commit comments

Comments
 (0)