Skip to content

Commit 47c7e2a

Browse files
committed
Split Storage into StorageResource/StorageDevice
Remove CHUNK_REDIRECTS. Make get_processors return a Set. Add get_storage_resources, get_storage_devices. Remove capacity(). Reintroduce capacity constraints for storage. Remove dead MaxUtilization logic for now. Block execution when sufficient CPURAMStorage is not available.
1 parent 895181e commit 47c7e2a

File tree

6 files changed

+285
-202
lines changed

6 files changed

+285
-202
lines changed

Diff for: src/Dagger.jl

+9-3
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,25 @@ function __init__()
9090
ThreadProc(myid(), tid)
9191
end
9292
end
93-
add_storage_callback!("__cpu_ram__") do
93+
add_storage_resource_callback!("__cpu_ram__") do
9494
CPURAMStorage(myid())
9595
end
96+
add_storage_device_callback!("__cpu_ram__") do
97+
CPURAMDevice(myid())
98+
end
9699

97100
# Register filesystem storage
98101
for entry in getmounts()
99102
entry.mnt_dir == "/" || continue # FIXME: Allow other mounts
100-
add_storage_callback!("__fs_$(entry.mnt_fsname)__") do
103+
add_storage_resource_callback!("__fs_$(entry.mnt_fsname)__") do
104+
FilesystemStorage(entry.mnt_dir)
105+
end
106+
add_storage_device_callback!("__fs_$(entry.mnt_fsname)__") do
101107
parent = FilesystemStorage(entry.mnt_dir)
102108
path = joinpath(first(DEPOT_PATH), "dagger", ".serfscache")
103109
# FIXME: Properly check that path is under mnt_dir
104110
mkpath(path)
105-
SerializedFilesystemStorage(parent, path)
111+
SerializedFilesystemDevice(parent, path)
106112
end
107113
end
108114
end

Diff for: src/chunks.jl

+2-8
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ unrelease(c::Chunk) = c
8080
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
8181
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)
8282

83-
const CHUNK_REDIRECTS = Dict{Chunk,Chunk}()
84-
8583
collect_remote(chunk::Chunk) =
8684
move(chunk.processor, OSProc(), poolget(chunk.handle))
8785

@@ -101,13 +99,9 @@ collect(ctx::Context, ref::FileRef; options=nothing) =
10199
poolget(ref) # FIXME: Do move call
102100

103101
# Unwrap Chunk, DRef, and FileRef by default
104-
function move(from_proc::Processor, to_proc::Processor, x::Chunk)
105-
# Handle storage redirect
106-
x = haskey(CHUNK_REDIRECTS, x) ? move(CPURAMStorage(), CHUNK_REDIRECTS[x]) : x
107-
108-
# Move to `to_proc`
102+
move(from_proc::Processor, to_proc::Processor, x::Chunk) =
109103
move(from_proc, to_proc, x.handle)
110-
end
104+
111105
move(from_proc::Processor, to_proc::Processor, x::Union{DRef,FileRef}) =
112106
move(from_proc, to_proc, poolget(x))
113107

Diff for: src/processor.jl

+57-36
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ make it easy to transfer data to/from other types of `Processor` at runtime.
1313
abstract type Processor end
1414

1515
const PROCESSOR_CALLBACKS = Dict{Symbol,Any}()
16-
const OSPROC_PROCESSOR_CACHE = Dict{Int,Vector{Processor}}()
16+
const OSPROC_PROCESSOR_CACHE = Dict{Int,Set{Processor}}()
1717

1818
add_processor_callback!(func, name::String) =
1919
add_processor_callback!(func, Symbol(name))
@@ -56,18 +56,39 @@ iscompatible_arg(proc::Processor, opts, x) = false
5656
"""
5757
default_enabled(proc::Processor) -> Bool
5858
59-
Returns whether processor `proc` is enabled by default (opt-out). `Processor` subtypes can override this function to make themselves opt-in (default returns `false`).
59+
Returns whether processor `proc` is enabled by default. The default value is
60+
`false`, which excludes the processor from execution unless it is
61+
specifically requested by the user; `true` causes the
62+
processor to always participate in execution when possible.
6063
"""
6164
default_enabled(proc::Processor) = false
6265

