-
-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathscopes.jl
266 lines (223 loc) · 11.7 KB
/
scopes.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@testset "Chunk Scopes" begin
# Start two extra workers (each launched with `-t 2`) and load Dagger on them.
wid1, wid2 = addprocs(2, exeflags=["-t 2"])
@everywhere [wid1,wid2] using Dagger
Dagger.addprocs!(Dagger.Sch.eager_context(), [wid1,wid2])
# Run a trivial task so that the scheduler picks up the new workers
fetch(Dagger.@spawn 1+1)

# The tests run locally, so before any faking all processes share one system UUID
@test Dagger.system_uuid(myid()) == Dagger.system_uuid(wid2)
@test Dagger.system_uuid(wid1) == Dagger.system_uuid(wid2)

# Pretend the workers live on different nodes by registering a fresh UUID
# for each of them on every process
wid1_uuid = uuid4()
wid2_uuid = uuid4()
@everywhere Dagger.SYSTEM_UUIDS[$wid1] = $wid1_uuid
@everywhere Dagger.SYSTEM_UUIDS[$wid2] = $wid2_uuid

# From here on the workers look "remote": distinct from each other and from us
@test Dagger.system_uuid(wid1) == wid1_uuid
@test Dagger.system_uuid(wid2) == wid2_uuid
@test Dagger.system_uuid(myid()) != Dagger.system_uuid(wid2)
@test Dagger.system_uuid(wid1) != Dagger.system_uuid(wid2)

# Wrap `nothing` in a chunk whose only job is to carry the scope `sc`
chunk_with_scope(sc) = Dagger.tochunk(nothing, OSProc(), sc)

# Node-level fixtures
ns1 = NodeScope(wid1_uuid)
ns2 = NodeScope(wid2_uuid)
ns1_ch = chunk_with_scope(ns1)
ns2_ch = chunk_with_scope(ns2)

# Process-level fixtures
ps1 = ProcessScope(wid1)
ps2 = ProcessScope(wid2)
ps1_ch = chunk_with_scope(ps1)
ps2_ch = chunk_with_scope(ps2)

# Processor-level fixtures: thread 1 of wid1 and thread 2 of wid2
es1 = ExactScope(Dagger.ThreadProc(wid1, 1))
es2 = ExactScope(Dagger.ThreadProc(wid2, 2))
es1_ch = chunk_with_scope(es1)
es2_ch = chunk_with_scope(es2)

# Exact scope around the OSProc of process 1 (not a thread processor)
os1 = ExactScope(OSProc(1))
@testset "Default Scope" begin
    default = DefaultScope()

    # Constraining the default scope with a thread-level exact scope is expected
    # to yield that exact scope, whichever side the default scope is on
    @test Dagger.constrain(default, es1) == es1
    @test Dagger.constrain(es1, default) == es1

    # An exact scope around an OSProc is expected to be incompatible with the
    # default scope, again regardless of argument order
    @test Dagger.constrain(default, os1) isa Dagger.InvalidScope
    @test Dagger.constrain(os1, default) isa Dagger.InvalidScope
end
@testset "Node Scope" begin
    # Ignores its arguments and returns `Dagger.system_uuid()` evaluated where
    # the task runs; the chunk arguments only serve to carry scopes.
    @everywhere node_scope_test(ch...) = Dagger.system_uuid()

    # One node: odd-numbered tasks carry the node-1 chunk, even-numbered the node-2 chunk
    tasks = [Dagger.@spawn node_scope_test(isodd(i) ? ns1_ch : ns2_ch) for i in 1:20]
    uuids = fetch.(tasks)
    @test all(==(wid1_uuid), uuids[1:2:20])
    @test all(==(wid2_uuid), uuids[2:2:20])

    # Same node on both arguments
    same_node_uuid = fetch(Dagger.@spawn node_scope_test(ns1_ch, ns1_ch))
    @test same_node_uuid == wid1_uuid

    # Different nodes: the two scopes cannot be satisfied together, in either order
    for (lhs, rhs) in ((ns1_ch, ns2_ch), (ns2_ch, ns1_ch))
        @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn lhs + rhs)
    end
