Skip to content

Commit 0394f47

Browse files
committed
Fix reuse of client port on Linux. Implement for OSX.
1 parent f988566 commit 0394f47

File tree

5 files changed

+88
-36
lines changed

5 files changed

+88
-36
lines changed

base/distributed/cluster.jl

+11-4
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,15 @@ function start_worker(out::IO, cookie::AbstractString)
153153
init_worker(cookie)
154154
interface = IPv4(LPROC.bind_addr)
155155
if LPROC.bind_port == 0
156-
(actual_port,sock) = listenany(interface, UInt16(9009))
157-
LPROC.bind_port = actual_port
156+
addr = Base.InetAddr(interface, 0)
157+
sock = Base.TCPServer()
158+
if bind(sock, addr) && Base.trylisten(sock) == 0
159+
_addr, port = Base._sockname(sock, true)
160+
LPROC.bind_port = port
161+
else
162+
close(sock)
163+
error("no ports available")
164+
end
158165
else
159166
sock = listen(interface, LPROC.bind_port)
160167
end
@@ -256,9 +263,9 @@ end
256263
function parse_connection_info(str)
257264
m = match(r"^julia_worker:(\d+)#(.*)", str)
258265
if m !== nothing
259-
(m.captures[2], parse(Int16, m.captures[1]))
266+
(m.captures[2], parse(UInt16, m.captures[1]))
260267
else
261-
("", Int16(-1))
268+
("", UInt16(0))
262269
end
263270
end
264271

base/distributed/managers.jl

+23-20
Original file line numberDiff line numberDiff line change
@@ -455,31 +455,34 @@ end
455455
const client_port = Ref{Cushort}(0)
456456

457457
function socket_reuse_port()
458-
s = TCPSocket()
459-
client_host = Ref{Cuint}(0)
460-
ccall(:jl_tcp_bind, Int32,
461-
(Ptr{Void}, UInt16, UInt32, Cuint),
462-
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
463-
464-
# TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
465-
# early access to a socket fd, i.e., before a bind call.
466-
467-
@static if is_linux()
468-
try
469-
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
470-
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
471-
return s
472-
elseif rc < 0
473-
throw(SystemError("setsockopt() SO_REUSEPORT : "))
474-
end
475-
getsockname(s)
476-
catch e
458+
@static if is_linux() || is_apple()
459+
s = TCPSocket(delay = false)
460+
461+
# Linux requires the port to be bound before setting REUSEPORT, OSX after.
462+
is_linux() && bind_client_port(s)
463+
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
464+
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
465+
return s
466+
elseif rc < 0
477467
# This is an issue only on systems with lots of client connections, hence delay the warning
478-
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
468+
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to regular socket.")
469+
479470
# provide a clean new socket
480471
return TCPSocket()
481472
end
473+
is_apple() && bind_client_port(s)
474+
else
475+
return TCPSocket()
482476
end
477+
end
478+
479+
function bind_client_port(s)
480+
err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, UInt16, UInt32, Cuint),
481+
s.handle, hton(client_port[]), hton(UInt32(0)), 0)
482+
Base.uv_error("bind() failed", err)
483+
484+
_addr, port = Base._sockname(s, true)
485+
client_port[] = port
483486
return s
484487
end
485488

base/socket.jl

