Skip to content

Commit 30a6f8b

Browse files
committed
make WeakKeyDict finalizer usage gc-safe
also use this `client_refs.lock` to protect other data-structures from being interrupted by finalizers, in the multi.jl logic we may want to start indicating which mutable data-structures are safe to call from finalizers, since generally that isn't possible to make a finalizer API gc-safe, that code should observe the standard thread-safe restrictions (there's no guarantee of which thread it'll run on), plus, if the data-structures uses locks for synchronization, use the `islocked` pattern (demonstrated herein) in the `finalizer` to re-schedule the finalizer when the mutable data-structure is not available for mutation. this ensures that the lock cannot be acquired recursively, and furthermore, this pattern will continue to work if finalizers get moved to their own separate thread. close #14445 fix #16550 reverts workaround #14456 (shouldn't break #14295, due to new locks) should fix #16091 (with #17619)
1 parent d572a59 commit 30a6f8b

File tree

8 files changed

+180
-124
lines changed

8 files changed

+180
-124
lines changed

base/dict.jl

-10
Original file line numberDiff line numberDiff line change
@@ -309,16 +309,6 @@ get!(o::ObjectIdDict, key, default) = (o[key] = get(o, key, default))
309309

310310
abstract AbstractSerializer
311311

312-
# Serializer type needed as soon as ObjectIdDict is available
313-
type SerializationState{I<:IO} <: AbstractSerializer
314-
io::I
315-
counter::Int
316-
table::ObjectIdDict
317-
SerializationState(io::I) = new(io, 0, ObjectIdDict())
318-
end
319-
320-
SerializationState(io::IO) = SerializationState{typeof(io)}(io)
321-
322312
# dict
323313

324314
# These can be changed, to trade off better performance for space

base/lock.jl

+20
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,26 @@ function unlock(rl::ReentrantLock)
9595
return
9696
end
9797

98+
function lock(f, l)
99+
lock(l)
100+
try
101+
return f()
102+
finally
103+
unlock(l)
104+
end
105+
end
106+
107+
function trylock(f, l)
108+
if trylock(l)
109+
try
110+
return f()
111+
finally
112+
unlock(l)
113+
end
114+
end
115+
return false
116+
end
117+
98118
"""
99119
Semaphore(sem_size)
100120

base/multi.jl

+84-71
Original file line numberDiff line numberDiff line change
@@ -637,27 +637,38 @@ function deregister_worker(pg, pid)
637637
# delete this worker from our remote reference client sets
638638
ids = []
639639
tonotify = []
640-
for (id,rv) in pg.refs
641-
if in(pid,rv.clientset)
642-
push!(ids, id)
640+
lock(client_refs) do
641+
for (id,rv) in pg.refs
642+
if in(pid,rv.clientset)
643+
push!(ids, id)
644+
end
645+
if rv.waitingfor == pid
646+
push!(tonotify, (id,rv))
647+
end
643648
end
644-
if rv.waitingfor == pid
645-
push!(tonotify, (id,rv))
649+
for id in ids
650+
del_client(pg, id, pid)
646651
end
647-
end
648-
for id in ids
649-
del_client(pg, id, pid)
650-
end
651652

652-
# throw exception to tasks waiting for this pid
653-
for (id,rv) in tonotify
654-
notify_error(rv.c, ProcessExitedException())
655-
delete!(pg.refs, id)
653+
# throw exception to tasks waiting for this pid
654+
for (id,rv) in tonotify
655+
notify_error(rv.c, ProcessExitedException())
656+
delete!(pg.refs, id)
657+
end
656658
end
657659
end
658660

659661
## remote refs ##
660-
const client_refs = WeakKeyDict()
662+
663+
"""
664+
client_refs
665+
666+
Tracks whether a particular AbstractRemoteRef
667+
(identified by its RRID) exists on this worker.
668+
669+
The client_refs lock is also used to synchronize access to `.refs` and associated clientset state
670+
"""
671+
const client_refs = WeakKeyDict{Any, Void}() # used as a WeakKeySet
661672

662673
abstract AbstractRemoteRef
663674

@@ -680,34 +691,26 @@ type RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
680691
end
681692

682693
function test_existing_ref(r::AbstractRemoteRef)
683-
found = getkey(client_refs, r, false)
684-
if found !== false
685-
if client_refs[r] == true
686-
@assert r.where > 0
687-
if isa(r, Future) && isnull(found.v) && !isnull(r.v)
688-
# we have recd the value from another source, probably a deserialized ref, send a del_client message
689-
send_del_client(r)
690-
found.v = r.v
691-
end
692-
return found
693-
else
694-
# just delete the entry.
695-
delete!(client_refs, found)
696-
end
694+
found = getkey(client_refs, r, nothing)
695+
if found !== nothing
696+
@assert r.where > 0
697+
if isa(r, Future) && isnull(found.v) && !isnull(r.v)
698+
# we have recd the value from another source, probably a deserialized ref, send a del_client message
699+
send_del_client(r)
700+
found.v = r.v
701+
end
702+
return found::typeof(r)
697703
end
698704

699-
client_refs[r] = true
705+
client_refs[r] = nothing
700706
finalizer(r, finalize_ref)
701707
return r
702708
end
703709

704710
function finalize_ref(r::AbstractRemoteRef)
705-
if r.where > 0 # Handle the case of the finalizer having being called manually
706-
if haskey(client_refs, r)
707-
# NOTE: Change below line to deleting the entry once issue https://github.com/JuliaLang/julia/issues/14445
708-
# is fixed.
709-
client_refs[r] = false
710-
end
711+
if r.where > 0 # Handle the case of the finalizer having been called manually
712+
islocked(client_refs) && return finalizer(r, finalize_ref) # delay finalizer for later, when it's not already locked
713+
delete!(client_refs, r)
711714
if isa(r, RemoteChannel)
712715
send_del_client(r)
713716
else
@@ -717,7 +720,7 @@ function finalize_ref(r::AbstractRemoteRef)
717720
end
718721
r.where = 0
719722
end
720-
return r
723+
nothing
721724
end
722725

723726
Future(w::LocalProcess) = Future(w.id)
@@ -791,23 +794,27 @@ A low-level API which returns the backing `AbstractChannel` for an `id` returned
791794
The call is valid only on the node where the backing channel exists.
792795
"""
793796
function channel_from_id(id)
794-
rv = get(PGRP.refs, id, false)
797+
rv = lock(client_refs) do
798+
return get(PGRP.refs, id, false)
799+
end
795800
if rv === false
796801
throw(ErrorException("Local instance of remote reference not found"))
797802
end
798-
rv.c
803+
return rv.c
799804
end
800805