end
@testset "Process Scope" begin
    # Ignores its arguments and returns the ID of the process the task runs on.
    @everywhere process_scope_test(ch...) = myid()

    # A ProcessScope's parent is expected to carry the UUID of the worker's node
    @test ps1.parent.uuid == Dagger.system_uuid(wid1)
    @test ps2.parent.uuid == Dagger.system_uuid(wid2)

    # One process: odd-numbered tasks carry the wid1 chunk, even-numbered the wid2 chunk
    tasks = [Dagger.@spawn process_scope_test(isodd(i) ? ps1_ch : ps2_ch) for i in 1:20]
    pids = fetch.(tasks)
    @test all(==(wid1), pids[1:2:20])
    @test all(==(wid2), pids[2:2:20])

    # Same process on both arguments
    same_pid = fetch(Dagger.@spawn process_scope_test(ps1_ch, ps1_ch))
    @test same_pid == wid1

    # Different processes are incompatible, in either order
    for (lhs, rhs) in ((ps1_ch, ps2_ch), (ps2_ch, ps1_ch))
        @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn lhs + rhs)
    end

    # A process scope combined with the scope of its own node still runs on that process
    @test fetch(Dagger.@spawn process_scope_test(ps1_ch, ns1_ch)) == wid1

    # A process scope combined with the scope of another node is incompatible
    for (lhs, rhs) in ((ps1_ch, ns2_ch), (ns2_ch, ps1_ch))
        @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn lhs + rhs)
    end
end
@testset "Exact Scope" begin
    # Ignores its arguments and returns `Dagger.thunk_processor()` from inside the
    # running task. Defined on every process; the "Union Scope" testset reuses it.
    @everywhere exact_scope_test(ch...) = Dagger.thunk_processor()

    # An ExactScope's parent chain is expected to lead to the worker and then
    # to the node UUID of the processor it wraps
    @test es1.parent.wid == wid1
    @test es1.parent.parent.uuid == Dagger.system_uuid(wid1)
    @test es2.parent.wid == wid2
    @test es2.parent.parent.uuid == Dagger.system_uuid(wid2)

    # One processor: odd-numbered tasks carry es1 (wid1, thread 1) and even-numbered
    # tasks carry es2 (wid2, thread 2). Both the worker and the thread ID are checked,
    # since an exact scope pins the task to one specific processor.
    ts = fetch.([Dagger.@spawn exact_scope_test(isodd(i) ? es1_ch : es2_ch) for i in 1:20])
    @test all(x->x.owner==wid1&&x.tid==1, ts[1:2:20])
    @test all(x->x.owner==wid2&&x.tid==2, ts[2:2:20])

    # Same processor on both arguments
    t = fetch(Dagger.@spawn exact_scope_test(es1_ch, es1_ch))
    @test t.owner == wid1 && t.tid == 1

    # Different process, different processor
    for (ch1, ch2) in [(es1_ch, es2_ch), (es2_ch, es1_ch)]
        @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
    end

    # Same process, different processor: two threads of wid1 are still incompatible
    es1_2 = ExactScope(Dagger.ThreadProc(wid1, 2))
    es1_2_ch = Dagger.tochunk(nothing, OSProc(), es1_2)
    for (ch1, ch2) in [(es1_ch, es1_2_ch), (es1_2_ch, es1_ch)]
        @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
    end
end
@testset "Union Scope" begin
    # Every task below is expected to land on the processor wrapped by es1
    target_proc = es1.processor

    # A union with a single member
    single_ch = Dagger.tochunk(nothing, OSProc(), UnionScope(es1))
    @test fetch(Dagger.@spawn exact_scope_test(single_ch)) == target_proc

    # A union listing the same member twice
    redundant_ch = Dagger.tochunk(nothing, OSProc(), UnionScope(es1, es1))
    @test fetch(Dagger.@spawn exact_scope_test(redundant_ch)) == target_proc

    # A union with no members can still be constructed
    @test UnionScope() isa UnionScope

    # The same single-member union on both arguments
    @test fetch(Dagger.@spawn exact_scope_test(single_ch, single_ch)) == target_proc

    # One side has an extra member (es2) the other side lacks; only es1 is common
    mixed_ch = Dagger.tochunk(nothing, OSProc(), UnionScope(es1, es2))
    for (lhs, rhs) in ((single_ch, mixed_ch), (mixed_ch, single_ch))
        @test fetch(Dagger.@spawn exact_scope_test(lhs, rhs)) == target_proc
    end

    # Constraining directly: the result stays a UnionScope holding only the shared member
    narrowed = Dagger.constrain(UnionScope(es1, es2), UnionScope(es1))
    @test narrowed isa UnionScope
    @test es1 ∈ narrowed.scopes
    @test es2 ∉ narrowed.scopes
