Skip to content

Commit 557352f

Browse files
committed
Fix reuse of client port on Linux. Implement for OSX. [ci skip]
1 parent 7ea1620 commit 557352f

File tree

3 files changed

+71
-12
lines changed

3 files changed

+71
-12
lines changed

base/distributed/managers.jl

+23-9
Original file line numberDiff line numberDiff line change
@@ -455,31 +455,45 @@ end
455455
const client_port = Ref{Cushort}(0)
456456

457457
function socket_reuse_port()
458-
s = TCPSocket()
458+
s = TCPSocket(delay = false)
459459
client_host = Ref{Cuint}(0)
460-
ccall(:jl_tcp_bind, Int32,
461-
(Ptr{Void}, UInt16, UInt32, Cuint),
462-
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
463-
464-
# TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
465-
# early access to a socket fd, i.e., before a bind call.
466460

467-
@static if is_linux()
461+
@static if is_linux() || is_apple()
468462
try
469463
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
470464
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
471465
return s
472466
elseif rc < 0
473467
throw(SystemError("setsockopt() SO_REUSEPORT : "))
474468
end
475-
getsockname(s)
476469
catch e
477470
# This is an issue only on systems with lots of client connections, hence delay the warning
478471
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
479472
# provide a clean new socket
480473
return TCPSocket()
481474
end
482475
end
476+
477+
ccall(:jl_tcp_bind, Int32,
478+
(Ptr{Void}, UInt16, UInt32, Cuint),
479+
s.handle, hton(client_port[]), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
480+
481+
rport = Ref{Cushort}(0)
482+
raddress = zeros(UInt8, 16)
483+
rfamily = Ref{Cuint}(0)
484+
485+
r = ccall(:jl_tcp_getsockname, Int32,
486+
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
487+
s.handle, rport, raddress, rfamily)
488+
Base.uv_error("cannot obtain socket name", r)
489+
if r == 0
490+
port = ntoh(rport[])
491+
else
492+
error("cannot obtain socket name")
493+
end
494+
495+
client_port[] = port
496+
483497
return s
484498
end
485499

base/socket.jl

+10-3
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,17 @@ mutable struct TCPSocket <: LibuvStream
282282
return tcp
283283
end
284284
end
285-
function TCPSocket()
285+
function TCPSocket(; delay=true) # kw arg "delay": if true, libuv delays creation of the socket
286+
# fd till the first bind call
287+
286288
tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
287-
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
288-
eventloop(), tcp.handle)
289+
if delay
290+
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
291+
eventloop(), tcp.handle)
292+
else
293+
err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
294+
eventloop(), tcp.handle, 2) # AF_INET is 2
295+
end
289296
uv_error("failed to create tcp socket", err)
290297
tcp.status = StatusInit
291298
return tcp

test/distributed_exec.jl

+38
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,44 @@ include("testenv.jl")
1212

1313
addprocs_with_testenv(4)
1414

15+
# Test that the client port is reused. SO_REUSEPORT may not be supported on
16+
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
17+
if is_unix()
18+
# Run the test on all workers.
19+
results = asyncmap(procs()) do p
20+
remotecall_fetch(p) do
21+
client_ports = []
22+
for w in Base.Distributed.PGRP.workers
23+
if isa(w, Base.Distributed.Worker)
24+
s = w.r_stream
25+
rport = Ref{Cushort}(0)
26+
raddress = zeros(UInt8, 16)
27+
rfamily = Ref{Cuint}(0)
28+
29+
r = ccall(:jl_tcp_getsockname, Int32,
30+
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
31+
s.handle, rport, raddress, rfamily)
32+
Base.uv_error("cannot obtain socket name", r)
33+
if r == 0
34+
push!(client_ports, ntoh(rport[]))
35+
else
36+
error("cannot obtain socket name")
37+
end
38+
end
39+
end
40+
@assert length(client_ports) == nworkers()
41+
if !all(i -> i == client_ports[1], client_ports[2:end])
42+
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
43+
return 0
44+
end
45+
return myid()
46+
end
47+
end
48+
49+
# Ensure that the code has indeed been executed on all processes
50+
@test all(p -> p in results, procs())
51+
end
52+
1553
id_me = myid()
1654
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
1755

0 commit comments

Comments
 (0)