
Commit fd951c2
support lazy all_to_all connection setups (#22814)
1 parent: d328a82

8 files changed, +190 -63 lines

NEWS.md (+3)

@@ -79,6 +79,9 @@ This section lists changes that do not have deprecation warnings.
   the type of `n`). Use the corresponding mutating functions `randperm!` and `randcycle!`
   to control the array type ([#22723]).
 
+* Worker-worker connections are set up lazily for an `:all_to_all` topology. Use the keyword
+  argument `lazy=false` to force all connections to be set up during an `addprocs` call ([#22814]).
+
 Library improvements
 --------------------

base/distributed/cluster.jl (+77 -19)

The bulk of the change: `Worker` gains a deferred `conn_func` field, `check_worker_state` triggers the deferred setup on first use, and the cluster-wide queries (`nprocs`, `procs`, `id_in_procs`) plus `deregister_worker` become lazy-aware.
@@ -59,6 +59,7 @@ mutable struct Worker
     state::WorkerState
     c_state::Condition      # wait for state changes
     ct_time::Float64        # creation time
+    conn_func::Nullable{Function} # Used to setup connections lazily
 
     r_stream::IO
     w_stream::IO

@@ -82,12 +83,13 @@ mutable struct Worker
         w
     end
 
-    function Worker(id::Int)
+    Worker(id::Int) = Worker(id, Nullable{Function}())
+    function Worker(id::Int, conn_func)
         @assert id > 0
         if haskey(map_pid_wrkr, id)
             return map_pid_wrkr[id]
         end
-        w=new(id, [], [], false, W_CREATED, Condition(), time())
+        w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
         register_worker(w)
         w
     end

@@ -102,21 +104,56 @@ end
 
 function check_worker_state(w::Worker)
     if w.state == W_CREATED
-        if PGRP.topology == :all_to_all
-            # Since higher pids connect with lower pids, the remote worker
-            # may not have connected to us yet. Wait for some time.
-            timeout = worker_timeout() - (time() - w.ct_time)
-            timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
-
-            @schedule (sleep(timeout); notify(w.c_state; all=true))
-            wait(w.c_state)
-            w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+        if !isclusterlazy()
+            if PGRP.topology == :all_to_all
+                # Since higher pids connect with lower pids, the remote worker
+                # may not have connected to us yet. Wait for some time.
+                wait_for_conn(w)
+            else
+                error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
+            end
         else
-            error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
+            w.ct_time = time()
+            if myid() > w.id
+                @schedule exec_conn_func(w)
+            else
+                # route request via node 1
+                @schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
+            end
+            wait_for_conn(w)
         end
     end
 end
 
+exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id))
+function exec_conn_func(w::Worker)
+    if isnull(w.conn_func)
+        return wait_for_conn(w) # Some other task may be trying to connect at the same time.
+    end
+
+    try
+        f = get(w.conn_func)
+        w.conn_func = Nullable{Function}()
+        f()
+    catch e
+        w.conn_func = () -> throw(e)
+        rethrow(e)
+    end
+    nothing
+end
+
+function wait_for_conn(w)
+    if w.state == W_CREATED
+        timeout = worker_timeout() - (time() - w.ct_time)
+        timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
+
+        @schedule (sleep(timeout); notify(w.c_state; all=true))
+        wait(w.c_state)
+        w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+    end
+    nothing
+end
+
 ## process group creation ##
 
 mutable struct LocalProcess
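
The heart of the lazy path is a run-once closure handoff: `exec_conn_func` claims the pending setup function before running it, so a concurrent caller finds `conn_func` empty and simply waits in `wait_for_conn`, while a failure is cached as a throwing closure so later callers see the same error. A self-contained sketch of the same pattern (illustrative names, not the Base.Distributed API; Julia 0.6 `Nullable` semantics assumed):

```julia
mutable struct LazySetup
    setup::Nullable{Function}   # pending action; emptied once claimed
end

function run_once!(s::LazySetup)
    isnull(s.setup) && return nothing   # already claimed by another task
    f = get(s.setup)
    s.setup = Nullable{Function}()      # claim before running
    try
        f()
    catch e
        s.setup = () -> throw(e)        # cache the failure for later callers
        rethrow(e)
    end
    nothing
end

s = LazySetup(Nullable{Function}(() -> println("connecting...")))
run_once!(s)   # prints "connecting..."
run_once!(s)   # no-op: the closure has been consumed
```

Note also the pid-ordering rule preserved from the eager path: since higher pids dial lower pids, a lower-pid caller cannot initiate the connection itself and instead routes the request through node 1.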

@@ -340,6 +377,17 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     params = merge(default_addprocs_params(), AnyDict(kwargs))
     topology(Symbol(params[:topology]))
 
+    if PGRP.topology != :all_to_all
+        params[:lazy] = false
+    end
+
+    if isnull(PGRP.lazy) || nprocs() == 1
+        PGRP.lazy = Nullable{Bool}(params[:lazy])
+    elseif isclusterlazy() != params[:lazy]
+        throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(),
+                                   ". Cannot set lazy=", params[:lazy])))
+    end
+
     # References to launched workers, filled when each worker is fully initialized and
     # has connected to all nodes.
     launched_q = Int[]   # Asynchronously filled by the launch method
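
This validation makes `lazy` a cluster-wide setting pinned by the first `addprocs` call (and forced to `false` for non-`:all_to_all` topologies). A sketch of what mixing values looks like:

```julia
addprocs(2)                   # first call pins the cluster-wide setting: lazy=true
try
    addprocs(2; lazy=false)   # conflicting value while workers are active
catch err
    # Expected: ArgumentError("Active workers with lazy=true. Cannot set lazy=false")
    @assert isa(err, ArgumentError)
end
```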

@@ -396,7 +444,8 @@ default_addprocs_params() = AnyDict(
     :dir                  => pwd(),
     :exename              => joinpath(JULIA_HOME, julia_exename()),
     :exeflags             => ``,
-    :enable_threaded_blas => false)
+    :enable_threaded_blas => false,
+    :lazy                 => true)
 
 
 function setup_launched_worker(manager, wconfig, launched_q)

@@ -517,7 +566,7 @@ function create_worker(manager, wconfig)
 
     all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
     send_connection_hdr(w, true)
-    join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false))
+    join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false), isclusterlazy())
     send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)
 
     @schedule manage(w.manager, w.id, w.config, :register)

@@ -619,8 +668,9 @@ mutable struct ProcessGroup
     workers::Array{Any,1}
     refs::Dict                  # global references
     topology::Symbol
+    lazy::Nullable{Bool}
 
-    ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all)
+    ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, Nullable{Bool}())
 end
 const PGRP = ProcessGroup([])

@@ -634,6 +684,14 @@ function topology(t)
     t
 end
 
+function isclusterlazy()
+    if isnull(PGRP.lazy)
+        return false
+    else
+        return get(PGRP.lazy)
+    end
+end
+
 get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
 get_bind_addr(w::LocalProcess) = LPROC.bind_addr
 function get_bind_addr(w::Worker)

@@ -667,7 +725,7 @@ myid() = LPROC.id
 Get the number of available processes.
 """
 function nprocs()
-    if myid() == 1 || PGRP.topology == :all_to_all
+    if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
         n = length(PGRP.workers)
         # filter out workers in the process of being setup/shutdown.
         for jw in PGRP.workers

@@ -698,7 +756,7 @@ end
 Returns a list of all process identifiers.
 """
 function procs()
-    if myid() == 1 || PGRP.topology == :all_to_all
+    if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
         # filter out workers in the process of being setup/shutdown.
         return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
     else

@@ -707,7 +765,7 @@ function procs()
 end
 
 function id_in_procs(id)  # faster version of `id in procs()`
-    if myid() == 1 || PGRP.topology == :all_to_all
+    if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
         for x in PGRP.workers
             if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED)
                 return true
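
On a lazy cluster a worker's local `PGRP.workers` list no longer reflects reachable peers (unused peers remain in the `W_CREATED` state), so these queries exclude lazy `:all_to_all` workers from the local fast path. The elided `else` branches forward the query to the master; conceptually:

```julia
# Sketch of the fallback a non-master process takes on a lazy cluster
# (the real implementation lives in the `else` branches not shown above):
cluster_procs = myid() == 1 ? procs() : remotecall_fetch(procs, 1)
```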

@@ -903,7 +961,7 @@ function deregister_worker(pg, pid)
     if myid() == 1 && isdefined(w, :config)
         # Notify the cluster manager of this workers death
         manage(w.manager, w.id, w.config, :deregister)
-        if PGRP.topology != :all_to_all
+        if PGRP.topology != :all_to_all || isclusterlazy()
             for rpid in workers()
                 try
                     remote_do(deregister_worker, rpid, pid)

base/distributed/managers.jl (+5 -1)

@@ -100,6 +100,10 @@ Keyword arguments:
   A worker with a cluster manager identity `ident` will connect to all workers specified
   in `connect_idents`.
 
+* `lazy`: applicable only with `topology=:all_to_all`. If `true`, worker-worker connections
+  are set up lazily, i.e. they are set up at the first instance of a remote call between
+  workers. Default is `true`.
+
 
 Environment variables :
302306
Launches workers using the in-built `LocalManager` which only launches workers on the
303307
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
304308
processes on the local machine. If `restrict` is `true`, binding is restricted to
305-
`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, and
309+
`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, `lazy` and
306310
`enable_threaded_blas` have the same effect as documented for `addprocs(machines)`.
307311
"""
308312
function addprocs(np::Integer; restrict=true, kwargs...)

base/distributed/messages.jl (+1)

@@ -68,6 +68,7 @@ struct JoinPGRPMsg <: AbstractMsg
     other_workers::Array
     topology::Symbol
     enable_threaded_blas::Bool
+    lazy::Bool
 end
 struct JoinCompleteMsg <: AbstractMsg
     cpu_cores::Int

base/distributed/process_messages.jl (+10 -2)

@@ -310,14 +310,22 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
         disable_threaded_libs()
     end
 
+    lazy = msg.lazy
+    PGRP.lazy = Nullable{Bool}(lazy)
+
     wait_tasks = Task[]
     for (connect_at, rpid) in msg.other_workers
         wconfig = WorkerConfig()
         wconfig.connect_at = connect_at
 
         let rpid=rpid, wconfig=wconfig
-            t = @async connect_to_peer(cluster_manager, rpid, wconfig)
-            push!(wait_tasks, t)
+            if lazy
+                # The constructor registers the object with a global registry.
+                Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig)))
+            else
+                t = @async connect_to_peer(cluster_manager, rpid, wconfig)
+                push!(wait_tasks, t)
+            end
         end
     end
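
Since `JoinPGRPMsg` now carries the master's `lazy` flag, every joining worker records the same setting in its own `PGRP.lazy`. A quick consistency check, using the `isclusterlazy` helper added in cluster.jl above:

```julia
addprocs(2)   # lazy defaults to true
@assert Base.Distributed.isclusterlazy()
@assert remotecall_fetch(() -> Base.Distributed.isclusterlazy(), 2)
```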

doc/src/manual/parallel-computing.md (+6)

@@ -1300,6 +1300,12 @@ connected to each other:
     fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster-manager-provided
     identity `ident` will connect to all workers specified in `connect_idents`.
 
+Keyword argument `lazy=true|false` affects only the `:all_to_all` topology. If `true`, the cluster
+starts off with the master connected to all workers, and specific worker-worker connections are
+established at the first remote invocation between two workers. This reduces the resources initially
+allocated for intra-cluster communication; connections are set up on demand by the runtime
+requirements of the parallel program. The default value for `lazy` is `true`.
+
 Currently, sending a message between unconnected workers results in an error. This behaviour,
 as with the functionality and interface, should be considered experimental in nature and may change
 in future releases.
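
A sketch of the documented behavior, poking at internal state to watch a connection appear on first use (`PGRP`, `worker_from_id`, and `Worker.state` are implementation details and may change):

```julia
addprocs(3)   # default lazy=true

# Worker 3, as seen from worker 2, starts out registered but unconnected:
state_of_3 = () -> Base.Distributed.worker_from_id(3).state
before = remotecall_fetch(state_of_3, 2)   # expected: W_CREATED

# The first remote call routed 2 -> 3 establishes the connection:
remotecall_fetch(() -> remotecall_fetch(myid, 3), 2)
after = remotecall_fetch(state_of_3, 2)    # expected: W_CONNECTED
```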

test/distributed_exec.jl (+43 -41)

The SO_REUSEPORT tests move from the top of the file to the end, where they run on a fresh `lazy=false` cluster: they inspect the client ports of established worker-worker connections, which a lazy cluster no longer creates up front.

@@ -13,47 +13,6 @@ include("testenv.jl")
 addprocs_with_testenv(4)
 @test nprocs() == 5
 
-function reuseport_tests()
-    # Run the test on all processes.
-    results = asyncmap(procs()) do p
-        remotecall_fetch(p) do
-            ports_lower = []        # ports of pids lower than myid()
-            ports_higher = []       # ports of pids higher than myid()
-            for w in Base.Distributed.PGRP.workers
-                w.id == myid() && continue
-                port = Base._sockname(w.r_stream, true)[2]
-                if (w.id == 1)
-                    # master connects to workers
-                    push!(ports_higher, port)
-                elseif w.id < myid()
-                    push!(ports_lower, port)
-                elseif w.id > myid()
-                    push!(ports_higher, port)
-                end
-            end
-            @assert (length(ports_lower) + length(ports_higher)) == nworkers()
-            for portset in [ports_lower, ports_higher]
-                if (length(portset) > 0) && (length(unique(portset)) != 1)
-                    warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
-                    return 0
-                end
-            end
-            return myid()
-        end
-    end
-
-    # Ensure that the code has indeed been successfully executed everywhere
-    @test all(p -> p in results, procs())
-end
-
-# Test that the client port is reused. SO_REUSEPORT may not be supported on
-# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
-if ccall(:jl_has_so_reuseport, Int32, ()) == 1
-    reuseport_tests()
-else
-    info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
-end
-
 id_me = myid()
 id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

@@ -1817,6 +1776,49 @@ p1,p2 = addprocs_with_testenv(2)
 @everywhere f22865(p) = remotecall_fetch(x->x.*2, p, ones(2))
 @test ones(2).*2 == remotecall_fetch(f22865, p1, p2)
 
+function reuseport_tests()
+    # Run the test on all processes.
+    results = asyncmap(procs()) do p
+        remotecall_fetch(p) do
+            ports_lower = []        # ports of pids lower than myid()
+            ports_higher = []       # ports of pids higher than myid()
+            for w in Base.Distributed.PGRP.workers
+                w.id == myid() && continue
+                port = Base._sockname(w.r_stream, true)[2]
+                if (w.id == 1)
+                    # master connects to workers
+                    push!(ports_higher, port)
+                elseif w.id < myid()
+                    push!(ports_lower, port)
+                elseif w.id > myid()
+                    push!(ports_higher, port)
+                end
+            end
+            @assert (length(ports_lower) + length(ports_higher)) == nworkers()
+            for portset in [ports_lower, ports_higher]
+                if (length(portset) > 0) && (length(unique(portset)) != 1)
+                    warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
+                    return 0
+                end
+            end
+            return myid()
+        end
+    end
+
+    # Ensure that the code has indeed been successfully executed everywhere
+    @test all(p -> p in results, procs())
+end
+
+# Test that the client port is reused. SO_REUSEPORT may not be supported on
+# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
+if ccall(:jl_has_so_reuseport, Int32, ()) == 1
+    rmprocs(workers())
+    addprocs_with_testenv(4; lazy=false)
+    reuseport_tests()
+else
+    info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
+end
+
 # Run topology tests last after removing all workers, since a given
 # cluster at any time only supports a single topology.
 rmprocs(workers())
