Skip to content

More Thunk to DTask renames #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/src/api-dagger/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ addprocs!
rmprocs!
```

## Thunk Execution Environment Functions
## DTask Execution Environment Functions

These functions are used within the function called by a `Thunk`.
These functions are used within the function called by a `DTask`.

```@docs
in_thunk
thunk_processor
in_task
task_processor
```

### Dynamic Scheduler Control Functions
Expand Down
2 changes: 1 addition & 1 deletion docs/src/darray.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ julia> Dagger.chunks(DZ)
DTask (finished) DTask (finished)

julia> Dagger.chunks(fetch(DZ))
2×2 Matrix{Union{Thunk, Dagger.Chunk}}:
2×2 Matrix{Union{DTask, Dagger.Chunk}}:
Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(4, 8, 0x0000000000004e20), ThreadProc(4, 1), AnyScope(), true) … Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(2, 5, 0x0000000000004e20), ThreadProc(2, 1), AnyScope(), true)
Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(5, 5, 0x0000000000004e20), ThreadProc(5, 1), AnyScope(), true) Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(3, 3, 0x0000000000004e20), ThreadProc(3, 1), AnyScope(), true)
```
Expand Down
8 changes: 4 additions & 4 deletions docs/src/scopes.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ using VideoIO, Distributed

function get_handle()
handle = VideoIO.opencamera()
proc = Dagger.thunk_processor()
proc = Dagger.task_processor()
scope = Dagger.scope(worker=myid()) # constructs a `ProcessScope`
return Dagger.tochunk(handle, proc, scope)
end
Expand Down Expand Up @@ -78,7 +78,7 @@ function generate()
fill!(arr, 1)
Mmap.sync!(arr)
# Note: Dagger.scope() does not yet support node scopes
Dagger.tochunk(path, Dagger.thunk_processor(), NodeScope())
Dagger.tochunk(path, Dagger.task_processor(), NodeScope())
end

function consume(path)
Expand Down Expand Up @@ -120,7 +120,7 @@ function generate_secrets()
secrets = open("/shared/secret_results.txt", "r") do io
String(read(io))
end
Dagger.tochunk(secrets, Dagger.thunk_processor(), secrets_scope)
Dagger.tochunk(secrets, Dagger.task_processor(), secrets_scope)
end

summarize(secrets) = occursin("QA Pass", secrets)
Expand All @@ -144,7 +144,7 @@ constraints). For example:
ps2 = ProcessScope(2)
ps3 = ProcessScope(3)

generate(scope) = Dagger.tochunk(rand(64), Dagger.thunk_processor(), scope)
generate(scope) = Dagger.tochunk(rand(64), Dagger.task_processor(), scope)

d2 = Dagger.@spawn generate(ps2) # Run on process 2
d3 = Dagger.@spawn generate(ps3) # Run on process 3
Expand Down
4 changes: 2 additions & 2 deletions src/array/alloc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ function partition(p::AbstractBlocks, dom::ArrayDomain)
end

function allocate_array(f, T, idx, sz)
new_f = allocate_array_func(thunk_processor(), f)
new_f = allocate_array_func(task_processor(), f)
return new_f(idx, T, sz)
end
function allocate_array(f, T, sz)
new_f = allocate_array_func(thunk_processor(), f)
new_f = allocate_array_func(task_processor(), f)
return new_f(T, sz)
end
allocate_array_func(::Processor, f) = f
Expand Down
6 changes: 3 additions & 3 deletions src/array/cholesky.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
LinearAlgebra.cholcopy(A::DArray{T,2}) where T = copy(A)
function potrf_checked!(uplo, A, info_arr)
_A, info = move(thunk_processor(), LAPACK.potrf!)(uplo, A)
_A, info = move(task_processor(), LAPACK.potrf!)(uplo, A)
if info != 0
fill!(info_arr, info)
throw(PosDefException(info))
Expand Down Expand Up @@ -41,7 +41,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{UpperTriangular}) where T
end
end
catch err
err isa ThunkFailedException || rethrow()
err isa DTaskFailedException || rethrow()
err = Dagger.Sch.unwrap_nested_exception(err.ex)
err isa PosDefException || rethrow()
end
Expand Down Expand Up @@ -82,7 +82,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{LowerTriangular}) where T
end
end
catch err
err isa ThunkFailedException || rethrow()
err isa DTaskFailedException || rethrow()
err = Dagger.Sch.unwrap_nested_exception(err.ex)
err isa PosDefException || rethrow()
end
Expand Down
2 changes: 2 additions & 0 deletions src/dtask.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export DTask

"A future holding the result of a `Thunk`."
struct ThunkFuture
future::Future
Expand Down
4 changes: 2 additions & 2 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import Random: randperm
import Base: @invokelatest

