Skip to content

DArray: Add automatic partitioning and show array elements #505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 268 additions & 54 deletions docs/src/darray.md

Large diffs are not rendered by default.

82 changes: 46 additions & 36 deletions src/array/alloc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,71 @@ function stage(ctx, a::AllocateArray)
return DArray(a.eltype, a.domain, a.domainchunks, thunks, a.partitioning)
end

function Base.rand(p::Blocks, eltype::Type, dims)
const BlocksOrAuto = Union{Blocks{N} where N, AutoBlocks}

function Base.rand(p::Blocks, eltype::Type, dims::Dims)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (_, x...) -> rand(x...), d, partition(p, d), p)
return _to_darray(a)
end
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...) = rand(p, T, dims)
Base.rand(p::BlocksOrAuto, T::Type, dims::Dims) = rand(p, T, dims)
Base.rand(p::BlocksOrAuto, dims::Integer...) = rand(p, Float64, dims)
Base.rand(p::BlocksOrAuto, dims::Dims) = rand(p, Float64, dims)
Base.rand(::AutoBlocks, eltype::Type, dims::Dims) =
rand(auto_blocks(dims), eltype, dims)

Base.rand(p::Blocks, t::Type, dims::Integer...) = rand(p, t, dims)
Base.rand(p::Blocks, dims::Integer...) = rand(p, Float64, dims)
Base.rand(p::Blocks, dims::Tuple) = rand(p, Float64, dims)
function Base.randn(p::Blocks, eltype::Type, dims::Dims)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (_, x...) -> randn(x...), d, partition(p, d), p)
return _to_darray(a)
end
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...) = randn(p, T, dims)
Base.randn(p::BlocksOrAuto, T::Type, dims::Dims) = randn(p, T, dims)
Base.randn(p::BlocksOrAuto, dims::Integer...) = randn(p, Float64, dims)
Base.randn(p::BlocksOrAuto, dims::Dims) = randn(p, Float64, dims)
Base.randn(::AutoBlocks, eltype::Type, dims::Dims) =
randn(auto_blocks(dims), eltype, dims)

function Base.randn(p::Blocks, eltype::Type, dims)
function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(Float64, (_, x...) -> randn(x...), d, partition(p, d), p)
a = AllocateArray(eltype, (_, T, _dims) -> sprand(T, _dims..., sparsity), d, partition(p, d), p)
return _to_darray(a)
end
Base.randn(p::Blocks, t::Type, dims::Integer...) = randn(p, t, dims)
Base.randn(p::Blocks, dims::Integer...) = randn(p, dims)
Base.randn(p::Blocks, dims::Tuple) = randn(p, Float64, dims)
sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...) =
sprand(p, T, dims_and_sparsity[1:end-1], dims_and_sparsity[end])
sprand(p::BlocksOrAuto, T::Type, dims::Dims, sparsity::AbstractFloat) =
sprand(p, T, dims, sparsity)
sprand(p::BlocksOrAuto, dims_and_sparsity::Real...) =
sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end])
sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat) =
sprand(p, Float64, dims, sparsity)
sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat) =
sprand(auto_blocks(dims), eltype, dims, sparsity)

function Base.ones(p::Blocks, eltype::Type, dims)
function Base.ones(p::Blocks, eltype::Type, dims::Dims)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (_, x...) -> ones(x...), d, partition(p, d), p)
return _to_darray(a)
end
Base.ones(p::Blocks, t::Type, dims::Integer...) = ones(p, t, dims)
Base.ones(p::Blocks, dims::Integer...) = ones(p, Float64, dims)
Base.ones(p::Blocks, dims::Tuple) = ones(p, Float64, dims)
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...) = ones(p, T, dims)
Base.ones(p::BlocksOrAuto, T::Type, dims::Dims) = ones(p, T, dims)
Base.ones(p::BlocksOrAuto, dims::Integer...) = ones(p, Float64, dims)
Base.ones(p::BlocksOrAuto, dims::Dims) = ones(p, Float64, dims)
Base.ones(::AutoBlocks, eltype::Type, dims::Dims) =
ones(auto_blocks(dims), eltype, dims)

