Skip to content

Commit 50fd192

Browse files
authored
Merge pull request #470 from JuliaParallel/jps/proc-utils
Add some processor utilities
2 parents 081200c + 05be02d commit 50fd192

File tree

9 files changed

+257
-181
lines changed

9 files changed

+257
-181
lines changed

Diff for: src/Dagger.jl

+5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ include("utils/locked-object.jl")
2828
include("utils/tasks.jl")
2929
include("options.jl")
3030
include("processor.jl")
31+
include("threadproc.jl")
32+
include("context.jl")
33+
include("utils/processors.jl")
34+
include("task-tls.jl")
3135
include("scopes.jl")
36+
include("utils/scopes.jl")
3237
include("eager_thunk.jl")
3338
include("queue.jl")
3439
include("thunk.jl")

Diff for: src/context.jl

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
"""
2+
Context(xs::Vector{OSProc}) -> Context
3+
Context(xs::Vector{Int}) -> Context
4+
5+
Create a Context, by default adding each available worker.
6+
7+
It is also possible to create a Context from a vector of [`OSProc`](@ref),
8+
or equivalently the underlying process ids can also be passed directly
9+
as a `Vector{Int}`.
10+
11+
Special fields include:
12+
- 'log_sink': A log sink object to use, if any.
13+
- `log_file::Union{String,Nothing}`: Path to logfile. If specified, at
14+
scheduler termination, logs will be collected, combined with input thunks, and
15+
written out in DOT format to this location.
16+
- `profile::Bool`: Whether or not to perform profiling with Profile stdlib.
17+
"""
18+
mutable struct Context
19+
procs::Vector{Processor}
20+
proc_lock::ReentrantLock
21+
proc_notify::Threads.Condition
22+
log_sink::Any
23+
log_file::Union{String,Nothing}
24+
profile::Bool
25+
options
26+
end
27+
28+
Context(procs::Vector{P}=Processor[OSProc(w) for w in procs()];
29+
proc_lock=ReentrantLock(), proc_notify=Threads.Condition(),
30+
log_sink=TimespanLogging.NoOpLog(), log_file=nothing, profile=false,
31+
options=nothing) where {P<:Processor} =
32+
Context(procs, proc_lock, proc_notify, log_sink, log_file,
33+
profile, options)
34+
Context(xs::Vector{Int}; kwargs...) = Context(map(OSProc, xs); kwargs...)
35+
Context(ctx::Context, xs::Vector=copy(procs(ctx))) = # make a copy
36+
Context(xs; log_sink=ctx.log_sink, log_file=ctx.log_file,
37+
profile=ctx.profile, options=ctx.options)
38+
39+
const GLOBAL_CONTEXT = Ref{Context}()
40+
function global_context()
41+
if !isassigned(GLOBAL_CONTEXT)
42+
GLOBAL_CONTEXT[] = Context()
43+
end
44+
return GLOBAL_CONTEXT[]
45+
end
46+
47+
"""
48+
lock(f, ctx::Context)
49+
50+
Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock
51+
when `f` returns.
52+
"""
53+
Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)
54+
55+
"""
56+
procs(ctx::Context)
57+
58+
Fetch the list of procs currently known to `ctx`.
59+
"""
60+
procs(ctx::Context) = lock(ctx) do
61+
copy(ctx.procs)
62+
end
63+
64+
"""
65+
addprocs!(ctx::Context, xs)
66+
67+
Add new workers `xs` to `ctx`.
68+
69+
Workers will typically be assigned new tasks in the next scheduling iteration
70+
if scheduling is ongoing.
71+
72+
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
73+
"""
74+
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
75+
function addprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
76+
lock(ctx) do
77+
append!(ctx.procs, xs)
78+
end
79+
lock(ctx.proc_notify) do
80+
notify(ctx.proc_notify)
81+
end
82+
end
83+
84+
"""
85+
rmprocs!(ctx::Context, xs)
86+
87+
Remove the specified workers `xs` from `ctx`.
88+
89+
Workers will typically finish all their assigned tasks if scheduling is ongoing
90+
but will not be assigned new tasks after removal.
91+
92+
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
93+
"""
94+
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
95+
function rmprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
96+
lock(ctx) do
97+
filter!(p -> (p xs), ctx.procs)
98+
end
99+
lock(ctx.proc_notify) do
100+
notify(ctx.proc_notify)
101+
end
102+
end

Diff for: src/processor.jl

