Commit 66ba6a9

Merge pull request #518 from JuliaParallel/jps/dtask
Rename EagerThunk to DTask
2 parents 0f1db98 + c1b0b36 commit 66ba6a9

24 files changed: +195 -171 lines changed

Diff for: docs/src/api-dagger/functions.md

+0 -12
@@ -11,8 +11,6 @@ Pages = ["functions.md"]
 ```@docs
 @spawn
 spawn
-delayed
-@par
 ```

 ## Task Options Functions/Macros
@@ -38,16 +36,6 @@ scope
 constrain
 ```

-## Lazy Task Functions
-```@docs
-domain
-compute
-dependents
-noffspring
-order
-treereduce
-```
-
 ## Processor Functions
 ```@docs
 execute!

Diff for: docs/src/api-dagger/types.md

+1 -1
@@ -10,7 +10,7 @@ Pages = ["types.md"]
 ## Task Types
 ```@docs
 Thunk
-EagerThunk
+DTask
 ```

 ## Task Options Types

Diff for: docs/src/darray.md

+3 -3
@@ -353,8 +353,8 @@ Now, `DZ` will contain the result of computing `(DX .+ DX) .* 3`.
 ```
 julia> Dagger.chunks(DZ)
 2×2 Matrix{Any}:
-EagerThunk (finished) EagerThunk (finished)
-EagerThunk (finished) EagerThunk (finished)
+DTask (finished) DTask (finished)
+DTask (finished) DTask (finished)

 julia> Dagger.chunks(fetch(DZ))
 2×2 Matrix{Union{Thunk, Dagger.Chunk}}:
@@ -363,7 +363,7 @@ julia> Dagger.chunks(fetch(DZ))
 ```

 Here we can see the `DArray`'s internal representation of the partitions, which
-are stored as either `EagerThunk` objects (representing an ongoing or completed
+are stored as either `DTask` objects (representing an ongoing or completed
 computation) or `Chunk` objects (which reference data which exist locally or on
 other Julia workers). Of course, one doesn't typically need to worry about
 these internal details unless implementing low-level operations on `DArray`s.

Diff for: docs/src/task-queues.md

+1 -1
@@ -14,7 +14,7 @@ queues".
 A task queue in Dagger is an object that can be configured to accept unlaunched
 tasks from `@spawn`/`spawn` and either modify them or delay their launching
 arbitrarily. By default, Dagger tasks are enqueued through the
-`EagerTaskQueue`, which submits tasks directly into the scheduler before
+`DefaultTaskQueue`, which submits tasks directly into the scheduler before
 `@spawn`/`spawn` returns. However, Dagger also has an `InOrderTaskQueue`, which
 ensures that tasks enqueued through it execute sequentially with respect to
 each other. This queue can be allocated with `Dagger.spawn_sequential`:

Diff for: docs/src/task-spawning.md

+5 -5
@@ -12,7 +12,7 @@ or `spawn` if it's more convenient:

 `Dagger.spawn(f, Dagger.Options(options), args...; kwargs...)`

-When called, it creates an [`EagerThunk`](@ref) (also known as a "thunk" or
+When called, it creates an [`DTask`](@ref) (also known as a "thunk" or
 "task") object representing a call to function `f` with the arguments `args` and
 keyword arguments `kwargs`. If it is called with other thunks as args/kwargs,
 such as in `Dagger.@spawn f(Dagger.@spawn g())`, then, in this example, the
@@ -22,9 +22,9 @@ waits on `g()` to complete before executing.

 An important observation to make is that, for each argument to
 `@spawn`/`spawn`, if the argument is the result of another `@spawn`/`spawn`
-call (thus it's an [`EagerThunk`](@ref)), the argument will be computed first, and then
+call (thus it's an [`DTask`](@ref)), the argument will be computed first, and then
 its result will be passed into the function receiving the argument. If the
-argument is *not* an [`EagerThunk`](@ref) (instead, some other type of Julia object),
+argument is *not* an [`DTask`](@ref) (instead, some other type of Julia object),
 it'll be passed as-is to the function `f` (with some exceptions).

 ## Options
@@ -75,7 +75,7 @@ The final result (from `fetch(s)`) is the obvious consequence of the operation:

 Dagger's `@spawn` macro works similarly to `@async` and `Threads.@spawn`: when
 called, it wraps the function call specified by the user in an
-[`EagerThunk`](@ref) object, and immediately places it onto a running scheduler,
+[`DTask`](@ref) object, and immediately places it onto a running scheduler,
 to be executed once its dependencies are fulfilled.

 ```julia
@@ -114,7 +114,7 @@ One can also safely call `@spawn` from another worker (not ID 1), and it will be

 ```
 x = fetch(Distributed.@spawnat 2 Dagger.@spawn 1+2) # fetches the result of `@spawnat`
-x::EagerThunk
+x::DTask
 @assert fetch(x) == 3 # fetch the result of `@spawn`
 ```

Diff for: docs/src/use-cases/parallel-nested-loops.md

+2 -2
@@ -72,12 +72,12 @@ end

 In this code we have job interdependence. Firstly, we are calculating the
 standard deviation `σ` and than we are using that value in the function `f`.
-Since `Dagger.@spawn` yields an `EagerThunk` rather than actual values, we need
+Since `Dagger.@spawn` yields an `DTask` rather than actual values, we need
 to use the `fetch` function to obtain those values. In this example, the value
 fetching is perfomed once all computations are completed (note that `@sync`
 preceding the loop forces the loop to wait for all jobs to complete). Also,
 note that contrary to the previous example, we do not need to implement locking
-as we are just pushing the `EagerThunk` results of `Dagger.@spawn` serially
+as we are just pushing the `DTask` results of `Dagger.@spawn` serially
 into the DataFrame (which is fast since `Dagger.@spawn` doesn't block).

 The above use case scenario has been tested by running `julia -t 8` (or with

Diff for: ext/GraphVizExt.jl

+1 -1
@@ -7,7 +7,7 @@ else
 end

 import Dagger
-import Dagger: EagerThunk, Chunk, Processor
+import Dagger: DTask, Chunk, Processor
 import Dagger.TimespanLogging: Timespan
 import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst
1313

Diff for: ext/PlotsExt.jl

+1 -1
@@ -9,7 +9,7 @@ else
 end

 import Dagger
-import Dagger: EagerThunk, Chunk, Processor
+import Dagger: DTask, Chunk, Processor
 import Dagger.TimespanLogging: Timespan

 function logs_to_df(logs::Dict)

Diff for: src/Dagger.jl

+1 -1
@@ -45,7 +45,7 @@ include("utils/processors.jl")
 include("task-tls.jl")
 include("scopes.jl")
 include("utils/scopes.jl")
-include("eager_thunk.jl")
+include("dtask.jl")
 include("queue.jl")
 include("thunk.jl")
 include("submission.jl")

Diff for: src/array/matrix.jl

+1 -1
@@ -66,7 +66,7 @@ function (+)(a::ArrayDomain, b::ArrayDomain)
 end

 struct BinaryComputeOp{F} end
-BinaryComputeOp{F}(x::Union{Chunk,EagerThunk}, y::Union{Chunk,EagerThunk}) where F = @spawn F(x, y)
+BinaryComputeOp{F}(x::Union{Chunk,DTask}, y::Union{Chunk,DTask}) where F = @spawn F(x, y)
 BinaryComputeOp{F}(x, y) where F = F(x, y)

 const AddComputeOp = BinaryComputeOp{+}

Diff for: src/array/sort.jl

+1 -1
@@ -305,7 +305,7 @@ function dsort_chunks(cs, nchunks=length(cs), nsamples=2000;
 end

 function propagate_affinity!(c, aff)
-if !isa(c, EagerThunk)
+if !isa(c, DTask)
 return
 end
 if c.affinity !== nothing

Diff for: src/chunks.jl

+1 -1
@@ -199,7 +199,7 @@ function shard(@nospecialize(f); procs=nothing, workers=nothing, per_thread=fals
 end
 end
 isempty(procs) && throw(ArgumentError("Cannot create empty Shard"))
-shard_running_dict = Dict{Processor,EagerThunk}()
+shard_running_dict = Dict{Processor,DTask}()
 for proc in procs
 scope = proc isa OSProc ? ProcessScope(proc) : ExactScope(proc)
 thunk = Dagger.@spawn scope=scope _mutable_inner(f, proc, scope)

Diff for: src/datadeps.jl

+20 -20
@@ -20,27 +20,27 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
 upper_queue::AbstractTaskQueue
 # The mapping of unique objects to previously-launched tasks,
 # and their data dependency on the object (read, write)
-deps::IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}}
+deps::IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, DTask}}}

 # Whether to analyze the DAG statically or eagerly
 # The fields following only apply when static==true
 static::Bool
 # The set of tasks that have already been seen
-seen_tasks::Union{Vector{Pair{EagerTaskSpec,EagerThunk}},Nothing}
+seen_tasks::Union{Vector{Pair{DTaskSpec,DTask}},Nothing}
 # The data-dependency graph of all tasks
 g::Union{SimpleDiGraph{Int},Nothing}
 # The mapping from task to graph ID
-task_to_id::Union{Dict{EagerThunk,Int},Nothing}
+task_to_id::Union{Dict{DTask,Int},Nothing}
 # How to traverse the dependency graph when launching tasks
 traversal::Symbol

 function DataDepsTaskQueue(upper_queue; static::Bool=true,
 traversal::Symbol=:inorder)
-deps = IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}}()
+deps = IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, DTask}}}()
 if static
-seen_tasks = Pair{EagerTaskSpec,EagerThunk}[]
+seen_tasks = Pair{DTaskSpec,DTask}[]
 g = SimpleDiGraph()
-task_to_id = Dict{EagerThunk,Int}()
+task_to_id = Dict{DTask,Int}()
 else
 seen_tasks = nothing
 g = nothing
@@ -51,7 +51,7 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
 end
 end

-function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerThunk})
+function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{DTaskSpec,DTask})
 # If static, record this task and its edges in the graph
 if queue.static
 g = queue.g
@@ -91,18 +91,18 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
 readdep = true
 end
 spec.args[idx] = pos => arg
-arg_data = arg isa EagerThunk ? fetch(arg; raw=true) : arg
+arg_data = arg isa DTask ? fetch(arg; raw=true) : arg

 push!(deps_to_add, arg_data => (readdep, writedep))

 if !haskey(queue.deps, arg_data)
 continue
 end
-argdeps = queue.deps[arg_data]::Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}
+argdeps = queue.deps[arg_data]::Vector{Pair{Tuple{Bool,Bool}, DTask}}
 if readdep
 # When you have an in dependency, sync with the previous out
 for ((other_readdep::Bool, other_writedep::Bool),
-other_task::EagerThunk) in argdeps
+other_task::DTask) in argdeps
 if other_writedep
 if queue.static
 other_task_id = task_to_id[other_task]
@@ -116,7 +116,7 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
 if writedep
 # When you have an out dependency, sync with the previous in or out
 for ((other_readdep::Bool, other_writedep::Bool),
-other_task::EagerThunk) in argdeps
+other_task::DTask) in argdeps
 if other_readdep || other_writedep
 if queue.static
 other_task_id = task_to_id[other_task]
@@ -130,7 +130,7 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
 end
 for (arg_data, (readdep, writedep)) in deps_to_add
 argdeps = get!(queue.deps, arg_data) do
-Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}()
+Vector{Pair{Tuple{Bool,Bool}, DTask}}()
 end
 push!(argdeps, (readdep, writedep) => task)
 end
@@ -139,15 +139,15 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
 spec.options = merge(opts, (;syncdeps, scope))
 end
 end
-function enqueue!(queue::DataDepsTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
+function enqueue!(queue::DataDepsTaskQueue, spec::Pair{DTaskSpec,DTask})
 _enqueue!(queue, spec)
 if queue.static
 push!(queue.seen_tasks, spec)
 else
 enqueue!(queue.upper_queue, spec)
 end
 end
-function enqueue!(queue::DataDepsTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
+function enqueue!(queue::DataDepsTaskQueue, specs::Vector{Pair{DTaskSpec,DTask}})
 for spec in specs
 _enqueue!(queue, spec)
 end
@@ -173,7 +173,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
 # Determine which arguments could be written to, and thus need tracking
 arg_has_writedep = IdDict{Any,Bool}(arg=>any(argdep->argdep[1][2], argdeps) for (arg, argdeps) in queue.deps)
 has_writedep(arg) = haskey(arg_has_writedep, arg) && arg_has_writedep[arg]
-function has_writedep(arg, task::EagerThunk)
+function has_writedep(arg, task::DTask)
 haskey(arg_has_writedep, arg) || return false
 any_writedep = false
 for ((readdep, writedep), other_task) in queue.deps[arg]
@@ -184,7 +184,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
 end
 error("Task isn't in argdeps set")
 end
-function is_writedep(arg, task::EagerThunk)
+function is_writedep(arg, task::DTask)
 haskey(arg_has_writedep, arg) || return false
 for ((readdep, writedep), other_task) in queue.deps[arg]
 if task === other_task
@@ -221,8 +221,8 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
 data_locality = IdDict{Any,MemorySpace}(data=>memory_space(data) for data in keys(queue.deps))

 # Track writers ("owners") and readers
-args_owner = IdDict{Any,Union{EagerThunk,Nothing}}(arg=>nothing for arg in keys(queue.deps))
-args_readers = IdDict{Any,Vector{EagerThunk}}(arg=>EagerThunk[] for arg in keys(queue.deps))
+args_owner = IdDict{Any,Union{DTask,Nothing}}(arg=>nothing for arg in keys(queue.deps))
+args_readers = IdDict{Any,Vector{DTask}}(arg=>DTask[] for arg in keys(queue.deps))
 function get_write_deps!(arg, syncdeps)
 haskey(args_owner, arg) || return
 if (owner = args_owner[arg]) !== nothing
@@ -327,7 +327,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)

 # Spawn copies before user's task, as necessary
 @dagdebug nothing :spawn_datadeps "($(spec.f)) Scheduling: $our_proc ($our_space)"
-task_args = map(((pos, arg)=_arg,)->pos=>(arg isa EagerThunk ? fetch(arg; raw=true) : arg), copy(spec.args))
+task_args = map(((pos, arg)=_arg,)->pos=>(arg isa DTask ? fetch(arg; raw=true) : arg), copy(spec.args))

 # Copy args from local to remote
 for (idx, (pos, arg)) in enumerate(task_args)
@@ -467,7 +467,7 @@ is experimental and subject to change.
 function spawn_datadeps(f::Base.Callable; static::Bool=true,
 traversal::Symbol=:inorder)
 wait_all(; check_errors=true) do
-queue = DataDepsTaskQueue(get_options(:task_queue, EagerTaskQueue());
+queue = DataDepsTaskQueue(get_options(:task_queue, DefaultTaskQueue());
 static, traversal)
 result = with_options(f; task_queue=queue)
 if queue.static
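
Note on the synchronization rule visible in the `_enqueue!` hunks above: a reader syncs with earlier writers, and a writer syncs with earlier readers and writers. The following is a minimal, self-contained sketch of that rule, not Dagger's implementation. The function name `sync_targets` and the use of `Symbol`s in place of `DTask`s are assumptions made purely for illustration; only the `(readdep, writedep) => task` pair layout is taken from the diff.

```julia
# Hypothetical simplification: tasks are plain Symbols, and each prior access
# is stored as (readdep, writedep) => task, mirroring the Pair layout in the diff.
function sync_targets(prior::Vector{Pair{Tuple{Bool,Bool},Symbol}},
                      readdep::Bool, writedep::Bool)
    targets = Set{Symbol}()
    for ((other_readdep, other_writedep), other_task) in prior
        # An "in" (read) dependency syncs with previous "out" (write) accesses.
        if readdep && other_writedep
            push!(targets, other_task)
        end
        # An "out" (write) dependency syncs with previous reads and writes.
        if writedep && (other_readdep || other_writedep)
            push!(targets, other_task)
        end
    end
    return targets
end

# Two earlier accesses to the same object: one read, one write.
prior = [(true, false) => :reader_a, (false, true) => :writer_b]

new_reader_waits_on = sync_targets(prior, true, false)
new_writer_waits_on = sync_targets(prior, false, true)
```

Under these assumptions a new reader only waits on `:writer_b`, while a new writer waits on both earlier tasks, which is why concurrent reads of the same object do not serialize.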

Diff for: src/eager_thunk.jl renamed to src/dtask.jl

+21 -17
@@ -7,6 +7,7 @@ ThunkFuture() = ThunkFuture(Future())
 Base.isready(t::ThunkFuture) = isready(t.future)
 Base.wait(t::ThunkFuture) = Dagger.Sch.thunk_yield() do
 wait(t.future)
+return
 end
 function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
 error, value = Dagger.Sch.thunk_yield() do
@@ -37,47 +38,50 @@ Options(;options...) = Options((;options...))
 Options(options...) = Options((;options...))

 """
-EagerThunk
+DTask

-Returned from `spawn`/`@spawn` calls. Represents a task that is in the
-scheduler, potentially ready to execute, executing, or finished executing. May
-be `fetch`'d or `wait`'d on at any time.
+Returned from `Dagger.@spawn`/`Dagger.spawn` calls. Represents a task that is
+in the scheduler, potentially ready to execute, executing, or finished
+executing. May be `fetch`'d or `wait`'d on at any time. See `Dagger.@spawn` for
+more details.
 """
-mutable struct EagerThunk
+mutable struct DTask
 uid::UInt
 future::ThunkFuture
 finalizer_ref::DRef
 thunk_ref::DRef
-EagerThunk(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
+DTask(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
 end

-Base.isready(t::EagerThunk) = isready(t.future)
-function Base.wait(t::EagerThunk)
+const EagerThunk = DTask
+
+Base.isready(t::DTask) = isready(t.future)
+function Base.wait(t::DTask)
 if !isdefined(t, :thunk_ref)
-throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `EagerThunk`"))
+throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `DTask`"))
 end
 wait(t.future)
 end
-function Base.fetch(t::EagerThunk; raw=false)
+function Base.fetch(t::DTask; raw=false)
 if !isdefined(t, :thunk_ref)
-throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
+throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `DTask`"))
 end
 return fetch(t.future; raw)
 end
-function Base.show(io::IO, t::EagerThunk)
+function Base.show(io::IO, t::DTask)
 status = if isdefined(t, :thunk_ref)
 isready(t) ? "finished" : "running"
 else
 "not launched"
 end
-print(io, "EagerThunk ($status)")
+print(io, "DTask ($status)")
 end
-istask(t::EagerThunk) = true
+istask(t::DTask) = true

-"When finalized, cleans-up the associated `EagerThunk`."
-mutable struct EagerThunkFinalizer
+"When finalized, cleans-up the associated `DTask`."
+mutable struct DTaskFinalizer
 uid::UInt
-function EagerThunkFinalizer(uid)
+function DTaskFinalizer(uid)
 x = new(uid)
 finalizer(Sch.eager_cleanup, x)
 x