Skip to content

Commit 64aba00

Browse files
committed
Add support for worker statuses
1 parent e23c490 commit 64aba00

File tree

4 files changed

+97
-10
lines changed

4 files changed

+97
-10
lines changed

docs/src/_changelog.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ This documents notable changes in DistributedNext.jl. The format is based on
2020
exported ([#18]).
2121
- Implemented callback support for workers being added/removed etc ([#17]).
2222
- Added a package extension to support Revise.jl ([#17]).
23+
- Added support for setting worker statuses with [`setstatus`](@ref) and
24+
[`getstatus`](@ref) ([#17]).
2325

2426
## [v1.0.0] - 2024-12-02
2527

docs/src/index.md

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ DistributedNext.rmprocs
1414
DistributedNext.interrupt
1515
DistributedNext.myid
1616
DistributedNext.pmap
17+
DistributedNext.getstatus
18+
DistributedNext.setstatus
1719
DistributedNext.RemoteException
1820
DistributedNext.ProcessExitedException
1921
DistributedNext.Future

src/cluster.jl

+66-5
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,8 @@ const LPROC = LocalProcess()
870870
const LPROCROLE = Ref{Symbol}(:master)
871871
const HDR_VERSION_LEN=16
872872
const HDR_COOKIE_LEN=16
873+
const map_pid_statuses = Dict{Int, Any}()
874+
const map_pid_statuses_lock = ReentrantLock()
873875
const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
874876
const map_sock_wrkr = IdDict()
875877
const map_del_wrkr = Set{Int}()
@@ -1010,15 +1012,16 @@ for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
10101012
segfaulting etc). Chooses and returns a unique key for the callback if `key` is
10111013
not specified.
10121014
1013-
The callback will be called with the worker ID and the final
1014-
`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an
1015+
The callback will be called with the worker ID, the final
1016+
`Distributed.WorkerState` of the worker, and the last status of the worker as
1017+
set by [`setstatus`](@ref), e.g. `f(w::Int, state, status)`. `state` is an
10151018
enum, a value of `WorkerState_terminated` means a graceful exit and a value of
10161019
`WorkerState_exterminated` means the worker died unexpectedly.
10171020
10181021
If the callback throws an exception it will be caught and printed.
10191022
"""
10201023
add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks;
1021-
arg_types=Tuple{Int, WorkerState})
1024+
arg_types=Tuple{Int, WorkerState, Any})
10221025

10231026
"""
10241027
remove_worker_exited_callback(key)
@@ -1206,6 +1209,59 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out.
12061209
"""
12071210
other_workers() = filter(!=(myid()), workers())
12081211

1212+
"""
1213+
setstatus(x, pid::Int=myid())
1214+
1215+
Set the status for worker `pid` to `x`. `x` may be any serializable object but
1216+
it's recommended to keep it small enough to cheaply send over a network. The
1217+
status will be passed to the worker-exited callbacks (see
1218+
[`add_worker_exited_callback`](@ref)) when the worker exits.
1219+
1220+
This can be handy if you want a way to know what a worker is doing at any given
1221+
time, or (in combination with a worker-exited callback) for knowing what a
1222+
worker was last doing before it died.
1223+
1224+
# Examples
1225+
```julia-repl
1226+
julia> DistributedNext.setstatus("working on dataset 42")
1227+
"working on dataset 42"
1228+
1229+
julia> DistributedNext.getstatus()
1230+
"working on dataset 42"
1231+
```
1232+
"""
1233+
function setstatus(x, pid::Int=myid())
1234+
if pid procs()
1235+
throw(ArgumentError("Worker $(pid) does not exist, cannot set its status"))
1236+
end
1237+
1238+
if myid() == 1
1239+
@lock map_pid_statuses_lock map_pid_statuses[pid] = x
1240+
else
1241+
remotecall_fetch(setstatus, 1, x, myid())
1242+
end
1243+
end
1244+
1245+
_getstatus(pid) = @lock map_pid_statuses_lock get!(map_pid_statuses, pid, nothing)
1246+
1247+
"""
1248+
getstatus(pid::Int=myid())
1249+
1250+
Get the status for worker `pid`. If one was never explicitly set with
1251+
[`setstatus`](@ref) this will return `nothing`.
1252+
"""
1253+
function getstatus(pid::Int=myid())
1254+
if pid procs()
1255+
throw(ArgumentError("Worker $(pid) does not exist, cannot get its status"))
1256+
end
1257+
1258+
if myid() == 1
1259+
_getstatus(pid)
1260+
else
1261+
remotecall_fetch(getstatus, 1, pid)
1262+
end
1263+
end
1264+
12091265
function cluster_mgmt_from_master_check()
12101266
if myid() != 1
12111267
throw(ErrorException("Only process 1 can add and remove workers"))
@@ -1425,15 +1481,20 @@ function deregister_worker(pg, pid)
14251481
end
14261482
end
14271483

1428-
# Call callbacks on the master
14291484
if myid() == 1
1485+
status = _getstatus(pid)
1486+
1487+
# Call callbacks on the master
14301488
for (name, callback) in worker_exited_callbacks
14311489
try
1432-
callback(pid, w.state)
1490+
callback(pid, w.state, status)
14331491
catch ex
14341492
@error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace())
14351493
end
14361494
end
1495+
1496+
# Delete its status
1497+
@lock map_pid_statuses_lock delete!(map_pid_statuses, pid)
14371498
end
14381499

14391500
return

test/distributed_exec.jl

+27-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import Revise
44
using DistributedNext, Random, Serialization, Sockets
55
import DistributedNext
6-
import DistributedNext: launch, manage
6+
import DistributedNext: launch, manage, getstatus, setstatus
77

88

99
@test cluster_cookie() isa String
@@ -1940,6 +1940,24 @@ include("splitrange.jl")
19401940
end
19411941
end
19421942

1943+
@testset "Worker statuses" begin
1944+
rmprocs(other_workers())
1945+
1946+
# Test with the local worker
1947+
@test isnothing(getstatus())
1948+
setstatus("foo")
1949+
@test getstatus() == "foo"
1950+
@test_throws ArgumentError getstatus(2)
1951+
1952+
# Test with a remote worker
1953+
pid = only(addprocs(1))
1954+
@test isnothing(getstatus(pid))
1955+
remotecall_wait(setstatus, pid, "bar", pid)
1956+
@test remotecall_fetch(getstatus, pid) == "bar"
1957+
1958+
rmprocs(pid)
1959+
end
1960+
19431961
@testset "Worker state callbacks" begin
19441962
rmprocs(other_workers())
19451963

@@ -1954,7 +1972,7 @@ end
19541972
starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager))
19551973
started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo")))
19561974
exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid))
1957-
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state)))
1975+
exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> push!(exited_workers, (pid, state, status)))
19581976

19591977
# Test that the worker-started exception bubbles up
19601978
@test_throws TaskFailedException addprocs(1)
@@ -1964,7 +1982,7 @@ end
19641982
@test started_workers == [pid]
19651983
rmprocs(workers())
19661984
@test exiting_workers == [pid]
1967-
@test exited_workers == [(pid, DistributedNext.WorkerState_terminated)]
1985+
@test exited_workers == [(pid, DistributedNext.WorkerState_terminated, nothing)]
19681986

19691987
# Trying to reset an existing callback should fail
19701988
@test_throws ArgumentError DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key)
@@ -1997,16 +2015,20 @@ end
19972015
@test length(exiting_workers) == 1
19982016
@test length(exited_workers) == 1
19992017

2000-
# Test that workers that were killed forcefully are detected as such
2018+
# Test that workers that were killed forcefully are detected as such, and
2019+
# that statuses are passed properly.
20012020
exit_state = nothing
2002-
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state)
2021+
last_status = nothing
2022+
exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> (exit_state = state; last_status = status))
20032023
pid = only(addprocs(1))
2024+
setstatus("foo", pid)
20042025

20052026
redirect_stderr(devnull) do
20062027
remote_do(exit, pid)
20072028
timedwait(() -> !isnothing(exit_state), 10)
20082029
end
20092030
@test exit_state == DistributedNext.WorkerState_exterminated
2031+
@test last_status == "foo"
20102032
DistributedNext.remove_worker_exited_callback(exited_key)
20112033
end
20122034

0 commit comments

Comments
 (0)