Skip to content

Commit b378f11

Browse files
authored
Merge pull request #240 from JuliaParallel/jps/scopes
Add scopes mechanism to limit Chunk movement and restrict scheduling
2 parents 810ba04 + cd19088 commit b378f11

13 files changed

+549
-95
lines changed

Diff for: Project.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Dagger"
22
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
3-
version = "0.12.0"
3+
version = "0.12.1"
44

55
[deps]
66
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
@@ -17,6 +17,7 @@ Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
1717
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
1818
TableOperations = "ab02a1b2-a7df-11e8-156e-fb1833f50b87"
1919
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
20+
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2021

2122
[compat]
2223
Colors = "0.10, 0.11, 0.12"

Diff for: docs/make.jl

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ makedocs(;
1515
"Home" => "index.md",
1616
"Processors" => "processors.md",
1717
"Checkpointing" => "checkpointing.md",
18+
"Scopes" => "scopes.md",
1819
"Dynamic Scheduler Control" => "dynamic.md",
1920
"Logging and Graphing" => "logging.md",
2021
"Benchmarking" => "benchmarking.md",

Diff for: docs/src/scopes.md

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Scopes
2+
3+
Sometimes you will have data that is only meaningful in a certain location,
4+
such as within a single Julia process, a given server, or even for a specific
5+
Dagger processor. We call this location a "scope" in Dagger, denoting the
6+
bounds within which the data is meaningful and valid. For example, C pointers
7+
are typically scoped to a process, file paths are scoped to one or more servers
8+
dependent on filesystem configuration, etc. By default, Dagger doesn't
9+
recognize this; it treats everything passed into a task, or generated from a
10+
task, as inherently safe to transfer anywhere else. When this is not the case,
11+
Dagger provides optional scopes to instruct the scheduler where data is
12+
considered valid.
13+
14+
## Scope Basics
15+
16+
Let's take the example of a webcam handle generated by VideoIO.jl. This handle is a C pointer, and thus has process scope. We can open the handle on a given process, and set the scope of the resulting data to a `ProcessScope()`, which defaults to the current Julia process:
17+
18+
```julia
19+
using VideoIO
20+
21+
function get_handle()
22+
handle = VideoIO.opencamera()
23+
proc = Dagger.thunk_processor()
24+
scope = ProcessScope()
25+
return Dagger.tochunk(handle, proc, scope)
26+
end
27+
28+
cam_handle = Dagger.@spawn get_handle()
29+
```
30+
31+
Now, wherever `cam_handle` is passed, Dagger will ensure that any computations
32+
on the handle only happen within its defined scope. For example, we can read
33+
from the camera:
34+
35+
```julia
36+
cam_frame = Dagger.@spawn read(cam_handle)
37+
```
38+
39+
The `cam_frame` task is executed within any processor on the same process that
40+
the `cam_handle` task was executed on. Of course, the resulting camera frame is
41+
*not* scoped to anywhere specific (denoted as `AnyScope()`), and thus
42+
computations on it may execute anywhere.
43+
44+
Now, let's try out some other kinds of scopes, starting with `NodeScope`. This
45+
scope encompasses the server that one or more Julia processes may be running
46+
on. Say we want to use memory mapping (mmap) to more efficiently send arrays
47+
between two tasks. We can construct the mmap'd array in one task, attach a
48+
`NodeScope()` to it, and using the path of the mmap'd file to communicate its
49+
location, lock downstream tasks to the same server:
50+
51+
```julia
52+
using Mmap
53+
54+
function generate()
55+
path = "myfile.bin"
56+
arr = Mmap.mmap(path, Matrix{Int}, (64,64))
57+
fill!(arr, 1)
58+
Mmap.sync!(arr)
59+
Dagger.tochunk(path, Dagger.thunk_processor(), NodeScope())
60+
end
61+
62+
function consume(path)
63+
arr = Mmap.mmap(path, Matrix{Int}, (64,64))
64+
sum(arr)
65+
end
66+
67+
a = Dagger.@spawn generate()
68+
@assert fetch(Dagger.@spawn consume(a)) == 64*64
69+
```
70+
71+
Whatever server `a` executed on, `b` will also execute on!
72+
73+
Finally, we come to the "lowest" scope on the scope hierarchy, the
74+
`ExactScope`. This scope specifies one exact processor as the bounding scope,
75+
and is typically useful in certain limited cases. We won't provide an example
76+
here, because you don't usually need to ever use this scope, but if you already
77+
understand the `NodeScope` and `ProcessScope`, the `ExactScope` should be easy
78+
to figure out.
79+
80+
## Union Scopes
81+
82+
Sometimes one simple scope isn't enough! In that case, you can use the
83+
`UnionScope` to construct the union of two or more scopes. Say, for example,
84+
you have some sensitive data on your company's servers that you want to compute
85+
summaries of, but you'll be driving the computation from your laptop, and you
86+
aren't allowed to send the data itself outside of the company's network. You
87+
could accomplish this by constructing a `UnionScope` of `ProcessScope`s of each
88+
of the non-laptop Julia processes, and use that to ensure that the data in its
89+
original form always stays within the company network:
90+
91+
```julia
92+
addprocs(4) # some local processors
93+
procs = addprocs([("server.company.com", 4)]) # some company processors
94+
95+
secrets_scope = UnionScope(ProcessScope.(procs))
96+
97+
function generate_secrets()
98+
secrets = open("/shared/secret_results.txt", "r") do io
99+
String(read(io))
100+
end
101+
Dagger.tochunk(secrets, Dagger.thunk_processor(), secrets_scope)
102+
end
103+
104+
summarize(secrets) = occursin("QA Pass", secrets)
105+
106+
# Generate the data on the first company process
107+
sensitive_data = Dagger.@spawn single=first(procs) generate_secrets()
108+
109+
# We can safely call this, knowing that it will be executed on a company server
110+
qa_passed = Dagger.@spawn summarize(sensitive_data)
111+
```
112+
113+
## Mismatched Scopes
114+
115+
You might now be thinking, "What if I want to run a task on multiple pieces of
116+
data whose scopes don't match up?" In such a case, Dagger will throw an error,
117+
refusing to schedule that task, since the intersection of the data scopes is an
118+
empty set (there is no feasible processor which can satisfy the scoping
119+
constraints). For example:
120+
121+
```julia
122+
ps2 = ProcessScope(2)
123+
ps3 = ProcessScope(3)
124+
125+
generate(scope) = Dagger.tochunk(rand(64), Dagger.thunk_processor(), scope)
126+
127+
d2 = Dagger.@spawn generate(ps2) # Run on process 2
128+
d3 = Dagger.@spawn generate(ps3) # Run on process 3
129+
res = Dagger.@spawn d2 * d3 # An error!
130+
```
131+
132+
Moral of the story: only use scopes when you know you really need them, and if
133+
you aren't careful to arrange everything just right, be prepared for Dagger to
134+
refuse to schedule your tasks! Scopes should only be used to ensure correctness
135+
of your programs, and are *not* intended to be used to optimize the schedule
136+
that Dagger uses for your tasks, since restricting the scope of execution for
137+
tasks will necessarily reduce the optimizations that Dagger's scheduler can
138+
perform.

Diff for: src/Dagger.jl

+8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import Distributed: procs
1010
using LinearAlgebra
1111
import LinearAlgebra: transpose
1212

13+
using UUIDs
14+
1315
using Requires
1416

1517
const PLUGINS = Dict{Symbol,Any}()
@@ -22,11 +24,14 @@ include("lib/logging.jl")
2224

2325
# Distributed data
2426
include("processor.jl")
27+
include("scopes.jl")
2528
include("thunk.jl")
2629
include("chunks.jl")
2730

2831
# Task scheduling
2932
include("compute.jl")
33+
include("utils/clock.jl")
34+
include("utils/system_uuid.jl")
3035
include("sch/Sch.jl"); using .Sch
3136

3237
# Array computations
@@ -47,6 +52,9 @@ include("array/sort.jl")
4752
include("ui/graph.jl")
4853
include("table/dtable.jl")
4954
function __init__()
55+
# Initialize system UUID
56+
system_uuid()
57+
5058
@require Luxor="ae8d54c2-7ccd-5906-9d76-62fc9837b5bc" begin
5159
# Gantt chart renderer
5260
include("ui/gantt.jl")

Diff for: src/chunks.jl

+10-8
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@ processor to safely serialize the data to the calling worker.
4646
## Constructors
4747
See [`tochunk`](@ref).
4848
"""
49-
mutable struct Chunk{T, H, P<:Processor}
49+
mutable struct Chunk{T, H, P<:Processor, S<:AbstractScope}
5050
chunktype::Type{T}
5151
domain
5252
handle::H
5353
processor::P
54+
scope::S
5455
persist::Bool
55-
function (::Type{Chunk{T,H,P}})(::Type{T}, domain, handle, processor, persist) where {T,H,P}
56-
c = new{T,H,P}(T, domain, handle, processor, persist)
56+
function (::Type{Chunk{T,H,P,S}})(::Type{T}, domain, handle, processor, scope, persist) where {T,H,P,S}
57+
c = new{T,H,P,S}(T, domain, handle, processor, scope, persist)
5758
finalizer(x -> @async(myid() == 1 && free!(x)), c)
5859
c
5960
end
@@ -132,15 +133,15 @@ end
132133
133134
Create a chunk from sequential object `x` which resides on `proc`.
134135
"""
135-
function tochunk(x::X, proc::P=OSProc(); persist=false, cache=false) where {X,P}
136+
function tochunk(x::X, proc::P=OSProc(), scope::S=AnyScope(); persist=false, cache=false) where {X,P,S}
136137
ref = poolset(x, destroyonevict=persist ? false : cache)
137-
Chunk{X, typeof(ref), P}(X, domain(x), ref, proc, persist)
138+
Chunk{X,typeof(ref),P,S}(X, domain(x), ref, proc, scope, persist)
138139
end
139-
tochunk(x::Union{Chunk, Thunk}, proc=nothing) = x
140+
tochunk(x::Union{Chunk, Thunk}, proc=nothing, scope=nothing) = x
140141

141142
# Check to see if the node is set to persist
142143
# if it is foce can override it
143-
function free!(s::Chunk{X, DRef, P}; force=true, cache=false) where {X,P}
144+
function free!(s::Chunk{X,DRef,P,S}; force=true, cache=false) where {X,P,S}
144145
if force || !s.persist
145146
if cache
146147
try
@@ -160,7 +161,8 @@ function savechunk(data, dir, f)
160161
end
161162
fr = FileRef(f, sz)
162163
proc = OSProc()
163-
Chunk{typeof(data), typeof(fr), typeof(proc)}(typeof(data), domain(data), fr, proc, true)
164+
scope = AnyScope() # FIXME: Scoped to this node
165+
Chunk{typeof(data),typeof(fr),typeof(proc),typeof(scope)}(typeof(data), domain(data), fr, proc, scope, true)
164166
end
165167

166168

0 commit comments

Comments
 (0)