Skip to content

Commit 77a4461

Browse files
authored
Merge pull request #524 from JuliaParallel/jps/datadeps-dev
Datadeps: Add aliasing awareness and GPU support
2 parents 66ba6a9 + f8b4fd5 commit 77a4461

File tree

14 files changed

+1663
-443
lines changed

14 files changed

+1663
-443
lines changed

Diff for: docs/src/datadeps.md

+64-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
For many programs, the restriction that tasks cannot write to their arguments
44
feels overly restrictive and makes certain kinds of programs (such as in-place
55
linear algebra) hard to express efficiently in Dagger. Thankfully, there is a
6-
solution: `spawn_datadeps`. This function constructs a "datadeps region",
6+
solution called "Datadeps" (short for "data dependencies"), accessible through
7+
the `spawn_datadeps` function. This function constructs a "datadeps region",
78
within which tasks are allowed to write to their arguments, with parallelism
8-
controlled via dependencies specified via argument annotations. Let's look at
9-
a simple example to make things concrete:
9+
controlled via dependencies specified via argument annotations. Let's look at a
10+
simple example to make things concrete:
1011

1112
```julia
1213
A = rand(1000)
@@ -94,3 +95,63 @@ Additionally, we can notice a powerful feature of this model - if the
9495
runs sequentially. This means that the structure of the program doesn't have to
9596
change in order to use Dagger for parallelization, which can make applying
9697
Dagger to existing algorithms quite effortless.
98+
99+
## Aliasing Support
100+
101+
Datadeps is smart enough to detect when two arguments from different tasks
102+
actually access the same memory (we say that these arguments "alias"). There's
103+
the obvious case where the two arguments are exactly the same object, but
104+
Datadeps is also aware of more subtle cases, such as when two arguments are
105+
different views into the same array, or where two arrays point to the same
106+
underlying memory. In these cases, Datadeps will ensure that the tasks are
107+
executed in the correct order - if one task writes to an argument which aliases
108+
with an argument read by another task, those two tasks will be executed in
109+
sequence, rather than in parallel.
110+
111+
There are two ways to specify aliasing to Datadeps. The simplest way is the most straightforward: if the argument passed to a task is a view or another supported object (such as an `UpperTriangular`-wrapped array), Datadeps will compare it with all other task's arguments to determine if they alias. This works great when you want to pass that view or `UpperTriangular` object directly to the called function. For example:
112+
113+
```julia
114+
A = rand(1000)
115+
A_l = view(A, 1:500)
116+
A_r = view(A, 501:1000)
117+
118+
# inc! supports views, so we can pass A_l and A_r directly
119+
inc!(X) = X .+= 1
120+
121+
Dagger.spawn_datadeps() do
122+
# These two tasks don't alias, so they can run in parallel
123+
Dagger.@spawn inc!(InOut(A_l))
124+
Dagger.@spawn inc!(InOut(A_r))
125+
126+
# This task aliases with the previous two, so it will run after them
127+
Dagger.@spawn inc!(InOut(A))
128+
end
129+
```
130+
131+
The other way allows you to seperate what argument is passed to the function,
132+
from how that argument is accessed within the function. This is done with the
133+
`Deps` wrapper, which is used like so:
134+
135+
```julia
136+
A = rand(1000, 1000)
137+
138+
inc_upper!(X) = UpperTriangular(X) .+= 1
139+
inc_ulower!(X) = UnitLowerTriangular(X) .+= 1
140+
inc_diag!(X) = X[diagind(X)] .+= 1
141+
142+
Dagger.spawn_datadeps() do
143+
# These two tasks don't alias, so they can run in parallel
144+
Dagger.@spawn inc_upper!(Deps(A, InOut(UpperTriangular)))
145+
Dagger.@spawn inc_ulower!(Deps(A, InOut(UnitLowerTriangular)))
146+
147+
# This task aliases with the `inc_upper!` task (`UpperTriangular` accesses the diagonal of the array)
148+
Dagger.@spawn inc_diag!(Deps(A, InOut(Diagonal)))
149+
end
150+
```
151+
152+
You can pass any number of aliasing modifiers to `Deps`. This is particularly
153+
useful for declaring aliasing with `Diagonal`, `Bidiagonal`, `Tridiagonal`, and
154+
`SymTridiagonal` access, as these "wrappers" make a copy of their parent array
155+
and thus can't be used to "mask" access to the parent like `UpperTriangular`
156+
and `UnitLowerTriangular` can (which is valuable for writing memory-efficient,
157+
generic algorithms in Julia).

Diff for: ext/LuxorExt.jl

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
module LuxorExt
2+
3+
if isdefined(Base, :get_extension)
4+
using Luxor
5+
else
6+
using ..Luxor
7+
end
8+
9+
import Dagger
10+
import Dagger: Chunk, Processor
11+
import Dagger.TimespanLogging: Timespan
12+
13+
import .Luxor: Drawing, finish, Point, background, sethue, fontsize, rect, text
14+
15+
function proclt(p1::T, p2::R) where {T,R}
16+
if p1.owner != p2.owner
17+
return p1.owner < p2.owner
18+
else
19+
return repr(T) < repr(R)
20+
end
21+
end
22+
function proclt(p1::T, p2::T) where {T}
23+
if p1.owner != p2.owner
24+
return p1.owner < p2.owner
25+
else
26+
for field in fieldnames(T)
27+
f1 = getfield(p1, field)
28+
f2 = getfield(p2, field)
29+
if f1 != f2
30+
return f1 < f2
31+
end
32+
end
33+
end
34+
false
35+
end
36+
proclt(p1::Dagger.OSProc, p2::Dagger.OSProc) = p1.pid < p2.pid
37+
proclt(p1::Dagger.OSProc, p2) = p1.pid < p2.owner
38+
proclt(p1, p2::Dagger.OSProc) = p1.owner < p2.pid
39+
40+
function update_window_logs!(window_logs, logs; root_time, window_start)
41+
if !isempty(logs)
42+
for id in keys(logs)
43+
append!(window_logs, map(x->(x,), filter(x->x.category==:compute||x.category==:scheduler_init, logs[id])))
44+
end
45+
end
46+
for idx in length(window_logs):-1:1
47+
log = window_logs[idx]
48+
if length(log) == 2
49+
# Clear out finished events older than window start
50+
log_finish_s = (log[2].timestamp-root_time)/(1000^3)
51+
if log_finish_s < window_start
52+
@debug "Gantt: Deleted event"
53+
deleteat!(window_logs, idx)
54+
end
55+
elseif log[1] isa Dagger.Event{:finish}
56+
# Pair finish events with start events
57+
sidx = findfirst(x->length(x) == 1 &&
58+
x[1] isa Dagger.Event{:start} &&
59+
x[1].id==log[1].id, window_logs)
60+
if sidx === nothing
61+
@debug "Gantt: Removed unpaired finish"
62+
deleteat!(window_logs, idx)
63+
continue
64+
end
65+
window_logs[sidx] = (window_logs[sidx][1], log[1])
66+
@debug "Gantt: Paired event"
67+
deleteat!(window_logs, idx)
68+
end
69+
end
70+
end
71+
function Dagger.render_plan(logs::Dict, ::Val{:luxor_gantt}; delay=2, width=1000, height=640, window_length=20)
72+
root_time = time_ns()
73+
window_logs = []
74+
if window_length !== nothing
75+
window_start = -window_length
76+
else
77+
window_start = 0
78+
end
79+
window_finish = 0
80+
procs = Dagger.all_processors()
81+
if (height/length(procs)) < 50
82+
height = length(procs) * 50
83+
@warn "SVG height too small; resizing to $height pixels"
84+
end
85+
86+
update_window_logs!(window_logs, logs; root_time=root_time, window_start=window_start)
87+
isempty(window_logs) && return
88+
89+
for proc in unique(map(x->x[1].timeline[2], filter(x->x[1].category==:compute, window_logs)))
90+
push!(procs, proc)
91+
end
92+
colors = Colors.distinguishable_colors(length(procs))
93+
procs_len = length(procs)
94+
proc_height = height/(3procs_len)
95+
@debug "Gantt: Start"
96+
if isfile(svg_path)
97+
Drawing(width, height, svg_path)
98+
else
99+
Drawing(width, height, joinpath(svg_path, repr(image_idx) * ".svg"))
100+
end
101+
background("white")
102+
for (proc_idx, proc) in enumerate(sort(collect(procs); lt=proclt))
103+
ypos = (proc_idx-0.5)*(height/procs_len)
104+
sethue("grey15")
105+
fontsize(round(Int,proc_height))
106+
text(getname(proc), Point(width/2,ypos-(proc_height/2)))
107+
rect(Point(1,ypos+(proc_height/3)),width-2,proc_height,:stroke)
108+
fontsize(8)
109+
text("$(window_start) s", Point(0,ypos); halign=:left)
110+
text("$(window_finish) s", Point(width-8,ypos); halign=:right)
111+
proc_color = colors[proc_idx]
112+
for log in filter(x->x[1].timeline[2]==proc, filter(x->x[1].category==:compute, window_logs))
113+
length(log) == 1 && log[1] isa Dagger.Event{:finish} && error("Unpaired finish!")
114+
log_start_s = (log[1].timestamp-root_time)/(1000^3)
115+
log_finish_s = if length(log) == 2
116+
log_finish_s = (log[2].timestamp-root_time)/(1000^3)
117+
else
118+
window_finish+1
119+
end
120+
xstart = ((log_start_s-window_start)/(window_finish-window_start))*width
121+
xfinish = ((log_finish_s-window_start)/(window_finish-window_start))*width
122+
sethue(proc_color)
123+
rect(Point(xstart,ypos+(proc_height/3)+1),xfinish-xstart,proc_height-2,:fill)
124+
sethue("black")
125+
rect(Point(xstart,ypos+(proc_height/3)+1),xfinish-xstart,proc_height-2,:stroke)
126+
end
127+
for log in filter(x->x[1].category==:scheduler_init, window_logs)
128+
log_start_s = (log[1].timestamp-root_time)/(1000^3)
129+
log_finish_s = if length(log) == 2
130+
log_finish_s = (log[2].timestamp-root_time)/(1000^3)
131+
else
132+
window_finish+1
133+
end
134+
xstart = ((log_start_s-window_start)/(window_finish-window_start))*width
135+
xfinish = ((log_finish_s-window_start)/(window_finish-window_start))*width
136+
sethue("red")
137+
rect(Point(xstart,0),#=xfinish-xstart=#1,height)
138+
end
139+
end
140+
finish()
141+
end
142+
143+
end # module LuxorExt

Diff for: src/Dagger.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import Distributed
1212
import Distributed: Future, RemoteChannel, myid, workers, nworkers, procs, remotecall, remotecall_wait, remotecall_fetch
1313

1414
import LinearAlgebra
15-
import LinearAlgebra: Adjoint, BLAS, Diagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, diagind, ishermitian, issymmetric
15+
import LinearAlgebra: Adjoint, BLAS, Diagonal, Bidiagonal, Tridiagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, UnitLowerTriangular, UnitUpperTriangular, diagind, ishermitian, issymmetric
1616

1717
import UUIDs: UUID, uuid4
1818

@@ -23,7 +23,7 @@ else
2323
end
2424

2525
if !isdefined(Base, :get_extension)
26-
using Requires
26+
import Requires: @require
2727
end
2828

2929
import TimespanLogging

Diff for: src/array/cholesky.jl

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
LinearAlgebra.cholcopy(A::DArray{T,2}) where T = copy(A)
22
function potrf_checked!(uplo, A, info_arr)
3-
_A, info = LAPACK.potrf!(uplo, A)
4-
if info > 0
3+
_A, info = move(thunk_processor(), LAPACK.potrf!)(uplo, A)
4+
if info != 0
55
info_arr[1] = info
66
throw(PosDefException(info))
77
end
@@ -22,7 +22,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{UpperTriangular}) where T
2222

2323
info = [convert(LinearAlgebra.BlasInt, 0)]
2424
try
25-
Dagger.spawn_datadeps() do
25+
Dagger.spawn_datadeps(;aliasing=true) do
2626
for k in range(1, mt)
2727
Dagger.@spawn potrf_checked!(uplo, InOut(Ac[k, k]), Out(info))
2828
for n in range(k+1, nt)

0 commit comments

Comments
 (0)