-181
Original file line numberDiff line numberDiff line change
@@ -146,184 +146,3 @@ iscompatible_arg(proc::OSProc, opts, args...) =
146146
any(child->
147147
all(arg->iscompatible_arg(child, opts, arg), args),
148148
children(proc))
149-
150-
"""
151-
ThreadProc <: Processor
152-
153-
Julia CPU (OS) thread, identified by Julia thread ID.
154-
"""
155-
struct ThreadProc <: Processor
156-
owner::Int
157-
tid::Int
158-
end
159-
iscompatible(proc::ThreadProc, opts, f, args...) = true
160-
iscompatible_func(proc::ThreadProc, opts, f) = true
161-
iscompatible_arg(proc::ThreadProc, opts, x) = true
162-
function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @nospecialize(kwargs...))
163-
tls = get_tls()
164-
task = Task() do
165-
set_tls!(tls)
166-
TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id)
167-
@invokelatest f(args...; kwargs...)
168-
end
169-
set_task_tid!(task, proc.tid)
170-
schedule(task)
171-
try
172-
fetch(task)
173-
catch err
174-
@static if VERSION < v"1.7-rc1"
175-
stk = Base.catch_stack(task)
176-
else
177-
stk = Base.current_exceptions(task)
178-
end
179-
err, frames = stk[1]
180-
rethrow(CapturedException(err, frames))
181-
end
182-
end
183-
get_parent(proc::ThreadProc) = OSProc(proc.owner)
184-
default_enabled(proc::ThreadProc) = true
185-
186-
# TODO: ThreadGroupProc?
187-
188-
"""
189-
Context(xs::Vector{OSProc}) -> Context
190-
Context(xs::Vector{Int}) -> Context
191-
192-
Create a Context, by default adding each available worker.
193-
194-
It is also possible to create a Context from a vector of [`OSProc`](@ref),
195-
or equivalently the underlying process ids can also be passed directly
196-
as a `Vector{Int}`.
197-
198-
Special fields include:
199-
- 'log_sink': A log sink object to use, if any.
200-
- `log_file::Union{String,Nothing}`: Path to logfile. If specified, at
201-
scheduler termination, logs will be collected, combined with input thunks, and
202-
written out in DOT format to this location.
203-
- `profile::Bool`: Whether or not to perform profiling with Profile stdlib.
204-
"""
205-
mutable struct Context
206-
procs::Vector{Processor}
207-
proc_lock::ReentrantLock
208-
proc_notify::Threads.Condition
209-
log_sink::Any
210-
log_file::Union{String,Nothing}
211-
profile::Bool
212-
options
213-
end
214-
215-
Context(procs::Vector{P}=Processor[OSProc(w) for w in procs()];
216-
proc_lock=ReentrantLock(), proc_notify=Threads.Condition(),
217-
log_sink=TimespanLogging.NoOpLog(), log_file=nothing, profile=false,
218-
options=nothing) where {P<:Processor} =
219-
Context(procs, proc_lock, proc_notify, log_sink, log_file,
220-
profile, options)
221-
Context(xs::Vector{Int}; kwargs...) = Context(map(OSProc, xs); kwargs...)
222-
Context(ctx::Context, xs::Vector=copy(procs(ctx))) = # make a copy
223-
Context(xs; log_sink=ctx.log_sink, log_file=ctx.log_file,
224-
profile=ctx.profile, options=ctx.options)
225-
226-
const GLOBAL_CONTEXT = Ref{Context}()
227-
function global_context()
228-
if !isassigned(GLOBAL_CONTEXT)
229-
GLOBAL_CONTEXT[] = Context()
230-
end
231-
return GLOBAL_CONTEXT[]
232-
end
233-
234-
"""
235-
lock(f, ctx::Context)
236-
237-
Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock
238-
when `f` returns.
239-
"""
240-
Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)
241-
242-
"""
243-
procs(ctx::Context)
244-
245-
Fetch the list of procs currently known to `ctx`.
246-
"""
247-
procs(ctx::Context) = lock(ctx) do
248-
copy(ctx.procs)
249-
end
250-
251-
"""
252-
addprocs!(ctx::Context, xs)
253-
254-
Add new workers `xs` to `ctx`.
255-
256-
Workers will typically be assigned new tasks in the next scheduling iteration
257-
if scheduling is ongoing.
258-
259-
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
260-
"""
261-
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
262-
function addprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
263-
lock(ctx) do
264-
append!(ctx.procs, xs)
265-
end
266-
lock(ctx.proc_notify) do
267-
notify(ctx.proc_notify)
268-
end
269-
end
270-
271-
"""
272-
rmprocs!(ctx::Context, xs)
273-
274-
Remove the specified workers `xs` from `ctx`.
275-
276-
Workers will typically finish all their assigned tasks if scheduling is ongoing
277-
but will not be assigned new tasks after removal.
278-
279-
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
280-
"""
281-
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
282-
function rmprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
283-
lock(ctx) do
284-
filter!(p -> (p xs), ctx.procs)
285-
end
286-
lock(ctx.proc_notify) do
287-
notify(ctx.proc_notify)
288-
end
289-
end
290-
291-
# In-Thunk Helpers
292-
293-
"""
294-
thunk_processor()
295-
296-
Get the current processor executing the current thunk.
297-
"""
298-
thunk_processor() = task_local_storage(:_dagger_processor)::Processor
299-
300-
"""
301-
in_thunk()
302-
303-
Returns `true` if currently in a [`Thunk`](@ref) process, else `false`.
304-
"""
305-
in_thunk() = haskey(task_local_storage(), :_dagger_sch_uid)
306-
307-
"""
308-
get_tls()
309-
310-
Gets all Dagger TLS variable as a `NamedTuple`.
311-
"""
312-
get_tls() = (
313-
sch_uid=task_local_storage(:_dagger_sch_uid),
314-
sch_handle=task_local_storage(:_dagger_sch_handle),
315-
processor=thunk_processor(),
316-
task_spec=task_local_storage(:_dagger_task_spec),
317-
)
318-
319-
"""
320-
set_tls!(tls)
321-
322-
Sets all Dagger TLS variables from the `NamedTuple` `tls`.
323-
"""
324-
function set_tls!(tls)
325-
task_local_storage(:_dagger_sch_uid, tls.sch_uid)
326-
task_local_storage(:_dagger_sch_handle, tls.sch_handle)
327-
task_local_storage(:_dagger_processor, tls.processor)
328-
task_local_storage(:_dagger_task_spec, tls.task_spec)
329-
end