6366
"""
64-
get_processors(proc::Processor) -> Vector{T} where T<:Processor
67+
get_processors(proc::Processor) -> Set{<:Processor}
6568
66-
Returns the full list of processors contained in `proc`, if any. `Processor`
67-
subtypes should overload this function if they can contain sub-processors. The
68-
default method will return a `Vector` containing `proc` itself.
69+
Returns the set of processors contained in `proc`, if any. `Processor` subtypes
70+
should overload this function if they can contain sub-processors. The default
71+
method will return a `Set` containing `proc` itself.
6972
"""
70-
get_processors(proc::Processor) = Processor[proc]
73+
get_processors(proc::Processor) = Set{Processor}([proc]) # `Set{T}(itr)` takes a collection of elements, so wrap the single `proc`
74+
75+
"""
76+
get_storage_resources(proc::Processor) -> Set{<:StorageResource}
77+
78+
Returns the full set of storage resources accessible to `proc`, if any.
79+
`Processor` subtypes should overload this function if they have direct access
80+
to any storage resources. The default method will return an empty set.
81+
"""
82+
get_storage_resources(proc::Processor) = Set{StorageResource}()
83+
84+
"""
85+
get_storage_devices(proc::Processor) -> Set{<:StorageDevice}
86+
87+
Returns the full set of storage devices accessible to `proc`, if any.
88+
`Processor` subtypes should overload this function if they have direct access
89+
to any storage devices. The default method will return an empty set.
90+
"""
91+
get_storage_devices(proc::Processor) = Set{StorageDevice}()
7192

7293
"""
7394
get_parent(proc::Processor) -> Processor
@@ -91,6 +112,7 @@ data movement should provide implementations where `x::Chunk`.
91112
"""
92113
move(from_proc::Processor, to_proc::Processor, x) = x
93114

