@@ -4,7 +4,7 @@ using Distributed
 import MemPool: DRef
 
 import ..Dagger
-import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor
+import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, unwrap_weak, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor
 
 const OneToMany = Dict{Thunk, Set{Thunk}}
 
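Note on the newly imported `unwrap_weak`: the hunks below access thunk inputs through it, which suggests inputs are now held behind weak references so the scheduler no longer keeps every upstream `Thunk` alive. A minimal sketch of what such a helper could look like, assuming plain `WeakRef` wrappers (the real definition lives elsewhere in Dagger and is not part of this diff):

```julia
# Hypothetical stand-in for Dagger's unwrap_weak; illustration only.
unwrap_weak(x::WeakRef) = x.value  # `nothing` if the referent was collected
unwrap_weak(x) = x                 # non-weak inputs pass through unchanged

t = Ref(1)                  # any heap-allocated object can be weakly held
w = WeakRef(t)
unwrap_weak(w) === t        # true while `t` is still strongly reachable
```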
@@ -44,8 +44,6 @@ The internal state-holding struct of the scheduler.
 
 Fields:
 - `uid::UInt64` - Unique identifier for this scheduler instance
-- `dependents::Dict{Union{Thunk,Chunk},Set{Thunk}}` - The result of calling `dependents` on the DAG
-- `finished::Set{Thunk}` - The set of completed `Thunk`s
 - `waiting::OneToMany` - Map from downstream `Thunk` to upstream `Thunk`s that still need to execute
 - `waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}` - Map from input `Chunk`/upstream `Thunk` to all unfinished downstream `Thunk`s, to retain caches
 - `ready::Vector{Thunk}` - The list of `Thunk`s that are ready to execute
@@ -60,20 +58,18 @@ Fields:
 - `worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}` - Communication channels between the scheduler and each worker
 - `procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}` - Cached linked list of processors ready to be used
 - `function_cost_cache::Dict{Type{<:Tuple},UInt}` - Cache of estimated CPU time required to compute the given signature
-- `halt::Base.RefValue{Bool}` - Flag indicating, when set, that the scheduler should halt immediately
-- `lock::ReentrantLock()` - Lock around operations which modify the state
+- `halt::Base.Event` - Event indicating that the scheduler is halting
+- `lock::ReentrantLock` - Lock around operations which modify the state
 - `futures::Dict{Thunk, Vector{ThunkFuture}}` - Futures registered for waiting on the result of a thunk.
-- `errored::Set{Thunk}` - Thunks that threw an error
-- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks
+- `errored::WeakKeyDict{Thunk,Bool}` - Indicates if a thunk's result is due to an error.
+- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks.
 """
 struct ComputeState
     uid::UInt64
-    dependents::Dict{Union{Thunk,Chunk},Set{Thunk}}
-    finished::Set{Thunk}
     waiting::OneToMany
     waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}
     ready::Vector{Thunk}
-    cache::Dict{Thunk, Any}
+    cache::WeakKeyDict{Thunk, Any}
     running::Set{Thunk}
     running_on::Dict{Thunk,OSProc}
     thunk_dict::Dict{Int, Any}
@@ -84,19 +80,17 @@ struct ComputeState
     worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
     procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
     function_cost_cache::Dict{Type{<:Tuple},UInt}
-    halt::Base.RefValue{Bool}
+    halt::Base.Event
     lock::ReentrantLock
     futures::Dict{Thunk, Vector{ThunkFuture}}
-    errored::Set{Thunk}
+    errored::WeakKeyDict{Thunk,Bool}
     chan::RemoteChannel{Channel{Any}}
 end
 
 function start_state(deps::Dict, node_order, chan)
     state = ComputeState(rand(UInt64),
-                         deps,
-                         Set{Thunk}(),
                          OneToMany(),
-                         Dict{Union{Thunk,Chunk},Set{Thunk}}(),
+                         deps,
                          Vector{Thunk}(undef, 0),
                          Dict{Thunk, Any}(),
                          Set{Thunk}(),
@@ -109,18 +103,13 @@ function start_state(deps::Dict, node_order, chan)
                          Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
                          Ref{Union{ProcessorCacheEntry,Nothing}}(nothing),
                          Dict{Type{<:Tuple},UInt}(),
-                         Ref{Bool}(false),
+                         Base.Event(),
                          ReentrantLock(),
                          Dict{Thunk, Vector{ThunkFuture}}(),
-                         Set{Thunk}(),
+                         WeakKeyDict{Thunk,Bool}(),
                          chan)
 
-    nodes = sort(collect(keys(deps)), by=node_order)
-    # N.B. Using merge! here instead would modify deps
-    for (key,val) in deps
-        state.waiting_data[key] = copy(val)
-    end
-    for k in nodes
+    for k in sort(collect(keys(deps)), by=node_order)
         if istask(k)
             waiting = Set{Thunk}(Iterators.filter(istask, inputs(k)))
             if isempty(waiting)
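With the defensive copy removed, `deps` itself becomes `state.waiting_data`, so `start_state` now takes ownership of (and will mutate) the dictionary its caller passes in. The surviving loop seeds `waiting`/`ready` exactly as before; a standalone sketch of that seeding logic, with symbols standing in for thunks (not Dagger API):

```julia
# Tasks are symbols; `ins` maps each task to its prerequisite tasks.
ins = Dict(:a => Symbol[], :b => [:a], :c => [:a, :b])

waiting = Dict{Symbol,Set{Symbol}}()
ready   = Symbol[]
for k in sort(collect(keys(ins)))
    w = Set(ins[k])
    isempty(w) ? push!(ready, k) : (waiting[k] = w)  # no task inputs => ready now
end
# ready == [:a]; waiting == Dict(:b => Set([:a]), :c => Set([:a, :b]))
```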
@@ -392,7 +381,11 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
         end
 
         isempty(state.running) && continue
-        pid, proc, thunk_id, (res, metadata) = take!(chan) # get result of completed thunk
+        chan_value = take!(chan) # get result of completed thunk
+        if chan_value isa RescheduleSignal
+            continue
+        end
+        pid, proc, thunk_id, (res, metadata) = chan_value
         gproc = OSProc(pid)
         lock(newtasks_lock) # This waits for any assign_new_procs! above to complete and then shuts down the task
         safepoint(state)
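The `take!` on `chan` can now yield a `RescheduleSignal` sentinel in addition to `(pid, proc, thunk_id, (res, metadata))` tuples, so the loop must discriminate before destructuring. The pattern in isolation (this `RescheduleSignal` is a local stand-in; the real one is defined elsewhere in `Sch`):

```julia
struct RescheduleSignal end        # sentinel type; carries no payload

ch = Channel{Any}(8)
put!(ch, RescheduleSignal())       # "wake up and re-run scheduling"
put!(ch, (1, :some_proc, 42, ("result", nothing)))
while isready(ch)
    v = take!(ch)
    v isa RescheduleSignal && continue            # no result to unpack
    pid, proc, thunk_id, (res, metadata) = v      # safe: v is a tuple here
    @show pid thunk_id res
end
```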
@@ -424,6 +417,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
             state.function_cost_cache[sig] = (metadata.threadtime + get(state.function_cost_cache, sig, 0)) ÷ 2
         end
         state.cache[node] = res
+        state.errored[node] = thunk_failed
         if node.options !== nothing && node.options.checkpoint !== nothing
             try
                 node.options.checkpoint(node, res)
@@ -441,12 +435,14 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
 
         safepoint(state)
     end
-    state.halt[] = true
+    @assert !isready(state.chan)
+    close(state.chan)
+    notify(state.halt)
     @sync for p in procs_to_use(ctx)
         @async cleanup_proc(state, p)
     end
     value = state.cache[d] # TODO: move(OSProc(), state.cache[d])
-    if d in state.errored
+    if state.errored[d]
         throw(value)
     end
     if options.checkpoint !== nothing
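Swapping `halt::Ref{Bool}` for `Base.Event` changes shutdown from polling to blocking: background tasks can `wait` on the event and are woken by `notify`, instead of repeatedly re-checking a flag. A minimal sketch of the primitive, independent of the scheduler:

```julia
halt = Base.Event()
watcher = @async begin
    wait(halt)                     # blocks until notify(halt) fires
    println("halt observed; running cleanup")
end
# ... scheduler loop runs ...
notify(halt)                       # what compute_dag now does on exit
wait(watcher)
```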
@@ -556,6 +552,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
     while !isempty(state.ready)
         # Select a new task and get its options
         task = pop!(state.ready)
+        @assert !haskey(state.cache, task)
         opts = merge(ctx.options, task.options)
         sig = signature(task, state)
 
@@ -693,62 +690,69 @@ function pop_with_affinity!(ctx, tasks, proc)
     return nothing
 end
 
-function finish_task!(ctx, state, node, thunk_failed; free=true)
+function finish_task!(ctx, state, node, thunk_failed)
     pop!(state.running, node)
     delete!(state.running_on, node)
-    if !thunk_failed
-        push!(state.finished, node)
-    else
+    if thunk_failed
         set_failed!(state, node)
     end
-    if istask(node) && node.cache
+    if node.cache
         node.cache_ref = state.cache[node]
     end
-    if !thunk_failed
-        for dep in sort!(collect(state.dependents[node]), by=state.node_order)
+    for dep in sort!(collect(get(()->Set{Thunk}(), state.waiting_data, node)), by=state.node_order)
+        dep_isready = false
+        if haskey(state.waiting, dep)
             set = state.waiting[dep]
             node in set && pop!(set, node)
-            if isempty(set)
-                pop!(state.waiting, dep)
-                push!(state.ready, dep)
+            dep_isready = isempty(set)
+            if dep_isready
+                delete!(state.waiting, dep)
             end
-            # todo: free data
+        else
+            dep_isready = true
         end
-        if haskey(state.futures, node)
-            # Notify any listening thunks
-            for future in state.futures[node]
-                if istask(node) && haskey(state.cache, node)
-                    put!(future, state.cache[node])
-                else
-                    put!(future, nothing)
-                end
+        if dep_isready
+            if !thunk_failed
+                push!(state.ready, dep)
             end
-            delete!(state.futures, node)
         end
     end
+    if haskey(state.futures, node)
+        # Notify any listening thunks
+        for future in state.futures[node]
+            put!(future, state.cache[node]; error=thunk_failed)
+        end
+        delete!(state.futures, node)
+    end
 
     # Chunk clean-up
     to_evict = Set{Chunk}()
-    for inp in filter(t->istask(t) || (t isa Chunk), inputs(node))
+    for inp in filter(t->istask(t) || (t isa Chunk), unwrap_weak.(node.inputs))
         if inp in keys(state.waiting_data)
-            s = state.waiting_data[inp]
-            if node in s
-                pop!(s, node)
+            w = state.waiting_data[inp]
+            if node in w
+                pop!(w, node)
             end
-            if free && isempty(s)
+            if isempty(w)
+                delete!(state.waiting_data, inp)
                 if istask(inp) && haskey(state.cache, inp)
                     _node = state.cache[inp]
                     if _node isa Chunk
                         push!(to_evict, _node)
                     end
-                    free!(_node, force=false, cache=(istask(inp) && inp.cache))
-                    pop!(state.cache, inp)
+                    GC.@preserve inp begin
+                        pop!(state.cache, inp)
+                        pop!(state.errored, inp)
+                    end
                 elseif inp isa Chunk
                     push!(to_evict, inp)
                 end
            end
        end
    end
+    if haskey(state.waiting_data, node) && isempty(state.waiting_data[node])
+        delete!(state.waiting_data, node)
+    end
     if !isempty(to_evict)
         @sync for w in map(p->p.pid, procs_to_use(ctx))
             @async remote_do(evict_chunks!, w, to_evict)
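`finish_task!` no longer consults the deleted `dependents`/`finished` maps: a dependent's readiness is now derived purely from `waiting`/`waiting_data`, and listeners learn about failures through `put!(future, value; error=thunk_failed)` rather than only ever receiving successful values. A standalone sketch of the readiness bookkeeping, with symbols standing in for thunks (not Dagger API):

```julia
waiting      = Dict(:c => Set([:a, :b]))              # :c waits on :a and :b
waiting_data = Dict(:a => Set([:c]), :b => Set([:c])) # retention map
ready        = Symbol[]

function finish!(node)
    for dep in collect(get(waiting_data, node, Set{Symbol}()))
        w = waiting[dep]
        delete!(w, node)
        if isempty(w)              # last upstream done: dep becomes ready
            delete!(waiting, dep)
            push!(ready, dep)
        end
    end
    delete!(waiting_data, node)    # node's own retention entry is spent
end

finish!(:a)                        # ready still empty; :c waits on :b
finish!(:b)                        # ready == [:c]
```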
@@ -779,8 +783,8 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
         if data !== nothing
             # cache hit
             state.cache[thunk] = data
-            thunk_failed = thunk in state.errored
-            finish_task!(ctx, state, thunk, thunk_failed; free=false)
+            thunk_failed = state.errored[thunk]
+            finish_task!(ctx, state, thunk, thunk_failed)
             continue
         else
             # cache miss
@@ -791,19 +795,20 @@
             try
                 result = thunk.options.restore(thunk)
                 state.cache[thunk] = result
-                finish_task!(ctx, state, thunk, false; free=false)
+                state.errored[thunk] = false
+                finish_task!(ctx, state, thunk, false)
                 continue
             catch err
                 @error "Thunk restore failed" exception=(err,catch_backtrace())
             end
         end
 
         ids = map(enumerate(thunk.inputs)) do (idx,x)
-            istask(x) ? x.id : -idx
+            istask(x) ? unwrap_weak(x).id : -idx
         end
 
         data = map(thunk.inputs) do x
-            istask(x) ? state.cache[x] : x
+            istask(x) ? state.cache[unwrap_weak(x)] : x
         end
         toptions = thunk.options !== nothing ? thunk.options : ThunkOptions()
         options = merge(ctx.options, toptions)
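Since inputs are now weakly held, every use site re-strengthens them first: both the id mapping and the argument gathering go through `unwrap_weak(x)` before touching `.id` or indexing the cache. The underlying mechanics, assuming plain `WeakRef` inputs (illustration only; `FakeThunk` and `unwrap` are hypothetical local analogues):

```julia
mutable struct FakeThunk          # stand-in for Thunk
    id::Int
end

unwrap(x::WeakRef) = x.value      # local analogue of unwrap_weak
unwrap(x) = x

t = FakeThunk(7)
input = WeakRef(t)                # how a weakly-held input would be stored
unwrap(input).id == 7             # must unwrap before field access
cache = IdDict{Any,Any}(t => "result")
cache[unwrap(input)]              # and before using it as a cache key
```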
@@ -829,7 +834,8 @@ function do_tasks(to_proc, chan, tasks)
     for task in tasks
         @async begin
             try
-                put!(chan, (myid(), to_proc, task[2], do_task(to_proc, task...)))
+                result = do_task(to_proc, task...)
+                put!(chan, (myid(), to_proc, task[2], result))
             catch ex
                 bt = catch_backtrace()
                 put!(chan, (myid(), to_proc, task[2], (CapturedException(ex, bt), nothing)))
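The refactor separates computing the result from sending it; functionally equivalent, but it makes explicit that the `catch` exists to forward task-execution failures over the same channel as results. The report-over-channel pattern in isolation:

```julia
ch = Channel{Any}(1)
try
    result = sqrt(-1.0)            # throws DomainError
    put!(ch, (:ok, result))
catch ex
    bt = catch_backtrace()
    put!(ch, (:err, CapturedException(ex, bt)))
end
take!(ch)                          # (:err, CapturedException(...))
```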