
Commit 4c0049c

tanmaykm authored and JeffBezanson committed
do not use myid() to differentiate master & worker (#32879)
Occasionally while adding a large number of workers and particularly worker-worker connections are not lazy, it is possible to encounter the following error: ``` ERROR (unhandled task failure): MethodError: no method matching manage(::Base.Distributed.DefaultClusterManager, ::Int64, ::WorkerConfig, ::Symbol) Closest candidates are: manage(!Matched::Base.Distributed.SSHManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:224 manage(!Matched::Base.Distributed.LocalManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:337 manage(!Matched::Union{ClusterManagers.PBSManager, ClusterManagers.QRSHManager, ClusterManagers.SGEManager}, ::Int64, ::WorkerConfig, ::Symbol) at /home/jrun/.julia/v0.6/ClusterManagers/src/qsub.jl:115 ... Stacktrace: [1] deregister_worker(::Base.Distributed.ProcessGroup, ::Int64) at ./distributed/cluster.jl:903 [2] message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:220 [3] process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:118 [4] (::Base.Distributed.##101#102{TCPSocket,TCPSocket,Bool})() at ./event.jl:73 ``` It can be simulated with this exact sequence of events: - worker2 in process of connecting to master - master has received the worker2s listen port, connected to it, sent the JoinPGRP message to it - master is now aware of worker2, and has added it to its list of workers - worker2 has still not processed the JoinPGRP message, so it is still unaware of its worker id - worker3 now connects to master - master sends the JoinPGRP message along with list of existing workers that includes worker2 - worker3 connects to worker2 - worker2 receives a new connection from worker3 and attempts to process it - worker3 faces an error and exits, thus breaking the connection - worker2 gets an error processing message from worker3 - goes into error handling - the current error handling code sees the self pid as 1 and incorrectly thinks it is the master - attempts to process the worker disconnection as a master and gets the error we see The MethodError prevents proper cleanup at the worker where it happens. The issue seems to be that it is not correct to identify whether a Julia process is master or worker by looking at the process id. Instead we should have a dedicated indicator for that. This change adds a new local process role variable that is set to `:master` by default, but is set to `:worker` when `start_worker` is invoked. This allows a process to know that it is running as a worker irrespective of whether it has received a process id or not.
1 parent 53397c9 commit 4c0049c
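
The accompanying test (further below) exercises the new role query from both the master and worker sides. A minimal usage sketch along the same lines, assuming a Julia build that includes this commit (`myrole` is an internal, unexported `Distributed` function):

```julia
# Minimal sketch: query the process role added by this commit.
# Assumes a Julia build containing this change; Distributed.myrole() is internal.
using Distributed

addprocs(2)

# The launching process keeps the default role.
@assert Distributed.myrole() === :master

# Processes brought up via start_worker()/init_worker() report :worker.
@assert remotecall_fetch(Distributed.myrole, workers()[1]) === :worker
```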

File tree

2 files changed: +20 -1 lines changed


stdlib/Distributed/src/cluster.jl

+10-1
```diff
@@ -361,6 +361,8 @@ process as a worker using TCP/IP sockets for transport.
 `cookie` is a [`cluster_cookie`](@ref).
 """
 function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
+    myrole!(:worker)
+
     # On workers, the default cluster manager connects via TCP sockets. Custom
     # transports will need to call this function with their own manager.
     global cluster_manager
@@ -783,12 +785,19 @@ end
 
 # globals
 const LPROC = LocalProcess()
+const LPROCROLE = Ref{Symbol}(:master)
 const HDR_VERSION_LEN=16
 const HDR_COOKIE_LEN=16
 const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
 const map_sock_wrkr = IdDict()
 const map_del_wrkr = Set{Int}()
 
+# whether process is a master or worker in a distributed setup
+myrole() = LPROCROLE[]
+function myrole!(proctype::Symbol)
+    LPROCROLE[] = proctype
+end
+
 # cluster management related API
 """
     myid()
@@ -1108,7 +1117,7 @@ function deregister_worker(pg, pid)
             end
         end
 
-        if myid() == 1 && isdefined(w, :config)
+        if myid() == 1 && (myrole() === :master) && isdefined(w, :config)
            # Notify the cluster manager of this workers death
            manage(w.manager, w.id, w.config, :deregister)
            if PGRP.topology != :all_to_all || isclusterlazy()
```
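
To make the reasoning behind the last hunk concrete: a worker that has not yet processed its JoinPGRP message still carries the default pid of 1, so a pid-based check cannot tell it apart from the master, while an explicit role flag can. The following is an illustrative sketch only; the `RoleState` type and the two helper functions are hypothetical, not part of Distributed:

```julia
# Illustrative sketch; RoleState and these helpers are not Distributed APIs.
mutable struct RoleState
    pid::Int      # stays at the default 1 until JoinPGRP assigns the real id
    role::Symbol  # :master or :worker, known from the moment the process starts
end

# pid-based check: a not-yet-initialized worker looks exactly like the master
is_master_by_pid(s::RoleState) = s.pid == 1

# role-based check: correct regardless of whether the pid has arrived yet
is_master_by_role(s::RoleState) = s.role === :master

# worker2 in the failure scenario: connected, JoinPGRP not yet processed
early_worker = RoleState(1, :worker)

@assert is_master_by_pid(early_worker)     # misidentified as the master
@assert !is_master_by_role(early_worker)   # correctly identified as a worker
```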

stdlib/Distributed/test/distributed_exec.jl

+10
```diff
@@ -45,6 +45,16 @@ end
 id_me = myid()
 id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
 
+# Test role
+@everywhere using Distributed
+@test Distributed.myrole() === :master
+for wid = workers()
+    wrole = remotecall_fetch(wid) do
+        Distributed.myrole()
+    end
+    @test wrole === :worker
+end
+
 # Test remote()
 let
     pool = default_worker_pool()
```