801806
lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(PGRP, rrid, f)
802807
function lookup_ref(pg, rrid, f)
803-
rv = get(pg.refs, rrid, false)
804-
if rv === false
805-
# first we've heard of this ref
806-
rv = RemoteValue(f())
807-
pg.refs[rrid] = rv
808-
push!(rv.clientset, rrid.whence)
809-
end
810-
rv
808+
return lock(client_refs) do
809+
rv = get(pg.refs, rrid, false)
810+
if rv === false
811+
# first we've heard of this ref
812+
rv = RemoteValue(f())
813+
pg.refs[rrid] = rv
814+
push!(rv.clientset, rrid.whence)
815+
end
816+
return rv
817+
end::RemoteValue
811818
end
812819

813820
"""
@@ -827,7 +834,7 @@ function isready(rr::Future)
827834
!isnull(rr.v) && return true
828835

829836
rid = remoteref_id(rr)
830-
if rr.where == myid()
837+
return if rr.where == myid()
831838
isready(lookup_ref(rid).c)
832839
else
833840
remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid)
@@ -844,7 +851,7 @@ it can be safely used on a `Future` since they are assigned only once.
844851
"""
845852
function isready(rr::RemoteChannel, args...)
846853
rid = remoteref_id(rr)
847-
if rr.where == myid()
854+
return if rr.where == myid()
848855
isready(lookup_ref(rid).c, args...)
849856
else
850857
remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid)
@@ -855,11 +862,7 @@ del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())
855862

856863
del_client(id, client) = del_client(PGRP, id, client)
857864
function del_client(pg, id, client)
858-
# As a workaround to issue https://github.com/JuliaLang/julia/issues/14445
859-
# the dict/set updates are executed asynchronously so that they do
860-
# not occur in the midst of a gc. The `@async` prefix must be removed once
861-
# 14445 is fixed.
862-
@async begin
865+
lock(client_refs) do
863866
rv = get(pg.refs, id, false)
864867
if rv !== false
865868
delete!(rv.clientset, client)
@@ -898,8 +901,10 @@ function send_del_client(rr)
898901
end
899902

900903
function add_client(id, client)
901-
rv = lookup_ref(id)
902-
push!(rv.clientset, client)
904+
lock(client_refs) do
905+
rv = lookup_ref(id)
906+
push!(rv.clientset, client)
907+
end
903908
nothing
904909
end
905910

@@ -999,19 +1004,21 @@ function run_work_thunk(thunk, print_error)
9991004
result = RemoteException(ce)
10001005
print_error && showerror(STDERR, ce)
10011006
end
1002-
result
1007+
return result
10031008
end
10041009
function run_work_thunk(rv::RemoteValue, thunk)
10051010
put!(rv, run_work_thunk(thunk, false))
10061011
nothing
10071012
end
10081013

10091014
function schedule_call(rid, thunk)
1010-
rv = RemoteValue(def_rv_channel())
1011-
(PGRP::ProcessGroup).refs[rid] = rv
1012-
push!(rv.clientset, rid.whence)
1013-
schedule(@task(run_work_thunk(rv,thunk)))
1014-
rv
1015+
return lock(client_refs) do
1016+
rv = RemoteValue(def_rv_channel())
1017+
(PGRP::ProcessGroup).refs[rid] = rv
1018+
push!(rv.clientset, rid.whence)
1019+
@schedule run_work_thunk(rv, thunk)
1020+
return rv
1021+
end
10151022
end
10161023