function Base.zeros(p::Blocks, eltype::Type, dims)
function Base.zeros(p::Blocks, eltype::Type, dims::Dims)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (_, x...) -> zeros(x...), d, partition(p, d), p)
return _to_darray(a)
end
Base.zeros(p::Blocks, t::Type, dims::Integer...) = zeros(p, t, dims)
Base.zeros(p::Blocks, dims::Integer...) = zeros(p, Float64, dims)
Base.zeros(p::Blocks, dims::Tuple) = zeros(p, Float64, dims)
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...) = zeros(p, T, dims)
Base.zeros(p::BlocksOrAuto, T::Type, dims::Dims) = zeros(p, T, dims)
Base.zeros(p::BlocksOrAuto, dims::Integer...) = zeros(p, Float64, dims)
Base.zeros(p::BlocksOrAuto, dims::Dims) = zeros(p, Float64, dims)
Base.zeros(::AutoBlocks, eltype::Type, dims::Dims) =
zeros(auto_blocks(dims), eltype, dims)

function Base.zero(x::DArray{T,N}) where {T,N}
dims = ntuple(i->x.domain.indexes[i].stop, N)
Expand All @@ -83,22 +112,3 @@ function Base.view(A::AbstractArray{T,N}, p::Blocks{N}) where {T,N}
chunks = [tochunk(view(A, x.indexes...)) for x in dc]
return DArray(T, d, dc, chunks, p)
end

function sprand(p::Blocks, m::Integer, n::Integer, sparsity::Real)
s = rand(UInt)
f = function (idx, t,sz)
sprand(MersenneTwister(s+idx), sz...,sparsity)
end
d = ArrayDomain((1:m, 1:n))
a = AllocateArray(Float64, f, d, partition(p, d), p)
return _to_darray(a)
end

function sprand(p::Blocks, n::Integer, sparsity::Real)
s = rand(UInt)
f = function (idx,t,sz)
sprand(MersenneTwister(s+idx), sz...,sparsity)
end
a = AllocateArray(Float64, f, d, partition(p, ArrayDomain((1:n,))), p)
return _to_darray(a)
end
157 changes: 129 additions & 28 deletions src/array/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import Base: ==, fetch
using Serialization
import Serialization: serialize, deserialize

export DArray, DVector, DMatrix, Blocks, AutoBlocks
export distribute


###### Array Domains ######