import ..Dagger
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, thunk_processor, constrain, cputhreadtime
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime
import ..Dagger: @dagdebug, @safe_lock_spin1
import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek

Expand Down
2 changes: 1 addition & 1 deletion src/sch/dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ end
function Base.fetch(h::SchedulerHandle, id::ThunkID)
future = ThunkFuture(Future(1))
exec!(_register_future!, h, future, id, true)
fetch(future; proc=thunk_processor())
fetch(future; proc=task_processor())
end
"""
Waits on a thunk to complete, and fetches its result. If `check` is set to
Expand Down
4 changes: 2 additions & 2 deletions src/sch/eager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ Allows a thunk to safely wait on another thunk by temporarily reducing its
effective occupancy to 0, which allows a newly-spawned task to run.
"""
function thunk_yield(f)
if Dagger.in_thunk()
if Dagger.in_task()
h = sch_handle()
tls = Dagger.get_tls()
proc = Dagger.thunk_processor()
proc = Dagger.task_processor()
proc_istate = proc_states(tls.sch_uid) do states
states[proc].state
end
Expand Down
6 changes: 5 additions & 1 deletion src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ end
"Marks `thunk` and all dependent thunks as failed."
function set_failed!(state, origin, thunk=origin)
filter!(x->x!==thunk, state.ready)
state.cache[thunk] = ThunkFailedException(thunk, origin, state.cache[origin])
ex = state.cache[origin]
if ex isa RemoteException
ex = ex.captured
end
state.cache[thunk] = DTaskFailedException(thunk, origin, ex)
state.errored[thunk] = true
finish_failed!(state, thunk, origin)
end
Expand Down
2 changes: 1 addition & 1 deletion src/submission.jl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ end

# Local -> Remote
function eager_submit!(ntasks, uid, future, finalizer_ref, f, args, options)
if Dagger.in_thunk()
if Dagger.in_task()
h = Dagger.sch_handle()
return exec!(eager_submit_internal!, h, ntasks, uid, future, finalizer_ref, f, args, options, true)
elseif myid() != 1
Expand Down
16 changes: 9 additions & 7 deletions src/task-tls.jl
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
# In-Thunk Helpers

"""
thunk_processor()
task_processor()

Get the current processor executing the current thunk.
Get the current processor executing the current Dagger task.
"""
thunk_processor() = task_local_storage(:_dagger_processor)::Processor
task_processor() = task_local_storage(:_dagger_processor)::Processor
@deprecate thunk_processor() task_processor()

"""
in_thunk()
in_task()

Returns `true` if currently in a [`Thunk`](@ref) process, else `false`.
Returns `true` if currently executing in a [`DTask`](@ref), else `false`.
"""
in_thunk() = haskey(task_local_storage(), :_dagger_sch_uid)
in_task() = haskey(task_local_storage(), :_dagger_sch_uid)
@deprecate in_thunk() in_task()