115+
#=
94116
"""
95117
capacity(proc::Processor=OSProc()) -> Int
96118
@@ -99,6 +121,7 @@ Returns the total processing capacity of `proc`.
99121
capacity(proc=OSProc()) = length(get_processors(proc))
100122
capacity(proc, ::Type{T}) where T =
101123
length(filter(x->x isa T, get_processors(proc)))
124+
=#
102125

103126
"""
104127
OSProc <: Processor
@@ -111,37 +134,34 @@ struct OSProc <: Processor
111134
pid::Int
112135
function OSProc(pid::Int=myid())
113136
get!(OSPROC_PROCESSOR_CACHE, pid) do
114-
remotecall_fetch(get_processor_hierarchy, pid)
137+
remotecall_fetch(detect_hierarchy, pid, :processor)
115138
end
116-
get!(OSPROC_STORAGE_CACHE, pid) do
117-
remotecall_fetch(get_storage_hierarchy, pid)
139+
get!(OSPROC_STORAGE_RESOURCE_CACHE, pid) do
140+
remotecall_fetch(detect_hierarchy, pid, :storage_resource)
141+
end
142+
get!(OSPROC_STORAGE_DEVICE_CACHE, pid) do
143+
remotecall_fetch(detect_hierarchy, pid, :storage_device)
118144
end
119145
new(pid)
120146
end
121147
end
122148
get_parent(proc::OSProc) = proc
123-
children(proc::OSProc) = get(OSPROC_PROCESSOR_CACHE, proc.pid, Processor[])
124-
function get_processor_hierarchy()
125-
children = Processor[]
126-
for name in keys(PROCESSOR_CALLBACKS)
127-
cb = PROCESSOR_CALLBACKS[name]
128-
try
129-
child = Base.invokelatest(cb)
130-
if (child isa Tuple) || (child isa Vector)
131-
append!(children, child)
132-
elseif child !== nothing
133-
push!(children, child)
134-
end
135-
catch err
136-
@error "Error in processor callback: $name" exception=(err,catch_backtrace())
137-
end
149+
get_processors(proc::OSProc) = get(OSPROC_PROCESSOR_CACHE, proc.pid, Set{Processor}())
150+
get_storage_resources(proc::OSProc) = get(OSPROC_STORAGE_RESOURCE_CACHE, proc.pid, Set{StorageResource}())
151+
get_storage_devices(proc::OSProc) = get(OSPROC_STORAGE_DEVICE_CACHE, proc.pid, Set{StorageDevice}())
152+
children(proc::OSProc) = get_processors(proc)
153+
function detect_hierarchy(kind::Symbol)
154+
cb_dict, children = if kind == :processor
155+
PROCESSOR_CALLBACKS, Set{Processor}()
156+
elseif kind == :storage_resource
157+
STORAGE_RESOURCE_CALLBACKS, Set{StorageResource}()
158+
elseif kind == :storage_device
159+
STORAGE_DEVICE_CALLBACKS, Set{StorageDevice}()
160+
else
161+
throw(ArgumentError("Invalid hierarchy kind: $kind"))
138162
end
139-
children
140-
end
141-
function get_storage_hierarchy()
142-
children = Processor[]
143-
for name in keys(STORAGE_CALLBACKS)
144-
cb = STORAGE_CALLBACKS[name]
163+
for name in keys(cb_dict)
164+
cb = cb_dict[name]
145165
try
146166
child = Base.invokelatest(cb)
147167
if (child isa Tuple) || (child isa Vector)
@@ -150,7 +170,7 @@ function get_storage_hierarchy()
150170
push!(children, child)
151171
end
152172
catch err
153-
@error "Error in storage callback: $name" exception=(err,catch_backtrace())
173+
@error "Error in $(String(kind)) callback: $name" exception=(err,catch_backtrace())
154174
end
155175
end
156176
children
@@ -164,8 +184,6 @@ iscompatible_arg(proc::OSProc, opts, args...) =
164184
any(child->
165185
all(arg->iscompatible_arg(child, opts, arg), args),
166186
children(proc))
167-
get_processors(proc::OSProc) =
168-
vcat((get_processors(child) for child in children(proc))...)
169187

170188
"""
171189
ThreadProc <: Processor
@@ -229,6 +247,7 @@ else
229247
end
230248
get_parent(proc::ThreadProc) = OSProc(proc.owner)
231249
default_enabled(proc::ThreadProc) = true
250+
storage_resource(proc::ThreadProc) = CPURAMStorage(proc.owner)
232251

233252
# TODO: ThreadGroupProc?
234253

@@ -340,7 +359,8 @@ get_tls() = (
340359
sch_uid=task_local_storage(:_dagger_sch_uid),
341360
sch_handle=task_local_storage(:_dagger_sch_handle),
342361
processor=thunk_processor(),
343-
utilization=task_local_storage(:_dagger_utilization),
362+
time_utilization=task_local_storage(:_dagger_time_utilization),
363+
alloc_utilization=task_local_storage(:_dagger_alloc_utilization),
344364
)
345365

346366
"""
@@ -352,5 +372,6 @@ function set_tls!(tls)
352372
task_local_storage(:_dagger_sch_uid, tls.sch_uid)
353373
task_local_storage(:_dagger_sch_handle, tls.sch_handle)
354374
task_local_storage(:_dagger_processor, tls.processor)
355-
task_local_storage(:_dagger_utilization, tls.utilization)
375+
task_local_storage(:_dagger_time_utilization, tls.time_utilization)
376+
task_local_storage(:_dagger_alloc_utilization, tls.alloc_utilization)
356377
end

0 commit comments

Comments
 (0)