"""
Expand Down Expand Up @@ -92,18 +96,6 @@ collect(x::Computation) = collect(fetch(x))

Base.fetch(x::Computation) = fetch(stage(Context(global_context()), x))

function Base.show(io::IO, ::MIME"text/plain", x::ArrayOp)
write(io, string(typeof(x)))
write(io, string(size(x)))
end

function Base.show(io::IO, x::ArrayOp)
m = MIME"text/plain"()
show(io, m, x)
end

export BlockPartition, Blocks

abstract type AbstractBlocks{N} end

abstract type AbstractMultiBlocks{N}<:AbstractBlocks{N} end
Expand Down Expand Up @@ -149,11 +141,11 @@ mutable struct DArray{T,N,B<:AbstractBlocks{N},F} <: ArrayOp{T, N}
end
end

WrappedDArray{T,N} = Union{<:DArray{T,N}, Transpose{<:DArray{T,N}}, Adjoint{<:DArray{T,N}}}
WrappedDMatrix{T} = WrappedDArray{T,2}
WrappedDVector{T} = WrappedDArray{T,1}
DMatrix{T} = DArray{T,2}
DVector{T} = DArray{T,1}
const WrappedDArray{T,N} = Union{<:DArray{T,N}, Transpose{<:DArray{T,N}}, Adjoint{<:DArray{T,N}}}
const WrappedDMatrix{T} = WrappedDArray{T,2}
const WrappedDVector{T} = WrappedDArray{T,1}
const DMatrix{T} = DArray{T,2}
const DVector{T} = DArray{T,1}


# mainly for backwards-compatibility
Expand Down Expand Up @@ -196,6 +188,95 @@ function Base.collect(d::DArray; tree=false)
end
end

### show

#= FIXME
@static if isdefined(Base, :AnnotatedString)
# FIXME: Import StyledStrings
struct ColorElement{T}
color::Symbol
value::T
end
function Base.show(io::IO, ::MIME"text/plain", x::ColorElement)
print(io, styled"{(foreground=$(x.color)):$(x.value)}")
end
else
=#
struct ColorElement{T}
color::Symbol
value::Union{Some{T},Nothing}
end
function Base.show(io::IO, ::MIME"text/plain", x::ColorElement)
if x.value !== nothing
printstyled(io, something(x.value); color=x.color)
else
printstyled(io, "..."; color=x.color)
end
end
Base.alignment(io::IO, x::ColorElement) =
Base.alignment(io, something(x.value, "..."))
#end
struct ColorArray{T,N} <: DenseArray{T,N}
A::DArray{T,N}
color_map::Vector{Symbol}
seen_values::Dict{NTuple{N,Int},Union{Some{T},Nothing}}
function ColorArray(A::DArray{T,N}) where {T,N}
colors = [:red, :green, :yellow, :blue, :magenta, :cyan]
color_map = [colors[mod1(idx, length(colors))] for idx in 1:length(A.chunks)]
return new{T,N}(A, color_map, Dict{NTuple{N,Int},Union{Some{T},Nothing}}())
end
end
Base.size(A::ColorArray) = size(A.A)
Base.getindex(A::ColorArray, idx::Integer) = getindex(A, (idx,))
Base.getindex(A::ColorArray, idxs::Integer...) = getindex(A, (idxs...,))
function Base.getindex(A::ColorArray{T,N}, idxs::NTuple{N,Int}) where {T,N}
sd_idx_tuple, _ = partition_for(A.A, idxs)
sd_idx = CartesianIndex(sd_idx_tuple)
sd_idx_linear = LinearIndices(A.A.chunks)[sd_idx]
if !haskey(A.seen_values, idxs)
chunk = A.A.chunks[sd_idx]
if chunk isa Chunk || isready(chunk)
value = A.seen_values[idxs] = Some(getindex(A.A, idxs))
else
# Show a placeholder instead
value = A.seen_values[idxs] = nothing
end
else
value = A.seen_values[idxs]
end
if value !== nothing
color = A.color_map[sd_idx_linear]
else
color = :light_black
end
return ColorElement{T}(color, value)
end
function Base.getindex(A::ColorArray{T,N}, idxs::Dims{S}) where {T,N,S}
if S > N
if all(idxs[(N+1):end] .== 1)
return getindex(A, idxs[1:N])
else
throw(BoundsError(A, idxs))
end
elseif S < N
throw(BoundsError(A, idxs))
end
end
function Base.show(io::IO, ::MIME"text/plain", A::DArray{T,N}) where {T,N}
write(io, string(DArray{T,N}))
write(io, string(size(A)))
write(io, " with $(join(size(A.chunks), 'x')) partitions of size $(join(A.partitioning.blocksize, 'x')):")
pct_complete = 100 * (sum(c->c isa Chunk ? true : isready(c), A.chunks) / length(A.chunks))
if pct_complete < 100
println(io)
printstyled(io, "~$(round(Int, pct_complete))% completed"; color=:yellow)
end
println(io)
with_index_caching(1) do
Base.print_array(IOContext(io, :compact=>true), ColorArray(A))
end
end

function (==)(x::ArrayOp, y::ArrayOp)
x === y || reduce((a,b)->a&&b, map(==, x, y))
end
Expand Down Expand Up @@ -319,8 +400,6 @@ end
Base.@deprecate_binding Cat DArray
Base.@deprecate_binding ComputedArray DArray

export Distribute, distribute

struct Distribute{T,N,B<:AbstractBlocks} <: ArrayOp{T, N}
domainchunks
partitioning::B
Expand Down Expand Up @@ -385,22 +464,44 @@ function stage(ctx::Context, d::Distribute)
d.partitioning)
end

function distribute(x::AbstractArray, dist::Blocks)
_to_darray(Distribute(dist, x))
end
"""
AutoBlocks

