Commit 2fdaa89

Merge pull request #271 from JuliaParallel/jps/function-chunks
Wrap thunk functions in Chunk
2 parents ff601e8 + f7ac7a7 commit 2fdaa89

13 files changed, +235 -39 lines changed

Diff for: Project.toml

+1 -1
@@ -1,6 +1,6 @@
 name = "Dagger"
 uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
-version = "0.13.2"
+version = "0.13.3"
 
 [deps]
 Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"

Diff for: docs/src/processors.md

+43 -1
@@ -1,14 +1,56 @@
 # Processors
 
 Dagger contains a flexible mechanism to represent CPUs, GPUs, and other
-devices that the scheduler can place user work on. The indiviual devices that
+devices that the scheduler can place user work on. The individual devices that
 are capable of computing a user operation are called "processors", and are
 subtypes of `Dagger.Processor`. Processors are automatically detected by
 Dagger at scheduler initialization, and placed in a hierarchy reflecting the
 physical (network-, link-, or memory-based) boundaries between processors in
 the hierarchy. The scheduler uses the information in this hierarchy to
 efficiently schedule and partition user operations.
 
+Dagger's `Chunk` objects can have a processor associated with them that defines
+where the contained data "resides". Each processor has a set of functions that
+define the mechanisms and rules by which the data can be transferred between
+similar or different kinds of processors, and will be called by Dagger's
+scheduler automatically when fetching function arguments (or the function
+itself) for computation on a given processor.
+
+Setting the processor on a function argument is done by wrapping it in a
+`Chunk` with `Dagger.tochunk`:
+
+```julia
+a = 1
+b = 2
+# Let's say `b` "resides" on the second thread of the first worker:
+b_chunk = Dagger.tochunk(b, Dagger.ThreadProc(1, 2))::Dagger.Chunk
+c = Dagger.@spawn a + b_chunk
+fetch(c) == 3
+```
+
+It's also simple to set the processor of the function being passed; it will be
+automatically wrapped in a `Chunk` if necessary:
+
+```julia
+# `+` is treated as existing on the second thread of the first worker:
+Dagger.@spawn processor=Dagger.ThreadProc(1, 2) a + b
+```
+
+You can also tell Dagger about the processor type for the returned value of a
+task by making it a `Chunk`:
+
+```julia
+Dagger.spawn(a) do a
+    c = a + 1
+    return Dagger.tochunk(c, Dagger.ThreadProc(1, 2))
+end
+```
+
+Note that unless you know that your function, arguments, or return value are
+associated with a specific processor, you don't need to assign one to them.
+Dagger will treat them as being simple values with no processor association,
+and will serialize them to wherever they're used.
+
 ## Hardware capabilities, topology, and data locality
 
 The processor hierarchy is modeled as a multi-root tree, where each root is an

Diff for: docs/src/scopes.md

+20 -1
@@ -13,7 +13,10 @@ considered valid.
 
 ## Scope Basics
 
-Let's take the example of a webcam handle generated by VideoIO.jl. This handle is a C pointer, and thus has process scope. We can open the handle on a given process, and set the scope of the resulting data to a `ProcessScope()`, which defaults to the current Julia process:
+Let's take the example of a webcam handle generated by VideoIO.jl. This handle
+is a C pointer, and thus has process scope. We can open the handle on a given
+process, and set the scope of the resulting data to a `ProcessScope()`, which
+defaults to the current Julia process:
 
 ```julia
 using VideoIO
@@ -41,6 +44,22 @@ the `cam_handle` task was executed on. Of course, the resulting camera frame is
 *not* scoped to anywhere specific (denoted as `AnyScope()`), and thus
 computations on it may execute anywhere.
 
+You may also encounter situations where you want to use a callable struct (such
+as a closure, or a Flux.jl layer) only within a certain scope; you can specify
+the scope of the function pretty easily:
+
+```julia
+using Flux
+m = Chain(...)
+# If `m` is only safe to transfer to and execute on this process,
+# we can set a `ProcessScope` on it:
+result = Dagger.@spawn scope=ProcessScope() m(rand(8,8))
+```
+
+Setting a scope on the function treats it as a regular piece of data (like the
+arguments to the function), so it participates in the scoping rules described
+in the following sections all the same.
+
 Now, let's try out some other kinds of scopes, starting with `NodeScope`. This
 scope encompasses the server that one or more Julia processes may be running
 on. Say we want to use memory mapping (mmap) to more efficiently send arrays

Diff for: src/Dagger.jl

+3 -5
@@ -79,12 +79,10 @@ function __init__()
             include("ui/video.jl")
         end
     end
-    @static if VERSION >= v"1.3.0-DEV.573"
-        for tid in 1:Threads.nthreads()
-            push!(PROCESSOR_CALLBACKS, ()->ThreadProc(myid(), tid))
+    for tid in 1:Threads.nthreads()
+        add_processor_callback!("__cpu_thread_$(tid)__") do
+            ThreadProc(myid(), tid)
         end
-    else
-        push!(PROCESSOR_CALLBACKS, ()->ThreadProc(myid(), 1))
     end
 end
 

Diff for: src/processor.jl

+14 -5
@@ -12,7 +12,7 @@ make it easy to transfer data to/from other types of `Processor` at runtime.
 """
 abstract type Processor end
 
-const PROCESSOR_CALLBACKS = []
+const PROCESSOR_CALLBACKS = Dict{Symbol,Any}()
 
 """
     execute!(proc::Processor, f, args...) -> Any
@@ -104,7 +104,8 @@ const OSPROC_CACHE = Dict{Int,Vector{Processor}}()
 children(proc::OSProc) = OSPROC_CACHE[proc.pid]
 function get_proc_hierarchy()
     children = Processor[]
-    for cb in PROCESSOR_CALLBACKS
+    for name in keys(PROCESSOR_CALLBACKS)
+        cb = PROCESSOR_CALLBACKS[name]
         try
             child = Base.invokelatest(cb)
             if (child isa Tuple) || (child isa Vector)
@@ -113,13 +114,21 @@ function get_proc_hierarchy()
                 push!(children, child)
             end
         catch err
-            @error "Error in processor callback" exception=(err,catch_backtrace())
+            @error "Error in processor callback: $name" exception=(err,catch_backtrace())
         end
     end
     children
 end
-function add_callback!(func)
-    push!(Dagger.PROCESSOR_CALLBACKS, func)
+add_processor_callback!(func, name::String) =
+    add_processor_callback!(func, Symbol(name))
+function add_processor_callback!(func, name::Symbol)
+    Dagger.PROCESSOR_CALLBACKS[name] = func
+    empty!(OSPROC_CACHE)
+end
+delete_processor_callback!(name::String) =
+    delete_processor_callback!(Symbol(name))
+function delete_processor_callback!(name::Symbol)
+    delete!(Dagger.PROCESSOR_CALLBACKS, name)
     empty!(OSPROC_CACHE)
 end
 Base.:(==)(proc1::OSProc, proc2::OSProc) = proc1.pid == proc2.pid
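
Reviewer note: the change above replaces a plain list of callbacks with a name-keyed dictionary, so a callback can be overwritten or removed by name instead of with `pop!`. A minimal standalone sketch of that pattern follows. It does not load Dagger; `CALLBACKS`, `CACHE`, `add_cb!`, and `delete_cb!` are hypothetical stand-ins for `PROCESSOR_CALLBACKS`, `OSPROC_CACHE`, `add_processor_callback!`, and `delete_processor_callback!`, and the returned symbols stand in for processor objects.

```julia
# Hypothetical stand-ins for Dagger's PROCESSOR_CALLBACKS and OSPROC_CACHE.
const CALLBACKS = Dict{Symbol,Any}()
const CACHE = Dict{Int,Vector{Any}}()

# Register (or overwrite) a callback under `name` and invalidate the cache.
function add_cb!(func, name::Symbol)
    CALLBACKS[name] = func
    empty!(CACHE)
    return nothing
end
add_cb!(func, name::String) = add_cb!(func, Symbol(name))

# Remove a callback by name and invalidate the cache.
function delete_cb!(name::Symbol)
    delete!(CALLBACKS, name)
    empty!(CACHE)
    return nothing
end

# The function comes first, so `do`-block syntax works, as in `__init__` above.
add_cb!("fake") do
    :fake_processor
end

# Registering under the same name replaces the earlier callback.
add_cb!(() -> :other_processor, :fake)
count_after_add = length(CALLBACKS)

# Callbacks are invoked with `Base.invokelatest`, as in `get_proc_hierarchy`.
results = [Base.invokelatest(cb) for cb in values(CALLBACKS)]

delete_cb!(:fake)
count_after_delete = length(CALLBACKS)
```

Keying by name is what lets the updated test remove `:fakeproc` without depending on the order in which callbacks were registered.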

Diff for: src/sch/Sch.jl

+19 -7
@@ -589,7 +589,11 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
     sig = signature(task, state)
 
     # Calculate scope
-    scope = AnyScope()
+    scope = if task.f isa Chunk
+        task.f.scope
+    else
+        AnyScope()
+    end
     for input in unwrap_weak_checked.(task.inputs)
         chunk = if istask(input)
             state.cache[input]
@@ -763,6 +767,10 @@ function finish_task!(ctx, state, node, thunk_failed)
     fill_registered_futures!(state, node, thunk_failed)
 
     to_evict = cleanup_inputs!(state, node)
+    if node.f isa Chunk
+        # FIXME: Check the graph for matching chunks
+        push!(to_evict, node.f)
+    end
     if haskey(state.waiting_data, node) && isempty(state.waiting_data[node])
         delete!(state.waiting_data, node)
     end
@@ -826,13 +834,15 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
         end
     end
 
-    ids = map(enumerate(thunk.inputs)) do (idx,x)
+    ids = convert(Vector{Int}, map(enumerate(thunk.inputs)) do (idx,x)
         istask(x) ? unwrap_weak_checked(x).id : -idx
-    end
+    end)
+    pushfirst!(ids, 0)
 
-    data = map(thunk.inputs) do x
+    data = convert(Vector{Any}, map(Any[thunk.inputs...]) do x
         istask(x) ? state.cache[unwrap_weak_checked(x)] : x
-    end
+    end)
+    pushfirst!(data, thunk.f)
     toptions = thunk.options !== nothing ? thunk.options : ThunkOptions()
     options = merge(ctx.options, toptions)
     @assert (options.single == 0) || (gproc.pid == options.single)
@@ -841,7 +851,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
     state.worker_pressure[gproc.pid][typeof(proc)] += util
 
     # TODO: De-dup common fields (log_sink, uid, etc.)
-    push!(to_send, (util, thunk.id, thunk.f, data, thunk.get_result,
+    push!(to_send, (util, thunk.id, fn_type(thunk.f), data, thunk.get_result,
                     thunk.persist, thunk.cache, thunk.meta, options, ids,
                     (log_sink=ctx.log_sink, profile=ctx.profile),
                     sch_handle, state.uid))
@@ -894,11 +904,12 @@ function do_tasks(to_proc, chan, tasks)
     end
 end
 "Executes a single task on `to_proc`."
-function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, cache, meta, options, ids, ctx_vars, sch_handle, uid)
+function do_task(to_proc, extra_util, thunk_id, Tf, data, send_result, persist, cache, meta, options, ids, ctx_vars, sch_handle, uid)
     ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile)
 
     from_proc = OSProc()
     Tdata = map(x->x isa Chunk ? chunktype(x) : x, data)
+    f = isdefined(Tf, :instance) ? Tf.instance : nothing
 
     # Fetch inputs
     fetched = if meta
@@ -929,6 +940,7 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
             end
         end)
     end
+    f = popfirst!(fetched)
 
     # Check if we'll go over capacity from running this thunk
     real_util = lock(TASK_SYNC) do
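
Reviewer note: in the hunks above, the scheduler prepends the thunk's function to `data` (with a placeholder id of `0` in `ids`), so the function is fetched by the same code path as the arguments and then taken back off with `popfirst!`. A rough standalone sketch of that flow follows. It does not use Dagger; `FakeChunk` and `fetch_value` are hypothetical stand-ins for `Chunk` and the scheduler's fetch/move logic.

```julia
# Hypothetical stand-in for Dagger's Chunk: a wrapper around a value.
struct FakeChunk{T}
    value::T
end

# Stand-in for fetching: unwrap chunks, pass plain values through unchanged.
fetch_value(x::FakeChunk) = x.value
fetch_value(x) = x

f = FakeChunk(+)            # the function itself is wrapped, like `thunk.f`
inputs = (1, FakeChunk(2))  # a mix of plain and wrapped arguments

# Build the argument list, then put the function in slot 0.
data = Any[inputs...]
pushfirst!(data, f)

# Fetch everything uniformly, then split the function off again.
fetched = map(fetch_value, data)
fn = popfirst!(fetched)
result = fn(fetched...)
```

Treating the function as "argument zero" is what lets a processor or scope attached to the function participate in the same transfer rules as ordinary arguments.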

Diff for: src/sch/eager.jl

+3
@@ -110,3 +110,6 @@ function eager_cleanup(state, uid)
         delete!(state.thunk_dict, tid)
     end
 end
+
+_find_thunk(e::Dagger.EagerThunk) =
+    unwrap_weak_checked(EAGER_STATE[].thunk_dict[EAGER_ID_MAP[e.uid]])

Diff for: src/sch/util.jl

+3 -1
@@ -203,9 +203,11 @@ function fetch_report(task)
     end
 end
 
+fn_type(x::Chunk) = x.chunktype
+fn_type(x) = typeof(x)
 function signature(task::Thunk, state)
     inputs = map(x->istask(x) ? state.cache[x] : x, unwrap_weak_checked.(task.inputs))
-    Tuple{typeof(task.f), map(x->x isa Chunk ? x.chunktype : typeof(x), inputs)...}
+    Tuple{fn_type(task.f), map(x->x isa Chunk ? x.chunktype : typeof(x), inputs)...}
 end
 
 function report_catch_error(err, desc=nothing)
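
Reviewer note: `fn_type` sends only the function's type to the worker, and both `do_task` and `Base.show` use `isdefined(Tf, :instance)` to recover the function from that type where possible. This works because ordinary named functions have singleton types, while closures that capture variables and callable structs with fields do not. A small standalone sketch of that check follows; `recover_instance`, `make_adder`, and `Scale` are hypothetical names used only for illustration.

```julia
# Recover a value from its type when the type is a singleton, else `nothing`
# (the same check that `do_task` applies to `Tf`).
recover_instance(T) = isdefined(T, :instance) ? T.instance : nothing

# A named function has a singleton type, so the function can be recovered.
from_sin = recover_instance(typeof(sin))

# A closure that captures a local variable carries state, so its type has
# no single instance.
make_adder(n) = x -> x + n
add10 = make_adder(10)
from_closure = recover_instance(typeof(add10))

# A callable struct with a field behaves like the closure.
struct Scale
    k::Float64
end
(s::Scale)(x) = s.k * x
from_struct = recover_instance(typeof(Scale(2.0)))
```

For the non-singleton cases the type alone is not enough, which is consistent with `do_task` later overwriting `f` with the fetched value from `popfirst!(fetched)`.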

Diff for: src/scopes.jl

+1 -1
@@ -32,7 +32,7 @@ function ProcessScope(wid)
         ProcessScope(NodeScope(system_uuid(wid)), wid)
     end
 end
-ProcessScope() = ProcessScope(myid)
+ProcessScope() = ProcessScope(myid())
 
 "Scoped to a specific processor."
 struct ExactScope <: AbstractScope

Diff for: src/thunk.jl

+54 -13
@@ -11,17 +11,10 @@ created through a call to `delayed` or its macro equivalent `@par`.
 
 ## Constructors
 ```julia
-delayed(f; options=nothing)(args...)
+delayed(f; kwargs...)(args...)
 @par [option=value]... f(args...)
 ```
 
-## Arguments
-- `f`: The function to be called upon execution of the `Thunk`.
-- `args`: The arguments to be passed to the `Thunk`.
-- `options`: A `Sch.ThunkOptions` struct providing the options for the `Thunk`,
-or `nothing`.
-- `option=value`: The same options available in `Sch.ThunkOptions`.
-
 ## Examples
 ```julia
 julia> t = delayed(sin)(π) # creates a Thunk to be computed later
@@ -30,6 +23,30 @@ Thunk(sin, (π,))
 julia> collect(t) # computes the result and returns it to the current process
 1.2246467991473532e-16
 ```
+
+## Arguments
+- `f`: The function to be called upon execution of the `Thunk`.
+- `args`: The arguments to be passed to the `Thunk`.
+- `kwargs`: The properties describing unique behavior of this `Thunk`. Details
+for each property are described in the next section.
+- `option=value`: The same as passing `kwargs` to `delayed`.
+
+## Public Properties
+- `meta::Bool=false`: If `true`, instead of fetching cached arguments from
+`Chunk`s and passing the raw arguments to `f`, instead pass the `Chunk`. Useful
+for doing manual fetching or manipulation of `Chunk` references. Non-`Chunk`
+arguments are still passed as-is.
+- `processor::Processor=OSProc()` - The processor associated with `f`. Useful if
+`f` is a callable struct that exists on a given processor and should be
+transferred appropriately.
+- `scope::Dagger.AbstractScope=AnyScope()` - The scope associated with `f`.
+Useful if `f` is a function or callable struct that may only be transferred to,
+and executed within, the specified scope.
+
+## Options
+- `options`: A `Sch.ThunkOptions` struct providing the options for the `Thunk`.
+If omitted, options can also be specified by passing key-value pairs as
+`kwargs`.
 """
 mutable struct Thunk
     f::Any # usually a Function, but could be any callable
@@ -53,9 +70,16 @@ mutable struct Thunk
         cache_ref=nothing,
         affinity=nothing,
         eager_ref=nothing,
+        processor=nothing,
+        scope=nothing,
         options=nothing,
         kwargs...
     )
+        if !isnothing(processor) || !isnothing(scope)
+            f = tochunk(f,
+                        something(processor, OSProc()),
+                        something(scope, AnyScope()))
+        end
         if options !== nothing
             @assert isempty(kwargs)
             new(f, xs, id, get_result, meta, persist, cache, cache_ref,
@@ -103,11 +127,10 @@ function affinity(t::Thunk)
 end
 
 """
-    delayed(f; options)(args...)
+    delayed(f; kwargs...)(args...)
 
 Creates a [`Thunk`](@ref) object which can be executed later, which will call
-`f` with `args`. Options are typically either `nothing` or of type
-`Sch.ThunkOptions`.
+`f` with `args`. `kwargs` controls various properties of the resulting `Thunk`.
 """
 function delayed(f; kwargs...)
     (args...) -> Thunk(f, args...; kwargs...)
@@ -218,8 +241,16 @@ end
 
 Spawns a task with `f` as the function and `args` as the arguments, returning
 an `EagerThunk`. Uses a scheduler running in the background to execute code.
+
+Note that `kwargs` are passed to the `Thunk` constructor, and are documented in
+its docstring.
 """
-function spawn(f, args...; kwargs...)
+function spawn(f, args...; processor=nothing, scope=nothing, kwargs...)
+    if !isnothing(processor) || !isnothing(scope)
+        f = tochunk(f,
+                    something(processor, OSProc()),
+                    something(scope, AnyScope()))
+    end
     uid, future, finalizer_ref, thunk_ref = if myid() == 1
         _spawn(f, args...; kwargs...)
     else
@@ -316,7 +347,17 @@ Base.isequal(x::Thunk, y::Thunk) = x.id==y.id
 
 function Base.show(io::IO, z::Thunk)
     lvl = get(io, :lazy_level, 2)
-    print(io, "Thunk[$(z.id)]($(z.f), ")
+    f = if z.f isa Chunk
+        Tf = z.f.chunktype
+        if isdefined(Tf, :instance)
+            Tf.instance
+        else
+            "instance of $Tf"
+        end
+    else
+        z.f
+    end
+    print(io, "Thunk[$(z.id)]($f, ")
     if lvl > 0
         show(IOContext(io, :lazy_level => lvl-1), z.inputs)
     else

Diff for: test/processors.jl

+2 -2
@@ -54,9 +54,9 @@ end
     @testset "Add callback in same world" begin
         function addcb()
             cb = @eval ()->FakeProc(myid())
-            @everywhere Dagger.add_callback!($cb)
+            @everywhere Dagger.add_processor_callback!($cb, :fakeproc)
             @test any(x->x isa FakeProc, Dagger.children(OSProc()))
-            @everywhere pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE)
+            @everywhere Dagger.delete_processor_callback!(:fakeproc)
         end
         addcb()
     end
