Skip to content

Commit 9aa8eee

Browse files
committed
Move support for addprocs!()/rmprocs!() exclusively to the eager API
This ends up being simpler than supporting both.
1 parent befdd4d commit 9aa8eee

File tree

6 files changed

+116
-249
lines changed

6 files changed

+116
-249
lines changed

Diff for: src/context.jl

+70-22
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
export Context, addprocs!, rmprocs!
2+
13
"""
24
Context(xs::Vector{OSProc}) -> Context
35
Context(xs::Vector{Int}) -> Context
@@ -15,21 +17,18 @@ Special fields include:
1517
mutable struct Context
1618
procs::Vector{Processor}
1719
proc_lock::ReentrantLock
18-
proc_notify::Threads.Condition
1920
log_sink::Any
2021
profile::Bool
2122
options
2223
end
2324

2425
function Context(procs::Vector{P}=Processor[OSProc(w) for w in procs()];
25-
proc_lock=ReentrantLock(), proc_notify=Threads.Condition(),
26-
log_sink=TimespanLogging.NoOpLog(), log_file=nothing, profile=false,
27-
options=nothing) where {P<:Processor}
26+
proc_lock=ReentrantLock(), log_sink=TimespanLogging.NoOpLog(),
27+
log_file=nothing, profile=false, options=nothing) where {P<:Processor}
2828
if log_file !== nothing
2929
@warn "`log_file` is no longer supported\nPlease instead load `GraphViz.jl` and use `render_logs(logs, :graphviz)`."
3030
end
31-
Context(procs, proc_lock, proc_notify, log_sink,
32-
profile, options)
31+
Context(procs, proc_lock, log_sink, profile, options)
3332
end
3433
Context(xs::Vector{Int}; kwargs...) = Context(map(OSProc, xs); kwargs...)
3534
Context(ctx::Context, xs::Vector=copy(procs(ctx))) = # make a copy
@@ -62,41 +61,90 @@ procs(ctx::Context) = lock(ctx) do
6261
end
6362

6463
"""
65-
addprocs!(ctx::Context, xs)
64+
addprocs!(xs) -> Processor[]
6665
67-
Add new workers `xs` to `ctx`.
66+
Add new workers `xs` to the eager scheduler and return the ones that were
67+
actually added.
6868
6969
Workers will typically be assigned new tasks in the next scheduling iteration
7070
if scheduling is ongoing.
7171
7272
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
7373
"""
74-
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
75-
function addprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
76-
lock(ctx) do
77-
append!(ctx.procs, xs)
74+
addprocs!(xs::AbstractVector{<:Integer}) = addprocs!(map(OSProc, xs))
75+
function addprocs!(xs::AbstractVector{<:Processor})
76+
Sch.init_eager()
77+
78+
ctx = Sch.eager_context()
79+
state = Sch.EAGER_STATE[]
80+
81+
to_add = setdiff(xs, procs(ctx))
82+
83+
timespan_start(ctx, :addprocs!, nothing, nothing)
84+
85+
# Initialize new procs
86+
for p in to_add
87+
Sch.init_proc(state, p, ctx.log_sink)
88+
89+
# Empty the processor cache list and force reschedule
90+
lock(state.lock) do
91+
state.procs_cache_list[] = nothing
92+
end
93+
put!(state.chan, Sch.RescheduleSignal())
7894
end
79-
lock(ctx.proc_notify) do
80-
notify(ctx.proc_notify)
95+
96+
lock(ctx) do
97+
append!(ctx.procs, to_add)
8198
end
99+
timespan_finish(ctx, :addprocs!, nothing, nothing)
100+
101+
return to_add
82102
end
83103

84104
"""
85-
rmprocs!(ctx::Context, xs)
105+
rmprocs!(xs) -> Processor[]
86106
87-
Remove the specified workers `xs` from `ctx`.
107+
Remove the specified workers `xs` from the eager scheduler and return the ones
108+
that were actually removed.
88109
89110
Workers will typically finish all their assigned tasks if scheduling is ongoing
90111
but will not be assigned new tasks after removal.
91112
92113
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
93114
"""
94-
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
95-
function rmprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
96-
lock(ctx) do
97-
filter!(p -> (p ∉ xs), ctx.procs)
115+
rmprocs!(xs::AbstractVector{<:Integer}) = rmprocs!(map(OSProc, xs))
116+
function rmprocs!(xs::AbstractVector{<:Processor})
117+
Sch.init_eager()
118+
119+
ctx = Sch.eager_context()
120+
state = Sch.EAGER_STATE[]
121+
122+
_rmprocs!(ctx, state, xs; allow_empty=false)
123+
end
124+
125+
function _rmprocs!(ctx::Context, state, xs; allow_empty=true)
126+
127+
if !allow_empty && Set(procs(ctx)) == Set(xs)
128+
throw(ArgumentError("Refusing to remove all processors from the Dagger.Context, at least one must be present."))
129+
end
130+
131+
to_remove = intersect(xs, procs(ctx))
132+
133+
timespan_start(ctx, :rmprocs!, nothing, nothing)
134+
135+
for p in to_remove
136+
Sch.cleanup_proc(state, p, ctx.log_sink)
137+
138+
# Empty the processor cache list
139+
lock(state.lock) do
140+
state.procs_cache_list[] = nothing
141+
end
98142
end
99-
lock(ctx.proc_notify) do
100-
notify(ctx.proc_notify)
143+
144+
lock(ctx) do
145+
filter!(p -> (p ∉ to_remove), ctx.procs)
101146
end
147+
timespan_finish(ctx, :rmprocs!, nothing, nothing)
148+
149+
return to_remove
102150
end

Diff for: src/processor.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export OSProc, Context, addprocs!, rmprocs!
1+
export OSProc
22

33
import Base: @invokelatest
44

Diff for: src/sch/Sch.jl

+2-63
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ function cleanup_proc(state, p, log_sink)
415415
# We allow ProcessExitedException's, which means that the worker
416416
# shutdown halfway through cleanup.
417417
if !(ex isa ProcessExitedException)
418-
throw(ex)
418+
rethrow()
419419
end
420420
end
421421
end
@@ -515,15 +515,6 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)
515515
end
516516
end
517517
end
518-
519-
# Listen for new workers
520-
@async begin
521-
try
522-
monitor_procs_changed!(ctx, state)
523-
catch err
524-
@error "Error assigning workers" exception=(err,catch_backtrace())
525-
end
526-
end
527518
end
528519

529520
function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
@@ -643,11 +634,6 @@ function scheduler_exit(ctx, state::ComputeState, options)
643634
end
644635
empty!(state.futures)
645636
end
646-
647-
# Let the context procs handler clean itself up
648-
lock(ctx.proc_notify) do
649-
notify(ctx.proc_notify)
650-
end
651637
end
652638

653639
function procs_to_use(ctx, options=ctx.options)
@@ -865,56 +851,9 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
865851
end
866852
end
867853

868-
"""
869-
Monitors for workers being added/removed to/from `ctx`, sets up or tears down
870-
per-worker state, and notifies the scheduler so that work can be reassigned.
871-
"""
872-
function monitor_procs_changed!(ctx, state)
873-
# Load current set of procs
874-
old_ps = procs_to_use(ctx)
875-
876-
while !state.halt.set
877-
# Wait for the notification that procs have changed
878-
lock(ctx.proc_notify) do
879-
wait(ctx.proc_notify)
880-
end
881-
882-
timespan_start(ctx, :assign_procs, nothing, nothing)
883-
884-
# Load new set of procs
885-
new_ps = procs_to_use(ctx)
886-
887-
# Initialize new procs
888-
diffps = setdiff(new_ps, old_ps)
889-
for p in diffps
890-
init_proc(state, p, ctx.log_sink)
891-
892-
# Empty the processor cache list and force reschedule
893-
lock(state.lock) do
894-
state.procs_cache_list[] = nothing
895-
end
896-
put!(state.chan, RescheduleSignal())
897-
end
898-
899-
# Cleanup removed procs
900-
diffps = setdiff(old_ps, new_ps)
901-
for p in diffps
902-
cleanup_proc(state, p, ctx.log_sink)
903-
904-
# Empty the processor cache list
905-
lock(state.lock) do
906-
state.procs_cache_list[] = nothing
907-
end
908-
end
909-
910-
timespan_finish(ctx, :assign_procs, nothing, nothing)
911-
old_ps = new_ps
912-
end
913-
end
914-
915854
function remove_dead_proc!(ctx, state, proc, options=ctx.options)
916855
@assert options.single !== proc.pid "Single worker failed, cannot continue."
917-
rmprocs!(ctx, [proc])
856+
Dagger._rmprocs!(ctx, state, [proc])
918857
delete!(state.worker_time_pressure, proc.pid)
919858
delete!(state.worker_storage_pressure, proc.pid)
920859
delete!(state.worker_storage_capacity, proc.pid)

Diff for: test/processors.jl

-15
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,6 @@ end
6161
addcb()
6262
end
6363

64-
@testset "Modify workers in Context" begin
65-
ps = addprocs(4, exeflags="--project")
66-
@everywhere ps using Dagger
67-
68-
ctx = Context(ps[1:2])
69-
70-
Dagger.addprocs!(ctx, ps[3:end])
71-
@test map(p -> p.pid, procs(ctx)) == ps
72-
73-
Dagger.rmprocs!(ctx, ps[3:end])
74-
@test map(p -> p.pid, procs(ctx)) == ps[1:2]
75-
76-
wait(rmprocs(ps))
77-
end
78-
7964
@testset "Callable as Thunk function" begin
8065
@everywhere begin
8166
struct ABC end

0 commit comments

Comments
 (0)