end
@testset "Processor Type Scope" begin
    thread_only = ProcessorTypeScope(Dagger.ThreadProc)
    os_only = ProcessorTypeScope(Dagger.OSProc)

    # A type scope is expected to accept exact scopes whose processor has the
    # matching type and to reject the others
    @test Dagger.constrain(thread_only, es1) == es1
    @test Dagger.constrain(thread_only, os1) isa Dagger.InvalidScope
    @test Dagger.constrain(os_only, es1) isa Dagger.InvalidScope
    @test Dagger.constrain(os_only, os1) == os1

    # Constraining a type scope with itself must behave like the original
    thread_twice = Dagger.constrain(thread_only, thread_only)
    @test Dagger.constrain(thread_twice, es1) == es1
    @test Dagger.constrain(thread_twice, os1) isa Dagger.InvalidScope

    # Two different processor types have an empty intersection, so nothing matches
    thread_and_os = Dagger.constrain(thread_only, os_only)
    @test Dagger.constrain(thread_and_os, es1) isa Dagger.InvalidScope
    @test Dagger.constrain(thread_and_os, os1) isa Dagger.InvalidScope
end
# TODO: Test scope propagation
# Exercises the `Dagger.scope(...)` convenience constructor: symbol shorthands,
# keyword arguments, NamedTuple inputs, and user-registered custom keys.
@testset "scope helper" begin
# Symbol shorthands: `:any` and `:default` map to the corresponding scopes;
# an unknown symbol is expected to raise ArgumentError
@test Dagger.scope(:any) isa AnyScope
@test Dagger.scope(:default) == DefaultScope()
@test_throws ArgumentError Dagger.scope(:blah)
# An empty tuple is expected to give an empty UnionScope
@test Dagger.scope(()) == UnionScope()
# `worker=` and `workers=[...]` are interchangeable; a single worker is
# expected to collapse to a plain ProcessScope
@test Dagger.scope(worker=wid1) ==
Dagger.scope(workers=[wid1]) ==
ProcessScope(wid1)
# Several workers are expected to give a UnionScope of ProcessScopes
@test Dagger.scope(workers=[wid1,wid2]) == UnionScope([ProcessScope(wid1),
ProcessScope(wid2)])
@test Dagger.scope(workers=[]) == UnionScope()
# A thread given without a worker is expected to cover that thread ID on
# every process in `procs()`
@test Dagger.scope(thread=1) ==
Dagger.scope(threads=[1]) ==
UnionScope([ExactScope(Dagger.ThreadProc(w,1)) for w in procs()])
@test Dagger.scope(threads=[1,2]) == UnionScope([ExactScope(Dagger.ThreadProc(w,t)) for t in [1,2] for w in procs()])
@test Dagger.scope(threads=[]) == UnionScope()
# Worker plus thread: keyword order and singular/plural spelling must not
# matter, and a single worker/thread pair is expected to be an ExactScope
@test Dagger.scope(worker=wid1,thread=1) ==
Dagger.scope(thread=1,worker=wid1) ==
Dagger.scope(workers=[wid1],thread=1) ==
Dagger.scope(worker=wid1,threads=[1]) ==
Dagger.scope(workers=[wid1],threads=[1]) ==
ExactScope(Dagger.ThreadProc(wid1,1))
# Unknown keywords are rejected, alone or next to a known key
@test_throws ArgumentError Dagger.scope(blah=1)
@test_throws ArgumentError Dagger.scope(thread=1, blah=1)
# Keyword arguments, a NamedTuple, and a tuple of NamedTuples are expected
# to be equivalent ways of writing the same scope
@test Dagger.scope(worker=1,thread=1) ==
Dagger.scope((worker=1,thread=1)) ==
Dagger.scope(((worker=1,thread=1),))
# Several NamedTuples, however they are grouped, are expected to give the
# UnionScope of the individual scopes
@test Dagger.scope((worker=1,thread=1),(worker=wid1,thread=2)) ==
Dagger.scope(((worker=1,thread=1),(worker=wid1,thread=2),)) ==
Dagger.scope(((worker=1,thread=1),), ((worker=wid1,thread=2),)) ==
UnionScope([ExactScope(Dagger.ThreadProc(1, 1)),
ExactScope(Dagger.ThreadProc(wid1, 2))])
# Unknown keys are rejected in NamedTuple form as well
@test_throws ArgumentError Dagger.scope((;blah=1))
@test_throws ArgumentError Dagger.scope((thread=1, blah=1))
# Registers test-only scope keys (`gpu`, `rocm`, `cuda`) and checks how
# `Dagger.scope` picks a handler among them.
@testset "custom handler" begin
# `@eval` evaluates these method definitions at module top level;
# `$wid1`/`$wid2` splice the worker IDs in as literal values.
@eval begin
# `gpu` gets precedence 1; `rocm` and `cuda` share precedence 2
Dagger.scope_key_precedence(::Val{:gpu}) = 1
Dagger.scope_key_precedence(::Val{:rocm}) = 2
Dagger.scope_key_precedence(::Val{:cuda}) = 2
# Some fake scopes to use as sentinels: each handler returns an ExactScope
# on a different worker, so the tests can tell which handler ran
Dagger.to_scope(::Val{:gpu}, sc::NamedTuple) = ExactScope(Dagger.ThreadProc(1, sc.device))
Dagger.to_scope(::Val{:rocm}, sc::NamedTuple) = ExactScope(Dagger.ThreadProc($wid1, sc.gpu))
Dagger.to_scope(::Val{:cuda}, sc::NamedTuple) = ExactScope(Dagger.ThreadProc($wid2, sc.gpu))
end
# With `gpu` as the only registered key present, the `:gpu` handler is
# expected to run (worker 1, thread ID taken from `device`); key order and
# an extra unregistered key (`blah`) do not change the result
@test Dagger.scope(gpu=1,device=2) ==
Dagger.scope(device=2,gpu=1) ==
Dagger.scope(gpu=1,device=2,blah=3) ==
Dagger.scope((gpu=1,device=2,blah=3)) ==
ExactScope(Dagger.ThreadProc(1, 2))
# Custom and built-in keys can be mixed across NamedTuples
# NOTE(review): the two calls list their NamedTuples in opposite order yet
# are compared to one UnionScope, so UnionScope equality is presumably
# order-insensitive; confirm against the UnionScope definition
@test Dagger.scope((gpu=1,device=2),(worker=1,thread=1)) ==
Dagger.scope((worker=1,thread=1),(device=2,gpu=1)) ==
UnionScope([ExactScope(Dagger.ThreadProc(1, 2)),
ExactScope(Dagger.ThreadProc(1, 1))])
@test Dagger.scope((gpu=1,device=2),(device=3,gpu=1)) ==
UnionScope([ExactScope(Dagger.ThreadProc(1, 2)),
ExactScope(Dagger.ThreadProc(1, 3))])
# When `gpu` appears next to `rocm` or `cuda`, the expected result comes from
# the `:rocm`/`:cuda` handler (precedence 2), not from `:gpu` (precedence 1)
@test Dagger.scope(rocm=1,gpu=2) ==
Dagger.scope(gpu=2,rocm=1) ==
ExactScope(Dagger.ThreadProc(wid1, 2))
@test Dagger.scope(cuda=1,gpu=2) ==
Dagger.scope(gpu=2,cuda=1) ==
ExactScope(Dagger.ThreadProc(wid2, 2))
# `rocm` and `cuda` together are expected to raise ArgumentError, presumably
# because their equal precedence leaves the choice of handler ambiguous
@test_throws ArgumentError Dagger.scope(rocm=1,cuda=1,gpu=2)
@test_throws ArgumentError Dagger.scope(gpu=2,rocm=1,cuda=1)
@test_throws ArgumentError Dagger.scope((rocm=1,cuda=1,gpu=2))
end
end
@testset "compatible_processors" begin
    # No workers selected: no processor of either test worker may be compatible
    scope = Dagger.scope(workers=[])
    comp_procs = Dagger.compatible_processors(scope)
    @test !any(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid1)))
    @test !any(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid2)))

    # One worker: all of its processors are compatible, none of the other worker's
    scope = Dagger.scope(worker=wid1)
    comp_procs = Dagger.compatible_processors(scope)
    @test all(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid1)))
    @test !any(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid2)))

    # One thread of one worker: exactly that processor is compatible, which is
    # only part of wid1's processors and none of wid2's
    scope = Dagger.scope(worker=wid1, thread=2)
    comp_procs = Dagger.compatible_processors(scope)
    @test length(comp_procs) == 1
    @test !all(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid1)))
    @test !any(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid2)))
    @test Dagger.ThreadProc(wid1, 2) in comp_procs

    # Both workers: every processor of both is compatible
    scope = Dagger.scope(workers=[wid1, wid2])
    comp_procs = Dagger.compatible_processors(scope)
    @test all(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid1)))
    @test all(proc->proc in comp_procs, Dagger.get_processors(OSProc(wid2)))
end
# Tear down: remove the two workers that were added at the start of this testset
rmprocs([wid1, wid2])
end