Rename EagerThunk to DTask #518

Merged
merged 5 commits on May 23, 2024

12 changes: 0 additions & 12 deletions docs/src/api-dagger/functions.md
@@ -11,8 +11,6 @@ Pages = ["functions.md"]
```@docs
@spawn
spawn
delayed
@par
```

## Task Options Functions/Macros
@@ -38,16 +36,6 @@ scope
constrain
```

## Lazy Task Functions
```@docs
domain
compute
dependents
noffspring
order
treereduce
```

## Processor Functions
```@docs
execute!
2 changes: 1 addition & 1 deletion docs/src/api-dagger/types.md
@@ -10,7 +10,7 @@ Pages = ["types.md"]
## Task Types
```@docs
Thunk
EagerThunk
DTask
```

## Task Options Types
6 changes: 3 additions & 3 deletions docs/src/darray.md
@@ -353,8 +353,8 @@ Now, `DZ` will contain the result of computing `(DX .+ DX) .* 3`.
```
julia> Dagger.chunks(DZ)
2×2 Matrix{Any}:
EagerThunk (finished) EagerThunk (finished)
EagerThunk (finished) EagerThunk (finished)
DTask (finished) DTask (finished)
DTask (finished) DTask (finished)

julia> Dagger.chunks(fetch(DZ))
2×2 Matrix{Union{Thunk, Dagger.Chunk}}:
@@ -363,7 +363,7 @@ julia> Dagger.chunks(fetch(DZ))
```

Here we can see the `DArray`'s internal representation of the partitions, which
are stored as either `EagerThunk` objects (representing an ongoing or completed
are stored as either `DTask` objects (representing an ongoing or completed
computation) or `Chunk` objects (which reference data which exist locally or on
other Julia workers). Of course, one doesn't typically need to worry about
these internal details unless implementing low-level operations on `DArray`s.
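
For readers who do want to poke at these internals, here is a minimal sketch (assuming the `DArray` API used earlier on this page; the array size and block layout are illustrative):

```julia
using Dagger

DX = distribute(rand(100, 100), Blocks(50, 50))  # 2×2 grid of partitions
DZ = (DX .+ DX) .* 3                             # broadcast over the DArray

Dagger.chunks(DZ)         # DTask objects: in-flight or finished partition computations
Dagger.chunks(fetch(DZ))  # Chunk objects: references to the computed partition data
```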
2 changes: 1 addition & 1 deletion docs/src/task-queues.md
@@ -14,7 +14,7 @@ queues".
A task queue in Dagger is an object that can be configured to accept unlaunched
tasks from `@spawn`/`spawn` and either modify them or delay their launching
arbitrarily. By default, Dagger tasks are enqueued through the
`EagerTaskQueue`, which submits tasks directly into the scheduler before
`DefaultTaskQueue`, which submits tasks directly into the scheduler before
`@spawn`/`spawn` returns. However, Dagger also has an `InOrderTaskQueue`, which
ensures that tasks enqueued through it execute sequentially with respect to
each other. This queue can be allocated with `Dagger.spawn_sequential`:
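
A minimal sketch of the do-block form (assuming `Dagger.spawn_sequential` takes a zero-argument function; `step1` and `step2` are hypothetical helpers, not part of Dagger):

```julia
using Dagger

step1() = 1           # hypothetical helper
step2(x) = x + 1      # hypothetical helper

result = Dagger.spawn_sequential() do
    a = Dagger.@spawn step1()   # enqueued via the in-order queue
    b = Dagger.@spawn step2(a)  # guaranteed to run after `a`
    fetch(b)
end
```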
10 changes: 5 additions & 5 deletions docs/src/task-spawning.md
@@ -12,7 +12,7 @@ or `spawn` if it's more convenient:

`Dagger.spawn(f, Dagger.Options(options), args...; kwargs...)`

When called, it creates an [`EagerThunk`](@ref) (also known as a "thunk" or
When called, it creates a [`DTask`](@ref) (also known as a "thunk" or
"task") object representing a call to function `f` with the arguments `args` and
keyword arguments `kwargs`. If it is called with other thunks as args/kwargs,
such as in `Dagger.@spawn f(Dagger.@spawn g())`, then, in this example, the
@@ -22,9 +22,9 @@ waits on `g()` to complete before executing.

An important observation to make is that, for each argument to
`@spawn`/`spawn`, if the argument is the result of another `@spawn`/`spawn`
call (thus it's an [`EagerThunk`](@ref)), the argument will be computed first, and then
call (thus it's a [`DTask`](@ref)), the argument will be computed first, and then
its result will be passed into the function receiving the argument. If the
argument is *not* an [`EagerThunk`](@ref) (instead, some other type of Julia object),
argument is *not* a [`DTask`](@ref) (instead, some other type of Julia object),
it'll be passed as-is to the function `f` (with some exceptions).
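
A short sketch of this argument-handling behavior (the helpers `inc` and `double` are illustrative, not part of Dagger):

```julia
using Dagger

inc(x) = x + 1    # illustrative helper
double(x) = 2x    # illustrative helper

a = Dagger.@spawn inc(1)      # a isa DTask; will compute 2
b = Dagger.@spawn double(a)   # `a` is a DTask, so its result (2) is computed and passed in
c = Dagger.@spawn double(3)   # 3 is not a DTask, so it is passed as-is

fetch(b)  # 4
fetch(c)  # 6
```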

## Options
@@ -75,7 +75,7 @@ The final result (from `fetch(s)`) is the obvious consequence of the operation:

Dagger's `@spawn` macro works similarly to `@async` and `Threads.@spawn`: when
called, it wraps the function call specified by the user in an
[`EagerThunk`](@ref) object, and immediately places it onto a running scheduler,
[`DTask`](@ref) object, and immediately places it onto a running scheduler,
to be executed once its dependencies are fulfilled.

```julia
@@ -114,7 +114,7 @@ One can also safely call `@spawn` from another worker (not ID 1), and it will be

```
x = fetch(Distributed.@spawnat 2 Dagger.@spawn 1+2) # fetches the result of `@spawnat`
x::EagerThunk
x::DTask
@assert fetch(x) == 3 # fetch the result of `@spawn`
```

4 changes: 2 additions & 2 deletions docs/src/use-cases/parallel-nested-loops.md
@@ -72,12 +72,12 @@ end

In this code we have job interdependence. First, we calculate the
standard deviation `σ`, and then we use that value in the function `f`.
Since `Dagger.@spawn` yields an `EagerThunk` rather than actual values, we need
Since `Dagger.@spawn` yields a `DTask` rather than actual values, we need
to use the `fetch` function to obtain those values. In this example, the value
fetching is performed once all computations are completed (note that `@sync`
preceding the loop forces the loop to wait for all jobs to complete). Also,
note that contrary to the previous example, we do not need to implement locking
as we are just pushing the `EagerThunk` results of `Dagger.@spawn` serially
as we are just pushing the `DTask` results of `Dagger.@spawn` serially
into the DataFrame (which is fast since `Dagger.@spawn` doesn't block).
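
A condensed sketch of this pattern (the original loop body is not shown here; `Statistics.std`, the problem sizes, and the column names are illustrative stand-ins, and it assumes `Base.@sync` waits on Dagger tasks as described above):

```julia
using Dagger, DataFrames, Statistics

results = DataFrame(n = Int[], value = Any[])

@sync for n in 1:4
    σ = Dagger.@spawn std(randn(100n))   # first job: compute σ
    val = Dagger.@spawn n + σ            # second job: Dagger resolves σ before running
    push!(results, (n, val))             # push the DTask serially; no lock needed
end

# All tasks have finished after @sync, so fetch the concrete values
results.value .= fetch.(results.value)
```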

The above use case scenario has been tested by running `julia -t 8` (or with
2 changes: 1 addition & 1 deletion ext/GraphVizExt.jl
@@ -7,7 +7,7 @@ else
end

import Dagger
import Dagger: EagerThunk, Chunk, Processor
import Dagger: DTask, Chunk, Processor
import Dagger.TimespanLogging: Timespan
import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst

2 changes: 1 addition & 1 deletion ext/PlotsExt.jl
@@ -9,7 +9,7 @@ else
end

import Dagger
import Dagger: EagerThunk, Chunk, Processor
import Dagger: DTask, Chunk, Processor
import Dagger.TimespanLogging: Timespan

function logs_to_df(logs::Dict)
2 changes: 1 addition & 1 deletion src/Dagger.jl
@@ -45,7 +45,7 @@ include("utils/processors.jl")
include("task-tls.jl")
include("scopes.jl")
include("utils/scopes.jl")
include("eager_thunk.jl")
include("dtask.jl")
include("queue.jl")
include("thunk.jl")
include("submission.jl")
2 changes: 1 addition & 1 deletion src/array/matrix.jl
@@ -66,7 +66,7 @@ function (+)(a::ArrayDomain, b::ArrayDomain)
end

struct BinaryComputeOp{F} end
BinaryComputeOp{F}(x::Union{Chunk,EagerThunk}, y::Union{Chunk,EagerThunk}) where F = @spawn F(x, y)
BinaryComputeOp{F}(x::Union{Chunk,DTask}, y::Union{Chunk,DTask}) where F = @spawn F(x, y)
BinaryComputeOp{F}(x, y) where F = F(x, y)

const AddComputeOp = BinaryComputeOp{+}
2 changes: 1 addition & 1 deletion src/array/sort.jl
@@ -305,7 +305,7 @@ function dsort_chunks(cs, nchunks=length(cs), nsamples=2000;
end

function propagate_affinity!(c, aff)
if !isa(c, EagerThunk)
if !isa(c, DTask)
return
end
if c.affinity !== nothing
2 changes: 1 addition & 1 deletion src/chunks.jl
@@ -199,7 +199,7 @@ function shard(@nospecialize(f); procs=nothing, workers=nothing, per_thread=fals
end
end
isempty(procs) && throw(ArgumentError("Cannot create empty Shard"))
shard_running_dict = Dict{Processor,EagerThunk}()
shard_running_dict = Dict{Processor,DTask}()
for proc in procs
scope = proc isa OSProc ? ProcessScope(proc) : ExactScope(proc)
thunk = Dagger.@spawn scope=scope _mutable_inner(f, proc, scope)
40 changes: 20 additions & 20 deletions src/datadeps.jl
@@ -20,27 +20,27 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
upper_queue::AbstractTaskQueue
# The mapping of unique objects to previously-launched tasks,
# and their data dependency on the object (read, write)
deps::IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}}
deps::IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, DTask}}}

# Whether to analyze the DAG statically or eagerly
# The fields following only apply when static==true
static::Bool
# The set of tasks that have already been seen
seen_tasks::Union{Vector{Pair{EagerTaskSpec,EagerThunk}},Nothing}
seen_tasks::Union{Vector{Pair{DTaskSpec,DTask}},Nothing}
# The data-dependency graph of all tasks
g::Union{SimpleDiGraph{Int},Nothing}
# The mapping from task to graph ID
task_to_id::Union{Dict{EagerThunk,Int},Nothing}
task_to_id::Union{Dict{DTask,Int},Nothing}
# How to traverse the dependency graph when launching tasks
traversal::Symbol

function DataDepsTaskQueue(upper_queue; static::Bool=true,
traversal::Symbol=:inorder)
deps = IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}}()
deps = IdDict{Any, Vector{Pair{Tuple{Bool,Bool}, DTask}}}()
if static
seen_tasks = Pair{EagerTaskSpec,EagerThunk}[]
seen_tasks = Pair{DTaskSpec,DTask}[]
g = SimpleDiGraph()
task_to_id = Dict{EagerThunk,Int}()
task_to_id = Dict{DTask,Int}()
else
seen_tasks = nothing
g = nothing
@@ -51,7 +51,7 @@ struct DataDepsTaskQueue <: AbstractTaskQueue
end
end

function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerThunk})
function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{DTaskSpec,DTask})
# If static, record this task and its edges in the graph
if queue.static
g = queue.g
@@ -91,18 +91,18 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
readdep = true
end
spec.args[idx] = pos => arg
arg_data = arg isa EagerThunk ? fetch(arg; raw=true) : arg
arg_data = arg isa DTask ? fetch(arg; raw=true) : arg

push!(deps_to_add, arg_data => (readdep, writedep))

if !haskey(queue.deps, arg_data)
continue
end
argdeps = queue.deps[arg_data]::Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}
argdeps = queue.deps[arg_data]::Vector{Pair{Tuple{Bool,Bool}, DTask}}
if readdep
# When you have an in dependency, sync with the previous out
for ((other_readdep::Bool, other_writedep::Bool),
other_task::EagerThunk) in argdeps
other_task::DTask) in argdeps
if other_writedep
if queue.static
other_task_id = task_to_id[other_task]
@@ -116,7 +116,7 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
if writedep
# When you have an out dependency, sync with the previous in or out
for ((other_readdep::Bool, other_writedep::Bool),
other_task::EagerThunk) in argdeps
other_task::DTask) in argdeps
if other_readdep || other_writedep
if queue.static
other_task_id = task_to_id[other_task]
@@ -130,7 +130,7 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
end
for (arg_data, (readdep, writedep)) in deps_to_add
argdeps = get!(queue.deps, arg_data) do
Vector{Pair{Tuple{Bool,Bool}, EagerThunk}}()
Vector{Pair{Tuple{Bool,Bool}, DTask}}()
end
push!(argdeps, (readdep, writedep) => task)
end
@@ -139,15 +139,15 @@ function _enqueue!(queue::DataDepsTaskQueue, fullspec::Pair{EagerTaskSpec,EagerT
spec.options = merge(opts, (;syncdeps, scope))
end
end
function enqueue!(queue::DataDepsTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
function enqueue!(queue::DataDepsTaskQueue, spec::Pair{DTaskSpec,DTask})
_enqueue!(queue, spec)
if queue.static
push!(queue.seen_tasks, spec)
else
enqueue!(queue.upper_queue, spec)
end
end
function enqueue!(queue::DataDepsTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
function enqueue!(queue::DataDepsTaskQueue, specs::Vector{Pair{DTaskSpec,DTask}})
for spec in specs
_enqueue!(queue, spec)
end
@@ -173,7 +173,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
# Determine which arguments could be written to, and thus need tracking
arg_has_writedep = IdDict{Any,Bool}(arg=>any(argdep->argdep[1][2], argdeps) for (arg, argdeps) in queue.deps)
has_writedep(arg) = haskey(arg_has_writedep, arg) && arg_has_writedep[arg]
function has_writedep(arg, task::EagerThunk)
function has_writedep(arg, task::DTask)
haskey(arg_has_writedep, arg) || return false
any_writedep = false
for ((readdep, writedep), other_task) in queue.deps[arg]
@@ -184,7 +184,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
end
error("Task isn't in argdeps set")
end
function is_writedep(arg, task::EagerThunk)
function is_writedep(arg, task::DTask)
haskey(arg_has_writedep, arg) || return false
for ((readdep, writedep), other_task) in queue.deps[arg]
if task === other_task
@@ -221,8 +221,8 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
data_locality = IdDict{Any,MemorySpace}(data=>memory_space(data) for data in keys(queue.deps))

# Track writers ("owners") and readers
args_owner = IdDict{Any,Union{EagerThunk,Nothing}}(arg=>nothing for arg in keys(queue.deps))
args_readers = IdDict{Any,Vector{EagerThunk}}(arg=>EagerThunk[] for arg in keys(queue.deps))
args_owner = IdDict{Any,Union{DTask,Nothing}}(arg=>nothing for arg in keys(queue.deps))
args_readers = IdDict{Any,Vector{DTask}}(arg=>DTask[] for arg in keys(queue.deps))
function get_write_deps!(arg, syncdeps)
haskey(args_owner, arg) || return
if (owner = args_owner[arg]) !== nothing
@@ -327,7 +327,7 @@

# Spawn copies before user's task, as necessary
@dagdebug nothing :spawn_datadeps "($(spec.f)) Scheduling: $our_proc ($our_space)"
task_args = map(((pos, arg)=_arg,)->pos=>(arg isa EagerThunk ? fetch(arg; raw=true) : arg), copy(spec.args))
task_args = map(((pos, arg)=_arg,)->pos=>(arg isa DTask ? fetch(arg; raw=true) : arg), copy(spec.args))

# Copy args from local to remote
for (idx, (pos, arg)) in enumerate(task_args)
@@ -467,7 +467,7 @@ is experimental and subject to change.
function spawn_datadeps(f::Base.Callable; static::Bool=true,
traversal::Symbol=:inorder)
wait_all(; check_errors=true) do
queue = DataDepsTaskQueue(get_options(:task_queue, EagerTaskQueue());
queue = DataDepsTaskQueue(get_options(:task_queue, DefaultTaskQueue());
static, traversal)
result = with_options(f; task_queue=queue)
if queue.static
38 changes: 21 additions & 17 deletions src/eager_thunk.jl → src/dtask.jl
@@ -7,6 +7,7 @@ ThunkFuture() = ThunkFuture(Future())
Base.isready(t::ThunkFuture) = isready(t.future)
Base.wait(t::ThunkFuture) = Dagger.Sch.thunk_yield() do
wait(t.future)
return
end
function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
error, value = Dagger.Sch.thunk_yield() do
@@ -37,47 +38,50 @@ Options(;options...) = Options((;options...))
Options(options...) = Options((;options...))

"""
EagerThunk
DTask

Returned from `spawn`/`@spawn` calls. Represents a task that is in the
scheduler, potentially ready to execute, executing, or finished executing. May
be `fetch`'d or `wait`'d on at any time.
Returned from `Dagger.@spawn`/`Dagger.spawn` calls. Represents a task that is
in the scheduler, potentially ready to execute, executing, or finished
executing. May be `fetch`'d or `wait`'d on at any time. See `Dagger.@spawn` for
more details.
"""
mutable struct EagerThunk
mutable struct DTask
uid::UInt
future::ThunkFuture
finalizer_ref::DRef
thunk_ref::DRef
EagerThunk(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
DTask(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
end

Base.isready(t::EagerThunk) = isready(t.future)
function Base.wait(t::EagerThunk)
const EagerThunk = DTask

Base.isready(t::DTask) = isready(t.future)
function Base.wait(t::DTask)
if !isdefined(t, :thunk_ref)
throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `EagerThunk`"))
throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `DTask`"))
end
wait(t.future)
end
function Base.fetch(t::EagerThunk; raw=false)
function Base.fetch(t::DTask; raw=false)
if !isdefined(t, :thunk_ref)
throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `DTask`"))
end
return fetch(t.future; raw)
end
function Base.show(io::IO, t::EagerThunk)
function Base.show(io::IO, t::DTask)
status = if isdefined(t, :thunk_ref)
isready(t) ? "finished" : "running"
else
"not launched"
end
print(io, "EagerThunk ($status)")
print(io, "DTask ($status)")
end
istask(t::EagerThunk) = true
istask(t::DTask) = true

"When finalized, cleans-up the associated `EagerThunk`."
mutable struct EagerThunkFinalizer
"When finalized, cleans-up the associated `DTask`."
mutable struct DTaskFinalizer
uid::UInt
function EagerThunkFinalizer(uid)
function DTaskFinalizer(uid)
x = new(uid)
finalizer(Sch.eager_cleanup, x)
x
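
As a quick sketch of what the rename means for downstream code (relying on the `const EagerThunk = DTask` alias added above):

```julia
using Dagger

t = Dagger.@spawn 1 + 2

t isa Dagger.DTask        # true
t isa Dagger.EagerThunk   # also true, via the backwards-compatibility alias
fetch(t)                  # 3
```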