Skip to content

Commit 6418be9

Browse files
authored
Merge pull request #35521 from JuliaLang/sf/udp_multicast
2 parents 44f3bb3 + e1d8da1 commit 6418be9

File tree

3 files changed

+97
-11
lines changed

3 files changed

+97
-11
lines changed

NEWS.md

+2
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ Standard library changes
215215

216216

217217
#### Sockets
218+
* Joining and leaving UDP multicast groups on a `UDPSocket` is now supported
219+
through `join_multicast_group()` and `leave_multicast_group()` ([#35521]).
218220

219221
#### Distributed
220222
* `launch_on_machine` now supports and parses ipv6 square-bracket notation ([#34430])

stdlib/Sockets/src/Sockets.jl

+52
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ export
2222
recv,
2323
recvfrom,
2424
send,
25+
join_multicast_group,
26+
leave_multicast_group,
2527
TCPSocket,
2628
UDPSocket,
2729
@ip_str,
@@ -727,6 +729,56 @@ end
727729

728730
listenany(default_port) = listenany(localhost, default_port)
729731

732+
function udp_set_membership(sock::UDPSocket, group_addr::String,
733+
interface_addr::Union{Nothing, String}, operation)
734+
if interface_addr === nothing
735+
interface_addr = C_NULL
736+
end
737+
r = ccall(:uv_udp_set_membership, Cint,
738+
(Ptr{Cvoid}, Cstring, Cstring, Cint),
739+
sock.handle, group_addr, interface_addr, operation)
740+
uv_error("uv_udp_set_membership", r)
741+
return
742+
end
743+
744+
"""
745+
join_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
746+
747+
Join a socket to a particular multicast group defined by `group_addr`.
748+
If `interface_addr` is given, specifies a particular interface for multi-homed
749+
systems. Use `leave_multicast_group()` to disable reception of a group.
750+
"""
751+
function join_multicast_group(sock::UDPSocket, group_addr::String,
752+
interface_addr::Union{Nothing, String} = nothing)
753+
return udp_set_membership(sock, group_addr, interface_addr, 1)
754+
end
755+
function join_multicast_group(sock::UDPSocket, group_addr::IPAddr,
756+
interface_addr::Union{Nothing, IPAddr} = nothing)
757+
if interface_addr !== nothing
758+
interface_addr = string(interface_addr)
759+
end
760+
return join_multicast_group(sock, string(group_addr), interface_addr)
761+
end
762+
763+
"""
764+
leave_multicast_group(sock::UDPSocket, group_addr, interface_addr = nothing)
765+
766+
Remove a socket from a particular multicast group defined by `group_addr`.
767+
If `interface_addr` is given, specifies a particular interface for multi-homed
768+
systems. Use `join_multicast_group()` to enable reception of a group.
769+
"""
770+
function leave_multicast_group(sock::UDPSocket, group_addr::String,
771+
interface_addr::Union{Nothing, String} = nothing)
772+
return udp_set_membership(sock, group_addr, interface_addr, 0)
773+
end
774+
function leave_multicast_group(sock::UDPSocket, group_addr::IPAddr,
775+
interface_addr::Union{Nothing, IPAddr} = nothing)
776+
if interface_addr !== nothing
777+
interface_addr = string(interface_addr)
778+
end
779+
return leave_multicast_group(sock, string(group_addr), interface_addr)
780+
end
781+
730782
"""
731783
getsockname(sock::Union{TCPServer, TCPSocket}) -> (IPAddr, UInt16)
732784

stdlib/Sockets/test/runtests.jl

+43-11
Original file line numberDiff line numberDiff line change
@@ -391,17 +391,17 @@ end
391391

392392
@testset "Local-machine broadcast" begin
393393
let a, b, c
394-
# (Mac OS X's loopback interface doesn't support broadcasts)
394+
# Apple does not support broadcasting on 127.255.255.255
395395
bcastdst = Sys.isapple() ? ip"255.255.255.255" : ip"127.255.255.255"
396396

397-
function create_socket()
397+
function create_socket(addr::IPAddr, port)
398398
s = UDPSocket()
399-
bind(s, ip"0.0.0.0", 2000, reuseaddr = true, enable_broadcast = true)
400-
s
399+
bind(s, addr, port, reuseaddr = true, enable_broadcast = true)
400+
return s
401401
end
402402

403-
function wait_with_timeout(recvs)
404-
TIMEOUT_VAL = 3*1e9 # nanoseconds
403+
# Wait for futures to finish with a given timeout
404+
function wait_with_timeout(recvs, TIMEOUT_VAL = 3*1e9)
405405
t0 = time_ns()
406406
recvs_check = copy(recvs)
407407
while ((length(filter!(t->!istaskdone(t), recvs_check)) > 0)
@@ -412,23 +412,55 @@ end
412412
map(wait, recvs)
413413
end
414414

415-
a, b, c = [create_socket() for i = 1:3]
415+
# First, test IPv4 broadcast
416+
port = 2000
417+
a, b, c = [create_socket(ip"0.0.0.0", port) for i in 1:3]
416418
try
417-
# bsd family do not allow broadcasting to ip"255.255.255.255"
418-
# or ip"127.255.255.255"
419+
# bsd family do not allow broadcasting on loopbacks
419420
@static if !Sys.isbsd() || Sys.isapple()
420-
send(c, bcastdst, 2000, "hello")
421+
send(c, bcastdst, port, "hello")
421422
recvs = [@async @test String(recv(s)) == "hello" for s in (a, b)]
422423
wait_with_timeout(recvs)
423424
end
424425
catch e
425426
if isa(e, Base.IOError) && Base.uverrorname(e.code) == "EPERM"
426-
@warn "UDP broadcast test skipped (permission denied upon send, restrictive firewall?)"
427+
@warn "UDP IPv4 broadcast test skipped (permission denied upon send, restrictive firewall?)"
427428
else
428429
rethrow()
429430
end
430431
end
431432
[close(s) for s in [a, b, c]]
433+
434+
# Test ipv6 broadcast groups
435+
a, b, c = [create_socket(ip"::", port) for i in 1:3]
436+
try
437+
# Exemplary Interface-local ipv6 multicast group, if we wanted this to actually be routed
438+
# to other computers, we should use a link-local or larger address scope group
439+
# bsd family and darwin do not allow broadcasting on loopbacks
440+
@static if !Sys.isbsd() && !Sys.isapple()
441+
group = ip"ff11::6a75:6c69:61"
442+
join_multicast_group(a, group)
443+
join_multicast_group(b, group)
444+
445+
send(c, group, port, "hello")
446+
recvs = [@async @test String(recv(s)) == "hello" for s in (a, b)]
447+
wait_with_timeout(recvs)
448+
449+
leave_multicast_group(a, group)
450+
leave_multicast_group(b, group)
451+
452+
send(c, group, port, "hello")
453+
recvs = [@async @test String(recv(s)) == "hello" for s in (a, b)]
454+
# We only wait 200ms since we're pretty sure this is going to time out
455+
@test_throws ErrorException wait_with_timeout(recvs, 2e8)
456+
end
457+
catch e
458+
if isa(e, Base.IOError) && Base.uverrorname(e.code) == "EPERM"
459+
@warn "UDP IPv6 broadcast test skipped (permission denied upon send, restrictive firewall?)"
460+
else
461+
rethrow()
462+
end
463+
end
432464
end
433465
end
434466

0 commit comments

Comments
 (0)