Skip to content

Commit d5fd837

Browse files
committed
Rename the WorkerState instances and add an exterminated state
The new `WorkerState_exterminated` state is for indicating that a worker was killed by something other than us.
1 parent 8779372 commit d5fd837

File tree

3 files changed

+34
-19
lines changed

3 files changed

+34
-19
lines changed

src/cluster.jl

+30-15
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,15 @@ mutable struct WorkerConfig
9292
end
9393
end
9494

95-
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED W_UNKNOWN_STATE
95+
@enum WorkerState begin
96+
WorkerState_created
97+
WorkerState_connected
98+
WorkerState_terminating # rmprocs() has been called on the worker
99+
WorkerState_terminated # Worker was gracefully removed
100+
WorkerState_exterminated # Worker was forcefully removed (not by us)
101+
WorkerState_unknown
102+
end
103+
96104
mutable struct Worker
97105
id::Int
98106
msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
@@ -123,7 +131,7 @@ mutable struct Worker
123131
w.manager = manager
124132
w.config = config
125133
w.version = version
126-
set_worker_state(w, W_CONNECTED)
134+
set_worker_state(w, WorkerState_connected)
127135
register_worker_streams(w)
128136
w
129137
end
@@ -134,7 +142,7 @@ mutable struct Worker
134142
if haskey(map_pid_wrkr, id)
135143
return map_pid_wrkr[id]
136144
end
137-
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func)
145+
w=new(id, Threads.ReentrantLock(), [], [], false, WorkerState_created, Threads.Condition(), time(), conn_func)
138146
w.initialized = Event()
139147
register_worker(w)
140148
w
@@ -150,8 +158,15 @@ function set_worker_state(w, state)
150158
end
151159
end
152160

161+
# Helper function to check if a worker is dead or not. It's recommended to use
162+
# this instead of checking Worker.state manually.
163+
function is_worker_dead(w::Worker)
164+
state = @atomic w.state
165+
return state === WorkerState_terminated || state === WorkerState_exterminated
166+
end
167+
153168
function check_worker_state(w::Worker)
154-
if (@atomic w.state) === W_CREATED
169+
if (@atomic w.state) === WorkerState_created
155170
if !isclusterlazy()
156171
if PGRP.topology === :all_to_all
157172
# Since higher pids connect with lower pids, the remote worker
@@ -190,7 +205,7 @@ function exec_conn_func(w::Worker)
190205
end
191206

192207
function wait_for_conn(w)
193-
if (@atomic w.state) === W_CREATED
208+
if (@atomic w.state) === WorkerState_created
194209
timeout = worker_timeout() - (time() - w.ct_time)
195210
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
196211

@@ -203,7 +218,7 @@ function wait_for_conn(w)
203218
errormonitor(T)
204219
lock(w.c_state) do
205220
wait(w.c_state)
206-
(@atomic w.state) === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
221+
(@atomic w.state) === WorkerState_created && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
207222
end
208223
end
209224
nothing
@@ -666,7 +681,7 @@ function create_worker(manager, wconfig)
666681
if (jw.id != 1) && (jw.id < w.id)
667682
lock(jw.c_state) do
668683
# wait for wl to join
669-
if (@atomic jw.state) === W_CREATED
684+
if (@atomic jw.state) === WorkerState_created
670685
wait(jw.c_state)
671686
end
672687
end
@@ -693,7 +708,7 @@ function create_worker(manager, wconfig)
693708

694709
for wl in wlist
695710
lock(wl.c_state) do
696-
if (@atomic wl.state) === W_CREATED
711+
if (@atomic wl.state) === WorkerState_created
697712
# wait for wl to join
698713
wait(wl.c_state)
699714
end
@@ -900,7 +915,7 @@ function nprocs()
900915
n = length(PGRP.workers)
901916
# filter out workers in the process of being setup/shutdown.
902917
for jw in PGRP.workers
903-
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
918+
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== WorkerState_connected)
904919
n = n - 1
905920
end
906921
end
@@ -953,7 +968,7 @@ julia> procs()
953968
function procs()
954969
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
955970
# filter out workers in the process of being setup/shutdown.
956-
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
971+
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)]
957972
else
958973
return Int[x.id for x in PGRP.workers]
959974
end
@@ -970,7 +985,7 @@ other_procs() = filter(!=(myid()), procs())
970985
function id_in_procs(id) # faster version of `id in procs()`
971986
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
972987
for x in PGRP.workers
973-
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
988+
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === WorkerState_connected)
974989
return true
975990
end
976991
end
@@ -994,7 +1009,7 @@ See also [`other_procs()`](@ref).
9941009
"""
9951010
function procs(pid::Integer)
9961011
if myid() == 1
997-
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
1012+
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)]
9981013
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
9991014
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
10001015
else
@@ -1103,7 +1118,7 @@ function _rmprocs(pids, waitfor)
11031118
else
11041119
if haskey(map_pid_wrkr, p)
11051120
w = map_pid_wrkr[p]
1106-
set_worker_state(w, W_TERMINATING)
1121+
set_worker_state(w, WorkerState_terminating)
11071122
kill(w.manager, p, w.config)
11081123
push!(rmprocset, w)
11091124
end
@@ -1112,11 +1127,11 @@ function _rmprocs(pids, waitfor)
11121127

11131128
start = time_ns()
11141129
while (time_ns() - start) < waitfor*1e9
1115-
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
1130+
all(is_worker_dead, rmprocset) && break
11161131
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
11171132
end
11181133

1119-
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
1134+
unremoved = [wrkr.id for wrkr in filter(!is_worker_dead, rmprocset)]
11201135
if length(unremoved) > 0
11211136
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
11221137
throw(ErrorException(estr))

src/messages.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ end
194194
function flush_gc_msgs()
195195
try
196196
for w in (PGRP::ProcessGroup).workers
197-
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
197+
if isa(w,Worker) && ((@atomic w.state) == WorkerState_connected) && w.gcflag
198198
flush_gc_msgs(w)
199199
end
200200
end

src/process_messages.jl

+3-3
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
210210
handle_msg(msg, header, r_stream, w_stream, version)
211211
end
212212
catch e
213-
oldstate = W_UNKNOWN_STATE
213+
oldstate = WorkerState_unknown
214214

215215
# Check again as it may have been set in a message handler but not propagated to the calling block above
216216
if wpid < 1
@@ -223,7 +223,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
223223
elseif !(wpid in map_del_wrkr)
224224
werr = worker_from_id(wpid)
225225
oldstate = @atomic werr.state
226-
set_worker_state(werr, W_TERMINATED)
226+
set_worker_state(werr, oldstate != WorkerState_terminating ? WorkerState_exterminated : WorkerState_terminated)
227227

228228
# If unhandleable error occurred talking to pid 1, exit
229229
if wpid == 1
@@ -243,7 +243,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
243243
close(w_stream)
244244

245245
if (myid() == 1) && (wpid > 1)
246-
if oldstate != W_TERMINATING
246+
if oldstate != WorkerState_terminating
247247
println(stderr, "Worker $wpid terminated.")
248248
rethrow()
249249
end

0 commit comments

Comments
 (0)