Skip to content

Commit e5fb87d

Browse files
authored
Merge pull request #21818 from JuliaLang/amitm/reuseportfix
Fix reuse of client port on Linux. Implement for OSX.
2 parents f760dad + fa8c4d2 commit e5fb87d

File tree

7 files changed

+108
-53
lines changed

7 files changed

+108
-53
lines changed

NEWS.md

+4
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,10 @@ This section lists changes that do not have deprecation warnings.
304304
* `homedir` now determines the user's home directory via `libuv`'s `uv_os_homedir`,
305305
rather than from environment variables ([#19636]).
306306
307+
* Workers now listen on an ephemeral port assigned by the OS. Previously workers would
308+
listen on the first free port available starting from 9009 ([#21818]).
309+
310+
307311
Library improvements
308312
--------------------
309313

base/distributed/cluster.jl

+4-4
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ function start_worker(out::IO, cookie::AbstractString)
153153
init_worker(cookie)
154154
interface = IPv4(LPROC.bind_addr)
155155
if LPROC.bind_port == 0
156-
(actual_port,sock) = listenany(interface, UInt16(9009))
157-
LPROC.bind_port = actual_port
156+
(port, sock) = listenany(interface, UInt16(0))
157+
LPROC.bind_port = port
158158
else
159159
sock = listen(interface, LPROC.bind_port)
160160
end
@@ -256,9 +256,9 @@ end
256256
function parse_connection_info(str)
257257
m = match(r"^julia_worker:(\d+)#(.*)", str)
258258
if m !== nothing
259-
(m.captures[2], parse(Int16, m.captures[1]))
259+
(m.captures[2], parse(UInt16, m.captures[1]))
260260
else
261-
("", Int16(-1))
261+
("", UInt16(0))
262262
end
263263
end
264264

base/distributed/managers.jl

+23-20
Original file line numberDiff line numberDiff line change
@@ -455,31 +455,34 @@ end
455455
const client_port = Ref{Cushort}(0)
456456

457457
function socket_reuse_port()
458-
s = TCPSocket()
459-
client_host = Ref{Cuint}(0)
460-
ccall(:jl_tcp_bind, Int32,
461-
(Ptr{Void}, UInt16, UInt32, Cuint),
462-
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
463-
464-
# TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
465-
# early access to a socket fd, i.e., before a bind call.
466-
467-
@static if is_linux()
468-
try
469-
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
470-
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
471-
return s
472-
elseif rc < 0
473-
throw(SystemError("setsockopt() SO_REUSEPORT : "))
474-
end
475-
getsockname(s)
476-
catch e
458+
@static if is_linux() || is_apple()
459+
s = TCPSocket(delay = false)
460+
461+
# Linux requires the port to be bound before SO_REUSEPORT is set; OSX requires it to be bound after.
462+
is_linux() && bind_client_port(s)
463+
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
464+
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
465+
return s
466+
elseif rc < 0
477467
# This is an issue only on systems with lots of client connections, hence delay the warning
478-
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
468+
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to regular socket.")
469+
479470
# provide a clean new socket
480471
return TCPSocket()
481472
end
473+
is_apple() && bind_client_port(s)
474+
else
475+
return TCPSocket()
482476
end
477+
end
478+
479+
function bind_client_port(s)
480+
err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, UInt16, UInt32, Cuint),
481+
s.handle, hton(client_port[]), hton(UInt32(0)), 0)
482+
Base.uv_error("bind() failed", err)
483+
484+
_addr, port = Base._sockname(s, true)
485+
client_port[] = port
483486
return s
484487
end
485488

base/socket.jl

+17-7
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,14 @@ mutable struct TCPSocket <: LibuvStream
282282
return tcp
283283
end
284284
end
285-
function TCPSocket()
285+
286+
# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
287+
function TCPSocket(; delay=true)
286288
tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
287-
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
288-
eventloop(), tcp.handle)
289+
af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2
290+
291+
err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
292+
eventloop(), tcp.handle, af_spec)
289293
uv_error("failed to create tcp socket", err)
290294
tcp.status = StatusInit
291295
return tcp
@@ -822,6 +826,10 @@ function listenany(host::IPAddr, default_port)
822826
while true
823827
sock = TCPServer()
824828
if bind(sock, addr) && trylisten(sock) == 0
829+
if default_port == 0
830+
_addr, port = _sockname(sock, true)
831+
return (port, sock)
832+
end
825833
return (addr.port, sock)
826834
end
827835
close(sock)
@@ -840,16 +848,18 @@ listenany(default_port) = listenany(IPv4(UInt32(0)), default_port)
840848
Get the IP address and the port that the given `TCPSocket` is connected to
841849
(or bound to, in the case of `TCPServer`).
842850
"""
843-
function getsockname(sock::Union{TCPServer,TCPSocket})
851+
getsockname(sock::Union{TCPServer, TCPSocket}) = _sockname(sock, isa(sock, TCPServer))
852+
853+
function _sockname(sock, self)
844854
rport = Ref{Cushort}(0)
845855
raddress = zeros(UInt8, 16)
846856
rfamily = Ref{Cuint}(0)
847-
r = if isa(sock, TCPServer)
848-
ccall(:jl_tcp_getsockname, Int32,
857+
if self
858+
r = ccall(:jl_tcp_getsockname, Int32,
849859
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
850860
sock.handle, rport, raddress, rfamily)
851861
else
852-
ccall(:jl_tcp_getpeername, Int32,
862+
r = ccall(:jl_tcp_getpeername, Int32,
853863
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
854864
sock.handle, rport, raddress, rfamily)
855865
end

doc/src/manual/parallel-computing.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -1231,8 +1231,8 @@ as local laptops, departmental clusters, or even the cloud. This section covers
12311231
requirements for the inbuilt `LocalManager` and `SSHManager`:
12321232
12331233
* The master process does not listen on any port. It only connects out to the workers.
1234-
* Each worker binds to only one of the local interfaces and listens on the first free port starting
1235-
from `9009`.
1234+
* Each worker binds to only one of the local interfaces and listens on an ephemeral port number
1235+
assigned by the OS.
12361236
* `LocalManager`, used by `addprocs(N)`, by default binds only to the loopback interface. This means
12371237
that workers started later on remote hosts (or by anyone with malicious intentions) are unable
12381238
to connect to the cluster. An `addprocs(4)` followed by an `addprocs(["remote_host"])` will fail.
@@ -1250,8 +1250,9 @@ requirements for the inbuilt `LocalManager` and `SSHManager`:
12501250
authenticated via public key infrastructure (PKI). Authentication credentials can be supplied
12511251
via `sshflags`, for example ```sshflags=`-i <keyfile>` ```.
12521252
1253-
Note that worker-worker connections are still plain TCP and the local security policy on the remote
1254-
cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
1253+
In an all-to-all topology (the default), all workers connect to each other via plain TCP sockets.
1254+
The security policy on the cluster nodes must thus ensure free connectivity between workers for
1255+
the ephemeral port range (varies by OS).
12551256
12561257
Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages
12571258
can be done via a custom ClusterManager.

test/distributed_exec.jl

+36-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,41 @@ include("testenv.jl")
1212

1313
addprocs_with_testenv(4)
1414

15+
# Test that the client port is reused. SO_REUSEPORT may not be supported on
16+
# all UNIX platforms, e.g., Linux kernels prior to 3.9 and older versions of OSX.
17+
if is_unix()
18+
# Run the test on all processes.
19+
results = asyncmap(procs()) do p
20+
remotecall_fetch(p) do
21+
ports_lower = [] # ports of pids lower than myid()
22+
ports_higher = [] # ports of pids higher than myid()
23+
for w in Base.Distributed.PGRP.workers
24+
w.id == myid() && continue
25+
port = Base._sockname(w.r_stream, true)[2]
26+
if (w.id == 1)
27+
# master connects to workers
28+
push!(ports_higher, port)
29+
elseif w.id < myid()
30+
push!(ports_lower, port)
31+
elseif w.id > myid()
32+
push!(ports_higher, port)
33+
end
34+
end
35+
@assert (length(ports_lower) + length(ports_higher)) == nworkers()
36+
for portset in [ports_lower, ports_higher]
37+
if (length(portset) > 0) && (length(unique(portset)) != 1)
38+
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
39+
return 0
40+
end
41+
end
42+
return myid()
43+
end
44+
end
45+
46+
# Ensure that the code has indeed been successfully executed everywhere
47+
@test all(p -> p in results, procs())
48+
end
49+
1550
id_me = myid()
1651
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
1752

@@ -923,7 +958,7 @@ if is_unix() # aka have ssh
923958
end
924959
end
925960

926-
remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
961+
remotecall_fetch(rmprocs, 1, new_pids)
927962
end
928963

929964
print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")

test/socket.jl

+19-17
Original file line numberDiff line numberDiff line change
@@ -69,27 +69,29 @@ end
6969
# test show() function for UDPSocket()
7070
@test repr(UDPSocket()) == "UDPSocket(init)"
7171

72-
port = Channel(1)
7372
defaultport = rand(2000:4000)
74-
tsk = @async begin
75-
p, s = listenany(defaultport)
76-
put!(port, p)
77-
sock = accept(s)
78-
# test write call
79-
write(sock,"Hello World\n")
80-
81-
# test "locked" println to a socket
82-
@sync begin
83-
for i in 1:100
84-
@async println(sock, "a", 1)
73+
for testport in [0, defaultport]
74+
port = Channel(1)
75+
tsk = @async begin
76+
p, s = listenany(testport)
77+
put!(port, p)
78+
sock = accept(s)
79+
# test write call
80+
write(sock,"Hello World\n")
81+
82+
# test "locked" println to a socket
83+
@sync begin
84+
for i in 1:100
85+
@async println(sock, "a", 1)
86+
end
8587
end
88+
close(s)
89+
close(sock)
8690
end
87-
close(s)
88-
close(sock)
91+
wait(port)
92+
@test readstring(connect(fetch(port))) == "Hello World\n" * ("a1\n"^100)
93+
wait(tsk)
8994
end
90-
wait(port)
91-
@test readstring(connect(fetch(port))) == "Hello World\n" * ("a1\n"^100)
92-
wait(tsk)
9395

9496
mktempdir() do tmpdir
9597
socketname = is_windows() ? ("\\\\.\\pipe\\uv-test-" * randstring(6)) : joinpath(tmpdir, "socket")

0 commit comments

Comments
 (0)