+13-7
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,14 @@ mutable struct TCPSocket <: LibuvStream
282282
return tcp
283283
end
284284
end
285-
function TCPSocket()
285+
286+
# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
287+
function TCPSocket(; delay=true)
286288
tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
287-
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
288-
eventloop(), tcp.handle)
289+
af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2
290+
291+
err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
292+
eventloop(), tcp.handle, af_spec)
289293
uv_error("failed to create tcp socket", err)
290294
tcp.status = StatusInit
291295
return tcp
@@ -840,16 +844,18 @@ listenany(default_port) = listenany(IPv4(UInt32(0)), default_port)
840844
Get the IP address and the port that the given `TCPSocket` is connected to
841845
(or bound to, in the case of `TCPServer`).
842846
"""
843-
function getsockname(sock::Union{TCPServer,TCPSocket})
847+
getsockname(sock::Union{TCPServer, TCPSocket}) = _sockname(sock, isa(sock, TCPServer))
848+
849+
function _sockname(sock, self)
844850
rport = Ref{Cushort}(0)
845851
raddress = zeros(UInt8, 16)
846852
rfamily = Ref{Cuint}(0)
847-
r = if isa(sock, TCPServer)
848-
ccall(:jl_tcp_getsockname, Int32,
853+
if self
854+
r = ccall(:jl_tcp_getsockname, Int32,
849855
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
850856
sock.handle, rport, raddress, rfamily)
851857
else
852-
ccall(:jl_tcp_getpeername, Int32,
858+
r = ccall(:jl_tcp_getpeername, Int32,
853859
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
854860
sock.handle, rport, raddress, rfamily)
855861
end

doc/src/manual/parallel-computing.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -1231,8 +1231,8 @@ as local laptops, departmental clusters, or even the cloud. This section covers
12311231
requirements for the inbuilt `LocalManager` and `SSHManager`:
12321232
12331233
* The master process does not listen on any port. It only connects out to the workers.
1234-
* Each worker binds to only one of the local interfaces and listens on the first free port starting
1235-
from `9009`.
1234+
* Each worker binds to only one of the local interfaces and listens on an ephemeral port number
1235+
assigned by the OS.
12361236
* `LocalManager`, used by `addprocs(N)`, by default binds only to the loopback interface. This means
12371237
that workers started later on remote hosts (or by anyone with malicious intentions) are unable
12381238
to connect to the cluster. An `addprocs(4)` followed by an `addprocs(["remote_host"])` will fail.
@@ -1250,8 +1250,9 @@ requirements for the inbuilt `LocalManager` and `SSHManager`:
12501250
authenticated via public key infrastructure (PKI). Authentication credentials can be supplied
12511251
via `sshflags`, for example ```sshflags=`-e <keyfile>` ```.
12521252
1253-
Note that worker-worker connections are still plain TCP and the local security policy on the remote
1254-
cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
1253+
In an all-to-all topology (the default), all workers connect to each other via plain TCP sockets.
1254+
The security policy on the cluster nodes must thus ensure free connectivity between workers for
1255+
the ephemeral port range (varies by OS).
12551256
12561257
Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages
12571258
can be done via a custom ClusterManager.

test/distributed_exec.jl

+36-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,41 @@ include("testenv.jl")
1212

1313
addprocs_with_testenv(4)
1414

15+
# Test that the client port is reused. SO_REUSEPORT may not be supported on
16+
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
17+
if is_unix()
18+
# Run the test on all processes.
19+
results = asyncmap(procs()) do p
20+
remotecall_fetch(p) do
21+
ports_lower = [] # ports of pids lower than myid()
22+
ports_higher = [] # ports of pids higher than myid()
23+
for w in Base.Distributed.PGRP.workers
24+
w.id == myid() && continue
25+
port = Base._sockname(w.r_stream, true)[2]
26+
if (w.id == 1)
27+
# master connects to workers
28+
push!(ports_higher, port)
29+
elseif w.id < myid()
30+
push!(ports_lower, port)
31+
elseif w.id > myid()
32+
push!(ports_higher, port)
33+
end
34+
end
35+
@assert (length(ports_lower) + length(ports_higher)) == nworkers()
36+
for portset in [ports_lower, ports_higher]
37+
if (length(portset) > 0) && (length(unique(portset)) != 1)
38+
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
39+
return 0
40+
end
41+
end
42+
return myid()
43+
end
44+
end
45+
46+
# Ensure that the code has indeed been successfully executed everywhere
47+
@test all(p -> p in results, procs())
48+
end
49+
1550
id_me = myid()
1651
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
1752

@@ -923,7 +958,7 @@ if is_unix() # aka have ssh
923958
end
924959
end
925960

926-
remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
961+
remotecall_fetch(rmprocs, 1, new_pids)
927962
end
928963

929964
print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")

0 commit comments

Comments
 (0)