10171024
# make a thunk to call f on args in a way that simulates what would happen if
@@ -1026,13 +1033,13 @@ end
10261033
function remotecall(f, w::LocalProcess, args...; kwargs...)
10271034
rr = Future(w)
10281035
schedule_call(remoteref_id(rr), local_remotecall_thunk(f, args, kwargs))
1029-
rr
1036+
return rr
10301037
end
10311038

10321039
function remotecall(f, w::Worker, args...; kwargs...)
10331040
rr = Future(w)
10341041
send_msg(w, MsgHeader(remoteref_id(rr)), CallMsg{:call}(f, args, kwargs))
1035-
rr
1042+
return rr
10361043
end
10371044

10381045
"""
@@ -1046,7 +1053,7 @@ remotecall(f, id::Integer, args...; kwargs...) = remotecall(f, worker_from_id(id
10461053

10471054
function remotecall_fetch(f, w::LocalProcess, args...; kwargs...)
10481055
v=run_work_thunk(local_remotecall_thunk(f,args, kwargs), false)
1049-
isa(v, RemoteException) ? throw(v) : v
1056+
return isa(v, RemoteException) ? throw(v) : v
10501057
end
10511058

10521059
function remotecall_fetch(f, w::Worker, args...; kwargs...)
@@ -1057,8 +1064,10 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...)
10571064
rv.waitingfor = w.id
10581065
send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs))
10591066
v = take!(rv)
1060-
delete!(PGRP.refs, oid)
1061-
isa(v, RemoteException) ? throw(v) : v
1067+
lock(client_refs) do
1068+
delete!(PGRP.refs, oid)
1069+
end
1070+
return isa(v, RemoteException) ? throw(v) : v
10621071
end
10631072

10641073
"""
@@ -1080,9 +1089,11 @@ function remotecall_wait(f, w::Worker, args...; kwargs...)
10801089
rr = Future(w)
10811090
send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs))
10821091
v = fetch(rv.c)
1083-
delete!(PGRP.refs, prid)
1092+
lock(client_refs) do
1093+
delete!(PGRP.refs, prid)
1094+
end
10841095
isa(v, RemoteException) && throw(v)
1085-
rr
1096+
return rr
10861097
end
10871098

10881099
"""
@@ -1834,9 +1845,11 @@ function create_worker(manager, wconfig)
18341845

18351846
@schedule manage(w.manager, w.id, w.config, :register)
18361847
wait(rr_ntfy_join)
1837-
delete!(PGRP.refs, ntfy_oid)
1848+
lock(client_refs) do
1849+
delete!(PGRP.refs, ntfy_oid)
1850+
end
18381851

1839-
w.id
1852+
return w.id
18401853
end
18411854

18421855

@@ -1859,7 +1872,7 @@ function launch_additional(np::Integer, cmd::Cmd)
18591872
additional_io_objs[port] = io
18601873
end
18611874

1862-
addresses
1875+
return addresses
18631876
end
18641877

18651878
function redirect_output_from_additional_worker(pid, port)

base/precompile.jl

-1
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,6 @@ precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool))
444444
precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState))
445445
precompile(Base.schedule, (Array{Any, 1}, Task, Void))
446446
precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any}))
447-
precompile(Base.weak_key_delete!, (Base.Dict{Any, Any}, Base.RemoteChannel))
448447
precompile(==, (Base.RemoteChannel, WeakRef))
449448
precompile(==, (Base.RemoteChannel, Base.RemoteChannel))
450449
precompile(Base.send_del_client, (Base.RemoteChannel,))

base/serialize.jl

+10-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,16 @@ import Base: GMP, Bottom, unsafe_convert, uncompressed_ast, datatype_pointerfree
66
import Core: svec
77
using Base: ViewIndex, index_lengths
88

9-
export serialize, deserialize
9+
export serialize, deserialize, SerializationState
10+
11+
type SerializationState{I<:IO} <: AbstractSerializer
12+
io::I
13+
counter::Int
14+
table::ObjectIdDict
15+
SerializationState(io::I) = new(io, 0, ObjectIdDict())
16+
end
17+
18+
SerializationState(io::IO) = SerializationState{typeof(io)}(io)
1019

1120
## serializing values ##
1221

base/sysimg.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ include("reshapedarray.jl")
115115
include("bitarray.jl")
116116
include("intset.jl")
117117
include("dict.jl")
118-
include("weakkeydict.jl")
119118
include("set.jl")
120119
include("iterator.jl")
121120

@@ -173,6 +172,7 @@ include("event.jl")
173172
include("task.jl")
174173
include("lock.jl")
175174
include("threads.jl")
175+
include("weakkeydict.jl")
176176

177177
# I/O
178178
include("stream.jl")

0 commit comments

Comments
 (0)