DArray: Implement efficient in-place matrix-matrix multiply #492

Merged: 20 commits (Apr 5, 2024)
15 changes: 8 additions & 7 deletions .buildkite/pipeline.yml
@@ -4,6 +4,7 @@
queue: "juliaecosystem"
sandbox_capable: "true"
os: linux
+ arch: x86_64
command: "julia --project -e 'using Pkg; Pkg.develop(;path=\"lib/TimespanLogging\")'"
.bench: &bench
if: build.message =~ /\[run benchmarks\]/
@@ -15,7 +16,7 @@
num_cpus: 16
steps:
- label: Julia 1.8
- timeout_in_minutes: 60
+ timeout_in_minutes: 90
<<: *test
plugins:
- JuliaCI/julia#v1:
@@ -25,7 +26,7 @@ steps:
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia 1.9
- timeout_in_minutes: 60
+ timeout_in_minutes: 90
<<: *test
plugins:
- JuliaCI/julia#v1:
@@ -35,7 +36,7 @@
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia 1.10
- timeout_in_minutes: 60
+ timeout_in_minutes: 90
<<: *test
plugins:
- JuliaCI/julia#v1:
@@ -45,7 +46,7 @@
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia nightly
- timeout_in_minutes: 60
+ timeout_in_minutes: 90
<<: *test
plugins:
- JuliaCI/julia#v1:
@@ -55,7 +56,7 @@
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia 1.8 (macOS)
- timeout_in_minutes: 60
+ timeout_in_minutes: 90
<<: *test
agents:
queue: "juliaecosystem"
@@ -69,7 +70,7 @@
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia 1.8 - TimespanLogging
- timeout_in_minutes: 60
+ timeout_in_minutes: 20
<<: *test
plugins:
- JuliaCI/julia#v1:
@@ -78,7 +79,7 @@
codecov: true
command: "julia --project -e 'using Pkg; Pkg.instantiate(); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.test(\"TimespanLogging\")'"
- label: Julia 1.8 - DaggerWebDash
- timeout_in_minutes: 60
+ timeout_in_minutes: 20
<<: *test
plugins:
- JuliaCI/julia#v1:
2 changes: 2 additions & 0 deletions Project.toml
@@ -20,6 +20,7 @@ SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383"
SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"
TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

Expand All @@ -34,6 +35,7 @@ Requires = "1"
ScopedValues = "1.1"
Statistics = "1"
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
TaskLocalValues = "0.1"
TimespanLogging = "0.1"
julia = "1.8"

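The new TaskLocalValues dependency provides lazily-initialized per-task storage, which the in-place multiply code presumably uses for per-task scratch state. A minimal sketch of the TaskLocalValues.jl API (the buffer name and sizes here are illustrative, not taken from this diff):

```julia
using TaskLocalValues

# One Vector{Float64} per task, created lazily on first access.
const scratch = TaskLocalValue{Vector{Float64}}(() -> zeros(64))

t = Threads.@spawn begin
    buf = scratch[]  # each task gets its own private buffer
    buf[1] = 1.0
    sum(buf)
end
fetch(t)  # 1.0
```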
17 changes: 10 additions & 7 deletions docs/src/darray.md
@@ -134,12 +134,7 @@ julia> DZ = DY .* 3
Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)
```

- Now, `DZ` will contain the result of computing `(DX .+ DX) .* 3`. Note that
- `DArray` objects are immutable, and operations on them are thus functional
- transformations of their input `DArray`.
-
- !!! note
-     Support for mutation of `DArray`s is planned for a future release
+ Now, `DZ` will contain the result of computing `(DX .+ DX) .* 3`.

```
julia> Dagger.chunks(DZ)
@@ -208,15 +203,17 @@ julia> collect(DZ)
```

A variety of other operations exist on the `DArray`, and it should generally
- behavior otherwise similar to any other `AbstractArray` type. If you find that
+ behave otherwise similar to any other `AbstractArray` type. If you find that
it's missing an operation that you need, please file an issue!

### Known Supported Operations

This list is not exhaustive, but documents operations which are known to work well with the `DArray`:

From `Base`:
+ - `getindex`/`setindex!`
- Broadcasting
+ - `similar`/`copy`/`copyto!`
- `map`/`reduce`/`mapreduce`
- `sum`/`prod`
- `minimum`/`maximum`/`extrema`
Expand All @@ -225,3 +222,9 @@ From `Statistics`:
- `mean`
- `var`
- `std`

+ From `LinearAlgebra`:
+ - `transpose`/`adjoint` (Out-of-place transpose)
+ - `*` (Out-of-place Matrix-(Matrix/Vector) multiply)
+ - `mul!` (In-place Matrix-Matrix multiply)
+ - `cholesky`/`cholesky!` (In-place/Out-of-place Cholesky factorization)
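The LinearAlgebra entries above are the headline of this PR. A hedged sketch of the expected usage (array and block sizes are illustrative):

```julia
using Dagger, LinearAlgebra

A = rand(Blocks(64, 64), 256, 256)    # 256×256 DArray in 64×64 chunks
B = rand(Blocks(64, 64), 256, 256)
C = zeros(Blocks(64, 64), 256, 256)

mul!(C, A, B)                         # in-place matrix-matrix multiply (this PR)
D = A * B'                            # out-of-place multiply with an adjoint wrapper

collect(C) ≈ collect(A) * collect(B)  # true
```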
4 changes: 3 additions & 1 deletion src/Dagger.jl
@@ -60,17 +60,19 @@ include("datadeps.jl")
include("array/darray.jl")
include("array/alloc.jl")
include("array/map-reduce.jl")
include("array/copy.jl")

# File IO
include("file-io.jl")

include("array/operators.jl")
include("array/getindex.jl")
include("array/indexing.jl")
include("array/setindex.jl")
include("array/matrix.jl")
include("array/sparse_partition.jl")
include("array/sort.jl")
include("array/linalg.jl")
include("array/mul.jl")
include("array/cholesky.jl")

# Visualization
111 changes: 111 additions & 0 deletions src/array/copy.jl
@@ -0,0 +1,111 @@
# Copy Buffering

function maybe_copy_buffered(f, args...)
@assert all(arg->arg isa Pair{<:DArray,<:Blocks}, args) "maybe_copy_buffered only supports `DArray`=>`Blocks`"
if any(arg_part->arg_part[1].partitioning != arg_part[2], args)
return copy_buffered(f, args...)
else
return f(map(first, args)...)
end
end
function copy_buffered(f, args...)
real_args = map(arg_part->arg_part[1], args)
buffered_args = map(arg_part->allocate_copy_buffer(arg_part[2], arg_part[1]), args)
for (buf_arg, arg) in zip(buffered_args, real_args)
copyto!(buf_arg, arg)
end
result = f(buffered_args...)
for (buf_arg, arg) in zip(buffered_args, real_args)
copyto!(arg, buf_arg)
end
return result
end
function allocate_copy_buffer(part::Blocks{N}, A::DArray{T,N}) where {T,N}
# FIXME: undef initializer
return zeros(part, T, size(A))
end
function Base.copyto!(B::DArray{T,N}, A::DArray{T,N}) where {T,N}
if size(B) != size(A)
throw(DimensionMismatch("Cannot copy from array of size $(size(A)) to array of size $(size(B))"))
end

Bc = B.chunks
Ac = A.chunks
Asd_all = A.subdomains::DomainBlocks{N}

Dagger.spawn_datadeps() do
for Bidx in CartesianIndices(Bc)
Bpart = Bc[Bidx]
Bsd = B.subdomains[Bidx]

# Find the first overlapping subdomain of A
if A.partitioning isa Blocks
Aidx = CartesianIndex(ntuple(i->fld1(Bsd.indexes[i].start, A.partitioning.blocksize[i]), N))
else
# Fallback just in case of non-dense partitioning
Aidx = first(CartesianIndices(Ac))
Asd = first(Asd_all)
for dim in 1:N
while Asd.indexes[dim].stop < Bsd.indexes[dim].start
Aidx += CartesianIndex(ntuple(i->i==dim, N))
Asd = Asd_all[Aidx]
end
end
end
Aidx_start = Aidx

# Find the last overlapping subdomain of A
for dim in 1:N
while true
Aidx_next = Aidx + CartesianIndex(ntuple(i->i==dim, N))
if !(Aidx_next in CartesianIndices(Ac))
break
end
Asd_next = Asd_all[Aidx_next]
if Asd_next.indexes[dim].start <= Bsd.indexes[dim].stop
Aidx = Aidx_next
else
break
end
end
end
Aidx_end = Aidx

# Find the span and set of subdomains of A overlapping Bpart
Aidx_span = Aidx_start:Aidx_end
Asd_view = view(A.subdomains, Aidx_span)

# Copy all overlapping subdomains of A
for Aidx in Aidx_span
Asd = Asd_all[Aidx]
Apart = Ac[Aidx]

# Compute the true range
range_start = CartesianIndex(ntuple(i->max(Bsd.indexes[i].start, Asd.indexes[i].start), N))
range_end = CartesianIndex(ntuple(i->min(Bsd.indexes[i].stop, Asd.indexes[i].stop), N))
range_diff = range_end - range_start

# Compute the offset range into Apart
Asd_start = ntuple(i->Asd.indexes[i].start, N)
Asd_end = ntuple(i->Asd.indexes[i].stop, N)
Arange = range(range_start - CartesianIndex(Asd_start) + CartesianIndex{N}(1),
range_start - CartesianIndex(Asd_start) + CartesianIndex{N}(1) + range_diff)

# Compute the offset range into Bpart
Bsd_start = ntuple(i->Bsd.indexes[i].start, N)
Bsd_end = ntuple(i->Bsd.indexes[i].stop, N)
Brange = range(range_start - CartesianIndex(Bsd_start) + CartesianIndex{N}(1),
range_start - CartesianIndex(Bsd_start) + CartesianIndex{N}(1) + range_diff)

# Perform view copy
Dagger.@spawn copyto_view!(Out(Bpart), Brange, In(Apart), Arange)
end
end
end

return B
end
function copyto_view!(Bpart, Brange, Apart, Arange)
copyto!(view(Bpart, Brange), view(Apart, Arange))
return
end
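The `copyto!` above re-chunks between two `Blocks` partitionings by finding, for every chunk of `B`, the overlapping chunks of `A` and spawning one `copyto_view!` task per overlap; `maybe_copy_buffered` runs a function on temporaries with the requested partitioning and copies results back, skipping the buffering entirely when the partitioning already matches. A usage sketch of these internal, unexported helpers (sizes are illustrative):

```julia
using Dagger

A = rand(Blocks(30, 30), 90, 90)   # 3×3 grid of 30×30 chunks
B = zeros(Blocks(45, 45), 90, 90)  # 2×2 grid of 45×45 chunks
copyto!(B, A)                      # overlap-aware copy across partitionings

# Run a function on copies of A and B re-partitioned to Blocks(45, 45);
# mutations to the buffers are copied back into A and B afterwards.
Dagger.maybe_copy_buffered(A=>Blocks(45, 45), B=>Blocks(45, 45)) do A45, B45
    copyto!(B45, A45)  # placeholder work on the uniformly-partitioned buffers
end
```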
60 changes: 44 additions & 16 deletions src/array/darray.jl
@@ -9,15 +9,20 @@ import Serialization: serialize, deserialize

An `N`-dimensional domain over an array.
"""
- struct ArrayDomain{N}
-     indexes::NTuple{N, Any}
+ struct ArrayDomain{N,T<:Tuple}
+     indexes::T
  end
- include("../lib/domain-blocks.jl")

- ArrayDomain(xs...) = ArrayDomain(xs)
+ ArrayDomain(xs::T) where T<:Tuple = ArrayDomain{length(xs),T}(xs)
+ ArrayDomain(xs::NTuple{N,Base.OneTo}) where N =
+     ArrayDomain{N,NTuple{N,UnitRange{Int}}}(ntuple(i->UnitRange(xs[i]), N))
+ ArrayDomain(xs::NTuple{N,Int}) where N =
+     ArrayDomain{N,NTuple{N,UnitRange{Int}}}(ntuple(i->xs[i]:xs[i], N))
+ ArrayDomain(xs...) = ArrayDomain((xs...,))
+ ArrayDomain(xs::Array) = ArrayDomain((xs...,))
+
+ include("../lib/domain-blocks.jl")

indexes(a::ArrayDomain) = a.indexes
chunks(a::ArrayDomain{N}) where {N} = DomainBlocks(
ntuple(i->first(indexes(a)[i]), Val(N)), map(x->[length(x)], indexes(a)))
@@ -117,6 +122,7 @@ indicates the number of dimensions in the resulting array.
"""
Blocks(xs::Int...) = Blocks(xs)

+ const DArrayDomain{N} = ArrayDomain{N, NTuple{N, UnitRange{Int}}}

"""
DArray{T,N,F}(domain, subdomains, chunks, concat)
@@ -133,8 +139,8 @@
and concatenates them along dimension `d`. `cat` is used by default.
"""
mutable struct DArray{T,N,B<:AbstractBlocks{N},F} <: ArrayOp{T, N}
-     domain::ArrayDomain{N}
-     subdomains::AbstractArray{ArrayDomain{N}, N}
+     domain::DArrayDomain{N}
+     subdomains::AbstractArray{DArrayDomain{N}, N}
chunks::AbstractArray{Any, N}
partitioning::B
concat::F
Expand All @@ -143,20 +149,27 @@ mutable struct DArray{T,N,B<:AbstractBlocks{N},F} <: ArrayOp{T, N}
end
end

+ WrappedDArray{T,N} = Union{<:DArray{T,N}, Transpose{<:DArray{T,N}}, Adjoint{<:DArray{T,N}}}
+ WrappedDMatrix{T} = WrappedDArray{T,2}
+ WrappedDVector{T} = WrappedDArray{T,1}
+ DMatrix{T} = DArray{T,2}
+ DVector{T} = DArray{T,1}


# mainly for backwards-compatibility
DArray{T, N}(domain, subdomains, chunks, partitioning, concat=cat) where {T,N} =
DArray(T, domain, subdomains, chunks, partitioning, concat)

- function DArray(T, domain::ArrayDomain{N},
-                 subdomains::AbstractArray{ArrayDomain{N}, N},
-                 chunks::AbstractArray{<:Any, N}, partitioning::B, concat=cat) where {N,B<:AbstractMultiBlocks{N}}
+ function DArray(T, domain::DArrayDomain{N},
+                 subdomains::AbstractArray{DArrayDomain{N}, N},
+                 chunks::AbstractArray{<:Any, N}, partitioning::B, concat=cat) where {N,B<:AbstractBlocks{N}}
DArray{T,N,B,typeof(concat)}(domain, subdomains, chunks, partitioning, concat)
end

- function DArray(T, domain::ArrayDomain{N},
-                 subdomains::ArrayDomain{N},
+ function DArray(T, domain::DArrayDomain{N},
+                 subdomains::DArrayDomain{N},
      chunks::Any, partitioning::B, concat=cat) where {N,B<:AbstractSingleBlocks{N}}
-     _subdomains = Array{ArrayDomain{N}, N}(undef, ntuple(i->1, N)...)
+     _subdomains = Array{DArrayDomain{N}, N}(undef, ntuple(i->1, N)...)
_subdomains[1] = subdomains
_chunks = Array{Any, N}(undef, ntuple(i->1, N)...)
_chunks[1] = chunks
@@ -201,6 +214,13 @@ function Base.similar(x::DArray{T,N}) where {T,N}
return DArray(T, x.domain, x.subdomains, thunks, x.partitioning, x.concat)
end

+ function Base.similar(A::DArray{T,N} where T, ::Type{S}, dims::Dims{N}) where {S,N}
+     d = ArrayDomain(map(x->1:x, dims))
+     p = A.partitioning
+     a = AllocateArray(S, (_, _, x...) -> Array{S,N}(undef, x...), d, partition(p, d), p)
+     return _to_darray(a)
+ end

Base.copy(x::DArray{T,N,B,F}) where {T,N,B,F} =
map(identity, x)::DArray{T,N,B,F}

@@ -214,7 +234,7 @@ Base.:(/)(x::DArray{T,N,B,F}, y::U) where {T<:Real,U<:Real,N,B,F} =
A `view` of a `DArray` chunk returns a `DArray` of `Thunk`s.
"""
function Base.view(c::DArray, d)
subchunks, subdomains = lookup_parts(chunks(c), domainchunks(c), d)
subchunks, subdomains = lookup_parts(c, chunks(c), domainchunks(c), d)
d1 = alignfirst(d)
DArray(eltype(c), d1, subdomains, subchunks, c.partitioning, c.concat)
end
@@ -252,7 +272,7 @@ function group_indices(cumlength, idxs::AbstractRange)
end

_cumsum(x::AbstractArray) = length(x) == 0 ? Int[] : cumsum(x)
- function lookup_parts(ps::AbstractArray, subdmns::DomainBlocks{N}, d::ArrayDomain{N}) where N
+ function lookup_parts(A::DArray, ps::AbstractArray, subdmns::DomainBlocks{N}, d::ArrayDomain{N}) where N
groups = map(group_indices, subdmns.cumlength, indexes(d))
sz = map(length, groups)
pieces = Array{Any}(undef, sz)
@@ -264,7 +284,15 @@ function lookup_parts(ps::AbstractArray, subdmns::DomainBlocks{N}, d::ArrayDomain{N}) where N
end
out_cumlength = map(g->_cumsum(map(x->length(x[2]), g)), groups)
out_dmn = DomainBlocks(ntuple(x->1,Val(N)), out_cumlength)
-     pieces, out_dmn
+     return pieces, out_dmn
end
+ function lookup_parts(A::DArray, ps::AbstractArray, subdmns::DomainBlocks{N}, d::ArrayDomain{S}) where {N,S}
+     if S != 1
+         throw(BoundsError(A, d.indexes))
+     end
+     inds = CartesianIndices(A)[d.indexes...]
+     new_d = ntuple(i->first(inds).I[i]:last(inds).I[i], N)
+     return lookup_parts(A, ps, subdmns, ArrayDomain(new_d))
+ end

"""
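The `darray.jl` changes concretize `ArrayDomain`'s index type, add the `DMatrix`/`DVector` and wrapper aliases, and give `similar` a method taking an element type and dims while preserving the partitioning. A brief sketch (the aliases are qualified with the module in case they are not exported at this point):

```julia
using Dagger

A = rand(Blocks(4, 4), 8, 8)
A isa Dagger.DMatrix{Float64}    # true, via the new alias

B = similar(A, Float32, (8, 8))  # new method: Float32 eltype, same Blocks(4, 4)
eltype(B)                        # Float32

d = Dagger.ArrayDomain(1:4, 1:8) # now ArrayDomain{2, NTuple{2, UnitRange{Int}}}
```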