forked from JuliaParallel/Dagger.jl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheager_thunk.jl
94 lines (87 loc) · 2.4 KB
/
eager_thunk.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"A future holding the result of a `Thunk`."
struct ThunkFuture
future::Future
end
ThunkFuture(x::Integer) = ThunkFuture(Future(x))
ThunkFuture() = ThunkFuture(Future())
Base.isready(t::ThunkFuture) = isready(t.future)
Base.wait(t::ThunkFuture) = Dagger.Sch.thunk_yield() do
wait(t.future)
end
function Base.fetch(t::ThunkFuture; proc=OSProc(), raw=false)
error, value = Dagger.Sch.thunk_yield() do
fetch(t.future)
end
if error
throw(value)
end
if raw
return value
else
return move(proc, value)
end
end
Base.put!(t::ThunkFuture, x; error=false) = put!(t.future, (error, x))
"""
Options(::NamedTuple)
Options(; kwargs...)
Options for thunks and the scheduler. See [Task Spawning](@ref) for more
information.
"""
struct Options
options::NamedTuple
end
Options(;options...) = Options((;options...))
Options(options...) = Options((;options...))
"""
EagerThunk
Returned from `spawn`/`@spawn` calls. Represents a task that is in the
scheduler, potentially ready to execute, executing, or finished executing. May
be `fetch`'d or `wait`'d on at any time.
"""
mutable struct EagerThunk
uid::UInt
future::ThunkFuture
finalizer_ref::DRef
thunk_ref::DRef
EagerThunk(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
end
Base.isready(t::EagerThunk) = isready(t.future)
function Base.wait(t::EagerThunk)
if !isdefined(t, :thunk_ref)
throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `EagerThunk`"))
end
wait(t.future)
end
function Base.fetch(t::EagerThunk; raw=false)
if !isdefined(t, :thunk_ref)
throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
end
return fetch(t.future; raw)
end
function Base.show(io::IO, t::EagerThunk)
status = if isdefined(t, :thunk_ref)
isready(t) ? "finished" : "running"
else
"not launched"
end
print(io, "EagerThunk ($status)")
end
istask(t::EagerThunk) = true
"When finalized, cleans-up the associated `EagerThunk`."
mutable struct EagerThunkFinalizer
uid::UInt
function EagerThunkFinalizer(uid)
x = new(uid)
finalizer(Sch.eager_cleanup, x)
x
end
end
const EAGER_ID_COUNTER = Threads.Atomic{UInt64}(1)
function eager_next_id()
if myid() == 1
Threads.atomic_add!(EAGER_ID_COUNTER, one(UInt64))
else
remotecall_fetch(eager_next_id, 1)
end
end