Comparing changes: JuliaParallel/Dagger.jl

base: v0.11.5 ... compare: v0.11.6
  • 6 commits
  • 11 files changed
  • 1 contributor

Commits on Jul 22, 2021

  1. 9a6da0b
  2. Bugfixes for eager API

    Ensure that unreachable thunks are cleaned up by GC
    Make `add_thunk!` interrupt the sleeping scheduler
    Allow registering eager thunks and a single future in `add_thunk!`
    Make `Dagger.spawn` synchronous (see the sketch after this entry)
    Remove redundant `dependents` and `finished` fields from ComputeState
    jpsamaroo committed Jul 22, 2021 (42189ed)
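A minimal sketch of the eager-API behavior this commit targets, assuming the v0.11-era API (illustrative, not code from the commit): with `Dagger.spawn` synchronous, the returned eager thunk is registered with the scheduler before `spawn` returns, so it can immediately be fetched or used as an input to further calls.

```julia
using Dagger

a = Dagger.spawn(+, 1, 2)   # returns an eager thunk, registered before returning
b = Dagger.spawn(*, a, 10)  # eager thunks compose as inputs to later spawns
@assert fetch(b) == 30      # fetch blocks until the DAG has executed
```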
  3. 4f6575d
  4. Make next_id() thread-safe

    jpsamaroo committed Jul 22, 2021 (82fe11f)
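A common way to make an ID generator safe under concurrent callers is an atomic fetch-and-add. The sketch below illustrates the idea; the counter name is hypothetical and this may differ from the commit's actual implementation.

```julia
# Hypothetical illustration of a thread-safe ID generator (not necessarily
# the commit's exact code). atomic_add! returns the previous value, so no
# two concurrent callers can ever observe the same ID.
const ID_COUNTER = Threads.Atomic{Int}(1)

next_id() = Threads.atomic_add!(ID_COUNTER, 1)
```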
  5. Bump to 0.11.6

    jpsamaroo committed Jul 22, 2021 (922dfa2)
  6. Merge pull request #223 from JuliaParallel/jps/eager-mem-leak-fix

    Fix memory leaks in eager API (see the sketch below)
    jpsamaroo authored Jul 22, 2021 (5ccbd87)
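The leak fix is visible in the diff below, where `ComputeState.cache` and `ComputeState.errored` become `WeakKeyDict`s: entries keyed by a `Thunk` no longer root it, so thunks unreachable from user code can be collected along with their cached results. A standalone sketch of the mechanism, where `DemoThunk` is a hypothetical stand-in for `Dagger.Thunk`:

```julia
mutable struct DemoThunk end   # mutable, so instances support weak references

cache = WeakKeyDict{DemoThunk,Any}()
let t = DemoThunk()
    cache[t] = 42              # the entry lives while `t` is strongly reachable
end
GC.gc()                        # once `t` is collected, its entry is dropped
@show length(cache)            # typically 0 after collection
```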
Showing with 243 additions and 146 deletions.
  1. +1 −1 Project.toml
  2. +2 −0 src/Dagger.jl
  3. +0 −2 src/chunks.jl
  4. +65 −59 src/sch/Sch.jl
  5. +38 −25 src/sch/dynamic.jl
  6. +28 −6 src/sch/eager.jl
  7. +22 −19 src/sch/util.jl
  8. +54 −19 src/thunk.jl
  9. +15 −0 test/runtests.jl
  10. +16 −13 test/scheduler.jl
  11. +2 −2 test/thunk.jl
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,6 +1,6 @@
 name = "Dagger"
 uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
-version = "0.11.5"
+version = "0.11.6"
 
 [deps]
 Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
2 changes: 2 additions & 0 deletions src/Dagger.jl
@@ -2,6 +2,8 @@ module Dagger
 
 using Distributed, SharedArrays
 
+using MemPool
+
 import Base: collect, adjoint, reduce
 import Distributed: procs
 
2 changes: 0 additions & 2 deletions src/chunks.jl
@@ -1,5 +1,3 @@
-
-using MemPool
 using Serialization
 
 export domain, UnitDomain, project, alignfirst, ArrayDomain
124 changes: 65 additions & 59 deletions src/sch/Sch.jl
@@ -4,7 +4,7 @@ using Distributed
 import MemPool: DRef
 
 import ..Dagger
-import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor
+import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, unwrap_weak, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor
 
 const OneToMany = Dict{Thunk, Set{Thunk}}
 
@@ -44,8 +44,6 @@ The internal state-holding struct of the scheduler.
 
 Fields:
 - `uid::UInt64` - Unique identifier for this scheduler instance
-- `dependents::Dict{Union{Thunk,Chunk},Set{Thunk}}` - The result of calling `dependents` on the DAG
-- `finished::Set{Thunk}` - The set of completed `Thunk`s
 - `waiting::OneToMany` - Map from downstream `Thunk` to upstream `Thunk`s that still need to execute
 - `waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}` - Map from input `Chunk`/upstream `Thunk` to all unfinished downstream `Thunk`s, to retain caches
 - `ready::Vector{Thunk}` - The list of `Thunk`s that are ready to execute
@@ -60,20 +58,18 @@ Fields:
 - `worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}` - Communication channels between the scheduler and each worker
 - `procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}` - Cached linked list of processors ready to be used
 - `function_cost_cache::Dict{Type{<:Tuple},UInt}` - Cache of estimated CPU time required to compute the given signature
-- `halt::Base.RefValue{Bool}` - Flag indicating, when set, that the scheduler should halt immediately
-- `lock::ReentrantLock()` - Lock around operations which modify the state
+- `halt::Base.Event` - Event indicating that the scheduler is halting
+- `lock::ReentrantLock` - Lock around operations which modify the state
 - `futures::Dict{Thunk, Vector{ThunkFuture}}` - Futures registered for waiting on the result of a thunk.
-- `errored::Set{Thunk}` - Thunks that threw an error
-- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks
+- `errored::WeakKeyDict{Thunk,Bool}` - Indicates if a thunk's result is due to an error.
+- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks.
 """
 struct ComputeState
     uid::UInt64
-    dependents::Dict{Union{Thunk,Chunk},Set{Thunk}}
-    finished::Set{Thunk}
     waiting::OneToMany
     waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}
     ready::Vector{Thunk}
-    cache::Dict{Thunk, Any}
+    cache::WeakKeyDict{Thunk, Any}
     running::Set{Thunk}
     running_on::Dict{Thunk,OSProc}
     thunk_dict::Dict{Int, Any}
@@ -84,19 +80,17 @@ struct ComputeState
     worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
     procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
     function_cost_cache::Dict{Type{<:Tuple},UInt}
-    halt::Base.RefValue{Bool}
+    halt::Base.Event
     lock::ReentrantLock
     futures::Dict{Thunk, Vector{ThunkFuture}}
-    errored::Set{Thunk}
+    errored::WeakKeyDict{Thunk,Bool}
     chan::RemoteChannel{Channel{Any}}
 end
 
 function start_state(deps::Dict, node_order, chan)
     state = ComputeState(rand(UInt64),
-                         deps,
-                         Set{Thunk}(),
                          OneToMany(),
-                         Dict{Union{Thunk,Chunk},Set{Thunk}}(),
+                         deps,
                          Vector{Thunk}(undef, 0),
                          Dict{Thunk, Any}(),
                          Set{Thunk}(),
@@ -109,18 +103,13 @@ function start_state(deps::Dict, node_order, chan)
                          Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
                          Ref{Union{ProcessorCacheEntry,Nothing}}(nothing),
                          Dict{Type{<:Tuple},UInt}(),
-                         Ref{Bool}(false),
+                         Base.Event(),
                          ReentrantLock(),
                          Dict{Thunk, Vector{ThunkFuture}}(),
-                         Set{Thunk}(),
+                         WeakKeyDict{Thunk,Bool}(),
                          chan)
 
-    nodes = sort(collect(keys(deps)), by=node_order)
-    # N.B. Using merge! here instead would modify deps
-    for (key,val) in deps
-        state.waiting_data[key] = copy(val)
-    end
-    for k in nodes
+    for k in sort(collect(keys(deps)), by=node_order)
         if istask(k)
             waiting = Set{Thunk}(Iterators.filter(istask, inputs(k)))
             if isempty(waiting)
@@ -392,7 +381,11 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
         end
 
         isempty(state.running) && continue
-        pid, proc, thunk_id, (res, metadata) = take!(chan) # get result of completed thunk
+        chan_value = take!(chan) # get result of completed thunk
+        if chan_value isa RescheduleSignal
+            continue
+        end
+        pid, proc, thunk_id, (res, metadata) = chan_value
         gproc = OSProc(pid)
         lock(newtasks_lock) # This waits for any assign_new_procs! above to complete and then shuts down the task
         safepoint(state)
@@ -424,6 +417,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
                 state.function_cost_cache[sig] = (metadata.threadtime + get(state.function_cost_cache, sig, 0)) ÷ 2
             end
             state.cache[node] = res
+            state.errored[node] = thunk_failed
             if node.options !== nothing && node.options.checkpoint !== nothing
                 try
                     node.options.checkpoint(node, res)
@@ -441,12 +435,14 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
 
         safepoint(state)
     end
-    state.halt[] = true
+    @assert !isready(state.chan)
+    close(state.chan)
+    notify(state.halt)
     @sync for p in procs_to_use(ctx)
         @async cleanup_proc(state, p)
     end
     value = state.cache[d] # TODO: move(OSProc(), state.cache[d])
-    if d in state.errored
+    if state.errored[d]
         throw(value)
     end
     if options.checkpoint !== nothing
@@ -556,6 +552,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
     while !isempty(state.ready)
         # Select a new task and get its options
         task = pop!(state.ready)
+        @assert !haskey(state.cache, task)
         opts = merge(ctx.options, task.options)
         sig = signature(task, state)
 
@@ -693,62 +690,69 @@ function pop_with_affinity!(ctx, tasks, proc)
     return nothing
 end
 
-function finish_task!(ctx, state, node, thunk_failed; free=true)
+function finish_task!(ctx, state, node, thunk_failed)
     pop!(state.running, node)
     delete!(state.running_on, node)
-    if !thunk_failed
-        push!(state.finished, node)
-    else
+    if thunk_failed
         set_failed!(state, node)
     end
-    if istask(node) && node.cache
+    if node.cache
         node.cache_ref = state.cache[node]
     end
-    if !thunk_failed
-    for dep in sort!(collect(state.dependents[node]), by=state.node_order)
+    for dep in sort!(collect(get(()->Set{Thunk}(), state.waiting_data, node)), by=state.node_order)
+        dep_isready = false
         if haskey(state.waiting, dep)
             set = state.waiting[dep]
             node in set && pop!(set, node)
-            if isempty(set)
-                pop!(state.waiting, dep)
-                push!(state.ready, dep)
+            dep_isready = isempty(set)
+            if dep_isready
+                delete!(state.waiting, dep)
             end
-            # todo: free data
+        else
+            dep_isready = true
         end
-        if haskey(state.futures, node)
-            # Notify any listening thunks
-            for future in state.futures[node]
-                if istask(node) && haskey(state.cache, node)
-                    put!(future, state.cache[node])
-                else
-                    put!(future, nothing)
-                end
+        if dep_isready
+            if !thunk_failed
+                push!(state.ready, dep)
+            end
-            delete!(state.futures, node)
         end
     end
+    if haskey(state.futures, node)
+        # Notify any listening thunks
+        for future in state.futures[node]
+            put!(future, state.cache[node]; error=thunk_failed)
+        end
+        delete!(state.futures, node)
+    end
 
     # Chunk clean-up
     to_evict = Set{Chunk}()
-    for inp in filter(t->istask(t) || (t isa Chunk), inputs(node))
+    for inp in filter(t->istask(t) || (t isa Chunk), unwrap_weak.(node.inputs))
         if inp in keys(state.waiting_data)
-            s = state.waiting_data[inp]
-            if node in s
-                pop!(s, node)
+            w = state.waiting_data[inp]
+            if node in w
+                pop!(w, node)
             end
-            if free && isempty(s)
+            if isempty(w)
                 delete!(state.waiting_data, inp)
                 if istask(inp) && haskey(state.cache, inp)
                     _node = state.cache[inp]
                     if _node isa Chunk
                         push!(to_evict, _node)
                     end
-                    free!(_node, force=false, cache=(istask(inp) && inp.cache))
-                    pop!(state.cache, inp)
+                    GC.@preserve inp begin
+                        pop!(state.cache, inp)
+                        pop!(state.errored, inp)
+                    end
+                elseif inp isa Chunk
+                    push!(to_evict, inp)
                 end
             end
         end
     end
+    if haskey(state.waiting_data, node) && isempty(state.waiting_data[node])
+        delete!(state.waiting_data, node)
+    end
     if !isempty(to_evict)
         @sync for w in map(p->p.pid, procs_to_use(ctx))
             @async remote_do(evict_chunks!, w, to_evict)
@@ -779,8 +783,8 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
         if data !== nothing
             # cache hit
             state.cache[thunk] = data
-            thunk_failed = thunk in state.errored
-            finish_task!(ctx, state, thunk, thunk_failed; free=false)
+            thunk_failed = state.errored[thunk]
+            finish_task!(ctx, state, thunk, thunk_failed)
             continue
         else
             # cache miss
@@ -791,19 +795,20 @@
             try
                 result = thunk.options.restore(thunk)
                 state.cache[thunk] = result
-                finish_task!(ctx, state, thunk, false; free=false)
+                state.errored[thunk] = false
+                finish_task!(ctx, state, thunk, false)
                 continue
             catch err
                 @error "Thunk restore failed" exception=(err,catch_backtrace())
             end
         end
 
         ids = map(enumerate(thunk.inputs)) do (idx,x)
-            istask(x) ? x.id : -idx
+            istask(x) ? unwrap_weak(x).id : -idx
         end
 
         data = map(thunk.inputs) do x
-            istask(x) ? state.cache[x] : x
+            istask(x) ? state.cache[unwrap_weak(x)] : x
         end
         toptions = thunk.options !== nothing ? thunk.options : ThunkOptions()
         options = merge(ctx.options, toptions)
@@ -829,7 +834,8 @@ function do_tasks(to_proc, chan, tasks)
     for task in tasks
         @async begin
             try
-                put!(chan, (myid(), to_proc, task[2], do_task(to_proc, task...)))
+                result = do_task(to_proc, task...)
+                put!(chan, (myid(), to_proc, task[2], result))
             catch ex
                 bt = catch_backtrace()
                 put!(chan, (myid(), to_proc, task[2], (CapturedException(ex, bt), nothing)))