"""
get_tls()
Expand All @@ -22,7 +24,7 @@ Gets all Dagger TLS variable as a `NamedTuple`.
get_tls() = (
sch_uid=task_local_storage(:_dagger_sch_uid),
sch_handle=task_local_storage(:_dagger_sch_handle),
processor=thunk_processor(),
processor=task_processor(),
task_spec=task_local_storage(:_dagger_task_spec),
)

Expand Down
27 changes: 14 additions & 13 deletions src/thunk.jl
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,23 @@ function Base.convert(::Type{ThunkSummary}, t::WeakThunk)
return t
end

struct ThunkFailedException{E<:Exception} <: Exception
struct DTaskFailedException{E<:Exception} <: Exception
thunk::ThunkSummary
origin::ThunkSummary
ex::E
end
ThunkFailedException(thunk, origin, ex::E) where E =
ThunkFailedException{E}(convert(ThunkSummary, thunk),
DTaskFailedException(thunk, origin, ex::E) where E =
DTaskFailedException{E}(convert(ThunkSummary, thunk),
convert(ThunkSummary, origin),
ex)
function Base.showerror(io::IO, ex::ThunkFailedException)
@deprecate ThunkFailedException DTaskFailedException
function Base.showerror(io::IO, ex::DTaskFailedException)
t = ex.thunk

# Find root-cause thunk
last_tfex = ex
failed_tasks = Union{ThunkSummary,Nothing}[]
while last_tfex.ex isa ThunkFailedException
while last_tfex.ex isa DTaskFailedException
push!(failed_tasks, last_tfex.thunk)
last_tfex = last_tfex.ex
end
Expand All @@ -246,7 +247,7 @@ function Base.showerror(io::IO, ex::ThunkFailedException)
Tinputs = Any[]
for (_, input) in t.inputs
if istask(input)
push!(Tinputs, "Thunk(id=$(input.id))")
push!(Tinputs, "DTask(id=$(input.id))")
else
push!(Tinputs, input)
end
Expand All @@ -256,28 +257,28 @@ function Base.showerror(io::IO, ex::ThunkFailedException)
else
"$(t.f)($(length(Tinputs)) inputs...)"
end
return "Thunk(id=$(t.id), $t_sig)"
return "DTask(id=$(t.id), $t_sig)"
end
t_str = thunk_string(t)
o_str = thunk_string(o)
println(io, "ThunkFailedException:")
println(io, " Root Exception Type: $(typeof(root_ex))")
println(io, "DTaskFailedException:")
println(io, " Root Exception Type: $(typeof(Sch.unwrap_nested_exception(root_ex)))")
println(io, " Root Exception:")
Base.showerror(io, root_ex); println(io)
if t.id !== o.id
println(io, " Root Thunk: $o_str")
println(io, " Root Task: $o_str")
if length(failed_tasks) <= 4
for i in failed_tasks
i_str = thunk_string(i)
println(io, " Inner Thunk: $i_str")
println(io, " Inner Task: $i_str")
end
else
println(io, " ...")
println(io, " $(length(failed_tasks)) Inner Thunks...")
println(io, " $(length(failed_tasks)) Inner Tasks...")
println(io, " ...")
end
end
print(io, " This Thunk: $t_str")
print(io, " This Task: $t_str")
end

"""
Expand Down
2 changes: 1 addition & 1 deletion test/datadeps.jl
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ function test_datadeps(;args_chunks::Bool,

# Scope
exec_procs = fetch.(Dagger.spawn_datadeps(;aliasing) do
[Dagger.@spawn Dagger.thunk_processor() for i in 1:10]
[Dagger.@spawn Dagger.task_processor() for i in 1:10]
end)
unique!(exec_procs)
scope = Dagger.get_options(:scope)
Expand Down
2 changes: 1 addition & 1 deletion test/mutation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ end
x = Dagger.@mutable worker=w Ref{Int}()
@test fetch(Dagger.@spawn mutable_update!(x)) == w
wo_scope = Dagger.ProcessScope(wo)
@test_throws_unwrap Dagger.ThunkFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x))
@test_throws_unwrap Dagger.DTaskFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x))
end
end # @testset "@mutable"

Expand Down
6 changes: 3 additions & 3 deletions test/processors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ end
end
@testset "Processor exhaustion" begin
opts = ThunkOptions(proclist=[OptOutProc])
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
@test_throws_unwrap Dagger.DTaskFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
opts = ThunkOptions(proclist=(proc)->false)
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
@test_throws_unwrap Dagger.DTaskFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
opts = ThunkOptions(proclist=nothing)
@test collect(delayed(sum; options=opts)([1,2,3])) == 6
end
Expand Down Expand Up @@ -89,7 +89,7 @@ end

@testset "Processor TLS accessor" begin
@everywhere function mythunk(x)
typeof(Dagger.thunk_processor())
typeof(Dagger.task_processor())
end
@test collect(delayed(mythunk)(1)) === ThreadProc
end
Expand Down
2 changes: 1 addition & 1 deletion test/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ end
@testset "allow errors" begin
opts = ThunkOptions(;allow_errors=true)
a = delayed(error; options=opts)("Test")
@test_throws_unwrap Dagger.ThunkFailedException collect(a)
@test_throws_unwrap Dagger.DTaskFailedException collect(a)
end
end

Expand Down
12 changes: 6 additions & 6 deletions test/scopes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

# Different nodes
for (ch1, ch2) in [(ns1_ch, ns2_ch), (ns2_ch, ns1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end
end
@testset "Process Scope" begin
Expand All @@ -75,19 +75,19 @@

# Different process
for (ch1, ch2) in [(ps1_ch, ps2_ch), (ps2_ch, ps1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end

# Same process and node
@test fetch(Dagger.@spawn process_scope_test(ps1_ch, ns1_ch)) == wid1

# Different process and node
for (ch1, ch2) in [(ps1_ch, ns2_ch), (ns2_ch, ps1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end
end
@testset "Exact Scope" begin
@everywhere exact_scope_test(ch...) = Dagger.thunk_processor()
@everywhere exact_scope_test(ch...) = Dagger.task_processor()
@test es1.parent.wid == wid1
@test es1.parent.parent.uuid == Dagger.system_uuid(wid1)
@test es2.parent.wid == wid2
Expand All @@ -104,14 +104,14 @@

# Different process, different processor
for (ch1, ch2) in [(es1_ch, es2_ch), (es2_ch, es1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end

# Same process, different processor
es1_2 = ExactScope(Dagger.ThreadProc(wid1, 2))
es1_2_ch = Dagger.tochunk(nothing, OSProc(), es1_2)
for (ch1, ch2) in [(es1_ch, es1_2_ch), (es1_2_ch, es1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end
end
@testset "Union Scope" begin
Expand Down
Loading