Automatically determines the size and number of blocks for a distributed array.
This may construct any kind of `Dagger.AbstractBlocks` partitioning.
"""
struct AutoBlocks end
function auto_blocks(dims::Dims{N}) where N
# TODO: Allow other partitioning schemes
np = num_processors()
p = cld(dims[end], np)
return Blocks(ntuple(i->i == N ? p : dims[i], N))
end
auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A))

distribute(A::AbstractArray) = distribute(A, AutoBlocks())
distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} =
_to_darray(Distribute(dist, A))
distribute(A::AbstractArray, ::AutoBlocks) = distribute(A, auto_blocks(A))
function distribute(x::AbstractArray{T,N}, n::NTuple{N}) where {T,N}
p = map((d, dn)->ceil(Int, d / dn), size(x), n)
distribute(x, Blocks(p))
end
distribute(x::AbstractVector, n::Int) = distribute(x, (n,))
distribute(x::AbstractVector, n::Vector{<:Integer}) =
distribute(x, DomainBlocks((1,), (cumsum(n),)))

function distribute(x::AbstractVector, n::Int)
distribute(x, (n,))
end
DVector(A::AbstractVector{T}, part::Blocks{1}) where T = distribute(A, part)
DMatrix(A::AbstractMatrix{T}, part::Blocks{2}) where T = distribute(A, part)
DArray(A::AbstractArray{T,N}, part::Blocks{N}) where {T,N} = distribute(A, part)

function distribute(x::AbstractVector, n::Vector{<:Integer})
distribute(x, DomainBlocks((1,), (cumsum(n),)))
end
DVector(A::AbstractVector{T}) where T = DVector(A, AutoBlocks())
DMatrix(A::AbstractMatrix{T}) where T = DMatrix(A, AutoBlocks())
DArray(A::AbstractArray) = DArray(A, AutoBlocks())

DVector(A::AbstractVector{T}, ::AutoBlocks) where T = DVector(A, auto_blocks(A))
DMatrix(A::AbstractMatrix{T}, ::AutoBlocks) where T = DMatrix(A, auto_blocks(A))
DArray(A::AbstractArray, ::AutoBlocks) = DArray(A, auto_blocks(A))

function Base.:(==)(x::ArrayOp{T,N}, y::AbstractArray{S,N}) where {T,S,N}
collect(x) == y
Expand Down
24 changes: 24 additions & 0 deletions src/array/indexing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ Base.getindex(A::DArray, idx::Integer) =
getindex(A, Base._ind2sub(A, idx))
Base.getindex(A::DArray, idx::CartesianIndex) =
getindex(A, Tuple(idx))
function Base.getindex(A::DArray{T,N}, idxs::Dims{S}) where {T,N,S}
if S > N
if all(idxs[(N+1):end] .== 1)
return getindex(A, idxs[1:N])
else
throw(BoundsError(A, idxs))
end
elseif S < N
throw(BoundsError(A, idxs))
end
error()
end

### setindex!

Expand Down Expand Up @@ -126,6 +138,18 @@ Base.setindex!(A::DArray, value, idx::Integer) =
setindex!(A, value, Base._ind2sub(A, idx))
Base.setindex!(A::DArray, value, idx::CartesianIndex) =
setindex!(A, value, Tuple(idx))
function Base.setindex!(A::DArray{T,N}, value, idxs::Dims{S}) where {T,N,S}
if S > N
if all(idxs[(N+1):end] .== 1)
return setindex!(A, value, idxs[1:N])
else
throw(BoundsError(A, idxs))
end
elseif S < N
throw(BoundsError(A, idxs))
end
error()
end

### Allow/disallow scalar indexing

Expand Down
Loading
Loading