Skip to content

OSProc: Add locking around OSPROC_PROCESSOR_CACHE #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Dagger.jl
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ include("lib/util.jl")
include("utils/dagdebug.jl")

# Distributed data
include("utils/locked-object.jl")
include("options.jl")
include("processor.jl")
include("scopes.jl")
@@ -37,7 +38,6 @@ include("chunks.jl")
include("compute.jl")
include("utils/clock.jl")
include("utils/system_uuid.jl")
include("utils/locked-object.jl")
include("utils/caching.jl")
include("sch/Sch.jl"); using .Sch

19 changes: 12 additions & 7 deletions src/processor.jl
Original file line number Diff line number Diff line change
@@ -15,19 +15,19 @@ transfer data to/from other types of `Processor` at runtime.
abstract type Processor end

const PROCESSOR_CALLBACKS = Dict{Symbol,Any}()
const OSPROC_PROCESSOR_CACHE = Dict{Int,Set{Processor}}()
const OSPROC_PROCESSOR_CACHE = LockedObject(Dict{Int,Set{Processor}}())

add_processor_callback!(func, name::String) =
add_processor_callback!(func, Symbol(name))
function add_processor_callback!(func, name::Symbol)
Dagger.PROCESSOR_CALLBACKS[name] = func
delete!(OSPROC_PROCESSOR_CACHE, myid())
@safe_lock1 OSPROC_PROCESSOR_CACHE cache delete!(cache, myid())
end
delete_processor_callback!(name::String) =
delete_processor_callback!(Symbol(name))
function delete_processor_callback!(name::Symbol)
delete!(Dagger.PROCESSOR_CALLBACKS, name)
delete!(OSPROC_PROCESSOR_CACHE, myid())
@safe_lock1 OSPROC_PROCESSOR_CACHE cache delete!(cache, myid())
end

"""
@@ -106,14 +106,19 @@ computations.
struct OSProc <: Processor
pid::Int
function OSProc(pid::Int=myid())
get!(OSPROC_PROCESSOR_CACHE, pid) do
remotecall_fetch(get_processor_hierarchy, pid)
if !(@safe_lock1 OSPROC_PROCESSOR_CACHE cache haskey(cache, pid))
procs = remotecall_fetch(get_processor_hierarchy, pid)
@safe_lock1 OSPROC_PROCESSOR_CACHE cache begin
cache[pid] = procs
end
end
new(pid)
return new(pid)
end
end
get_parent(proc::OSProc) = proc
get_processors(proc::OSProc) = get(OSPROC_PROCESSOR_CACHE, proc.pid, Set{Processor}())
get_processors(proc::OSProc) = @safe_lock1 OSPROC_PROCESSOR_CACHE cache begin
get(cache, proc.pid, Set{Processor}())
end
children(proc::OSProc) = get_processors(proc)
function get_processor_hierarchy()
children = Set{Processor}()
17 changes: 16 additions & 1 deletion src/utils/locked-object.jl
Original file line number Diff line number Diff line change
@@ -14,11 +14,26 @@ function Base.lock(f, x::LockedObject)
unlock(x.lock)
end
end
Base.lock(x::LockedObject) = lock(x.lock)
Base.trylock(x::LockedObject) = trylock(x.lock)
Base.unlock(x::LockedObject) = unlock(x.lock)
payload(x::LockedObject) = x.payload

# TODO: Move this back to MemPool
# TODO: Move these back to MemPool
macro safe_lock1(l, o, ex)
quote
temp = $(esc(l))
lock(temp)
MemPool.enable_finalizers(false)
try
$(esc(o)) = $payload(temp)
$(esc(ex))
finally
unlock(temp)
MemPool.enable_finalizers(true)
end
end
end
# If we actually want to acquire a lock from a finalizer, we can't cause a task
# switch. As a NonReentrantLock can only be taken by another thread that should
# be running, and not a concurrent task we'd need to switch to, we can safely