Diff for: src/task-tls.jl

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# In-Thunk Helpers
2+
3+
"""
4+
thunk_processor()
5+
6+
Get the current processor executing the current thunk.
7+
"""
8+
thunk_processor() = task_local_storage(:_dagger_processor)::Processor
9+
10+
"""
11+
in_thunk()
12+
13+
Returns `true` if currently in a [`Thunk`](@ref) process, else `false`.
14+
"""
15+
in_thunk() = haskey(task_local_storage(), :_dagger_sch_uid)
16+
17+
"""
18+
get_tls()
19+
20+
Gets all Dagger TLS variable as a `NamedTuple`.
21+
"""
22+
get_tls() = (
23+
sch_uid=task_local_storage(:_dagger_sch_uid),
24+
sch_handle=task_local_storage(:_dagger_sch_handle),
25+
processor=thunk_processor(),
26+
task_spec=task_local_storage(:_dagger_task_spec),
27+
)
28+
29+
"""
30+
set_tls!(tls)
31+
32+
Sets all Dagger TLS variables from the `NamedTuple` `tls`.
33+
"""
34+
function set_tls!(tls)
35+
task_local_storage(:_dagger_sch_uid, tls.sch_uid)
36+
task_local_storage(:_dagger_sch_handle, tls.sch_handle)
37+
task_local_storage(:_dagger_processor, tls.processor)
38+
task_local_storage(:_dagger_task_spec, tls.task_spec)
39+
end

Diff for: src/threadproc.jl

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""
2+
ThreadProc <: Processor
3+
4+
Julia CPU (OS) thread, identified by Julia thread ID.
5+
"""
6+
struct ThreadProc <: Processor
7+
owner::Int
8+
tid::Int
9+
end
10+
iscompatible(proc::ThreadProc, opts, f, args...) = true
11+
iscompatible_func(proc::ThreadProc, opts, f) = true
12+
iscompatible_arg(proc::ThreadProc, opts, x) = true
13+
function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @nospecialize(kwargs...))
14+
tls = get_tls()
15+
task = Task() do
16+
set_tls!(tls)
17+
TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id)
18+
@invokelatest f(args...; kwargs...)
19+
end
20+
set_task_tid!(task, proc.tid)
21+
schedule(task)
22+
try
23+
fetch(task)
24+
catch err
25+
@static if VERSION < v"1.7-rc1"
26+
stk = Base.catch_stack(task)
27+
else
28+
stk = Base.current_exceptions(task)
29+
end
30+
err, frames = stk[1]
31+
rethrow(CapturedException(err, frames))
32+
end
33+
end
34+
get_parent(proc::ThreadProc) = OSProc(proc.owner)
35+
default_enabled(proc::ThreadProc) = true
36+
37+
# TODO: ThreadGroupProc?
38+

Diff for: src/utils/processors.jl

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Processor utilities
2+
3+
"""
4+
all_processors(ctx::Context=Sch.eager_context()) -> Set{Processor}
5+
6+
Returns the set of all processors available to the scheduler, across all
7+
Distributed workers.
8+
"""
9+
function all_processors(ctx::Context=Sch.eager_context())
10+
all_procs = Set{Processor}()
11+
for gproc in procs(ctx)
12+
for proc in get_processors(gproc)
13+
push!(all_procs, proc)
14+
end
15+
end
16+
return all_procs
17+
end

0 commit comments

Comments
 (0)