diff --git a/base/exports.jl b/base/exports.jl
index cf6d9ddfde437..c8a3bb2ad68fc 100644
--- a/base/exports.jl
+++ b/base/exports.jl
@@ -89,6 +89,7 @@ export
     RoundUp,
     Schur,
     Set,
+    SharedArray,
    SparseMatrixCSC,
     StatStruct,
     StridedArray,
diff --git a/base/mmap.jl b/base/mmap.jl
index 2d60e5f9e7f7b..6d5060ea37a41 100644
--- a/base/mmap.jl
+++ b/base/mmap.jl
@@ -108,13 +108,13 @@ function mmap_stream_settings(s::IO)
 end
 
 # Mmapped-array constructor
-function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset)
+function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset; grow::Bool=true)
     prot, flags, iswrite = mmap_stream_settings(s)
     len = prod(dims)*sizeof(T)
     if len > typemax(Int)
         error("file is too large to memory-map on this platform")
     end
-    if iswrite
+    if iswrite && grow
         pmap, delta = mmap_grow(len, prot, flags, fd(s), offset)
     else
         pmap, delta = mmap(len, prot, flags, fd(s), offset)
diff --git a/base/sharedarray.jl b/base/sharedarray.jl
new file mode 100644
index 0000000000000..511e0a8702b4f
--- /dev/null
+++ b/base/sharedarray.jl
@@ -0,0 +1,260 @@
+type SharedArray{T,N} <: AbstractArray{T,N}
+    dims::NTuple{N,Int}
+    pids::Vector{Int}
+    refs::Array{RemoteRef}
+
+    # Fields below are not to be serialized.
+    # Local shmem map.
+    loc_shmarr::Array{T,N}
+
+    # index of the current worker's pid in the pids vector; 0 if this shared array is not mapped locally.
+    loc_pididx::Int
+
+    # the local partition of the array when viewed as a single-dimensional array.
+    loc_subarr_1d
+
+    SharedArray(d,p,r) = new(d,p,r)
+end
+
+function SharedArray(T::Type, dims::NTuple; init=false, pids=workers())
+    N = length(dims)
+
+    isbits(T) || error("type of SharedArray elements must be a bits type")
+    @windows_only error("SharedArray is not supported on Windows yet.")
+
+    len_sa = prod(dims)
+    if length(pids) > len_sa
+        pids = pids[1:len_sa]
+    end
+
+    onlocalhost = assert_same_host(pids)
+
+    local shm_seg_name = ""
+    local loc_shmarr
+    local sa = nothing
+    local shmmem_create_pid
+    try
+        # On OSX, the shm_seg_name length must be < 32 characters
+        shm_seg_name = string("/jl", getpid(), int64(time() * 10^9))
+        if onlocalhost
+            shmmem_create_pid = myid()
+            loc_shmarr = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR)
+        else
+            # The shared array is being created on a remote machine...
+            shmmem_create_pid = pids[1]
+            remotecall(pids[1], () -> begin shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end)
+        end
+
+        func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR)
+
+        refs = Array(RemoteRef, length(pids))
+        for (i, p) in enumerate(pids)
+            refs[i] = remotecall(p, func_mapshmem)
+        end
+
+        # Wait until all the workers have mapped the segment
+        for i in 1:length(refs)
+            wait(refs[i])
+        end
+
+        # All good, immediately unlink the segment.
+        remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end)
+        shm_seg_name = ""
+
+        sa = SharedArray{T,N}(dims, pids, refs)
+        if onlocalhost
+            init_loc_flds(sa)
+
+            # In the event that myid() is not part of pids, loc_shmarr will not be set
+            # by init_loc_flds above, hence set it here since it is available locally.
+            sa.loc_shmarr = loc_shmarr
+        else
+            sa.loc_pididx = 0
+        end
+
+        # if an init function is present, it is called on each of the parts
+        @sync begin
+            if isa(init, Function)
+                for p in pids
+                    @async remotecall_wait(p, init, sa)
+                end
+            end
+        end
+
+    finally
+        if shm_seg_name != ""
+            remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end)
+        end
+    end
+    sa
+end
+
+SharedArray(T, I::Int...; kwargs...) = SharedArray(T, I; kwargs...)
+
+
+length(sa::SharedArray) = prod(sa.dims)
+size(sa::SharedArray) = sa.dims
+procs(sa::SharedArray) = sa.pids
+
+
+
+function range_1dim(sa::SharedArray, n)
+    l = length(sa)
+    nw = length(sa.pids)
+    partlen = div(l, nw)
+
+    if n == nw
+        return (((n-1) * partlen) + 1):l
+    else
+        return (((n-1) * partlen) + 1):(n*partlen)
+    end
+end
+
+sub_1dim(sa::SharedArray, n) = sub(sa.loc_shmarr, range_1dim(sa, n))
+
+function init_loc_flds(sa)
+    if myid() in sa.pids
+        sa.loc_pididx = findfirst(sa.pids, myid())
+        sa.loc_shmarr = fetch(sa.refs[sa.loc_pididx])
+        sa.loc_subarr_1d = sub_1dim(sa, sa.loc_pididx)
+    else
+        sa.loc_pididx = 0
+    end
+end
+
+
+# Don't serialize loc_shmarr (it is the complete array) or
+# loc_pididx and loc_subarr_1d, which are relevant to the current process only
+function serialize(s, sa::SharedArray)
+    serialize_type(s, typeof(sa))
+    serialize(s, length(SharedArray.names))
+    for n in SharedArray.names
+        if n in [:loc_shmarr, :loc_pididx, :loc_subarr_1d]
+            writetag(s, UndefRefTag)
+        else
+            serialize(s, getfield(sa, n))
+        end
+    end
+end
+
+function deserialize{T,N}(s, t::Type{SharedArray{T,N}})
+    sa = invoke(deserialize, (Any, DataType), s, t)
+    init_loc_flds(sa)
+    if (sa.loc_pididx == 0)
+        error("SharedArray cannot be used on a non-participating process")
+    end
+    sa
+end
+
+convert(::Type{Array}, sa::SharedArray) = sa.loc_shmarr
+
+# avoiding ambiguity warnings
+getindex(sa::SharedArray, x::Real) = getindex(sa.loc_shmarr, x)
+getindex(sa::SharedArray, x::AbstractArray) = getindex(sa.loc_shmarr, x)
+
+# pass through getindex and setindex! - they always operate on the complete array, unlike DArrays
+getindex(sa::SharedArray, args...) = getindex(sa.loc_shmarr, args...)
+setindex!(sa::SharedArray, args...) = (setindex!(sa.loc_shmarr, args...); sa)
+
+# convenience constructors
+function shmem_fill(v, dims; kwargs...)
+    SharedArray(typeof(v), dims; init = S->fill!(S.loc_subarr_1d, v), kwargs...)
+end
+shmem_fill(v, I::Int...; kwargs...) = shmem_fill(v, I; kwargs...)
+
+# rand variant with range
+function shmem_rand(TR::Union(DataType, Range1), dims; kwargs...)
+    if isa(TR, Range1)
+        SharedArray(Int, dims; init = S -> map!((x)->rand(TR), S.loc_subarr_1d), kwargs...)
+    else
+        SharedArray(TR, dims; init = S -> map!((x)->rand(TR), S.loc_subarr_1d), kwargs...)
+    end
+end
+shmem_rand(TR::Union(DataType, Range1), i::Int; kwargs...) = shmem_rand(TR, (i,); kwargs...)
+shmem_rand(TR::Union(DataType, Range1), I::Int...; kwargs...) = shmem_rand(TR, I; kwargs...)
+
+shmem_rand(dims; kwargs...) = shmem_rand(Float64, dims; kwargs...)
+shmem_rand(I::Int...; kwargs...) = shmem_rand(I; kwargs...)
+
+function shmem_randn(dims; kwargs...)
+    SharedArray(Float64, dims; init = S-> map!((x)->randn(), S.loc_subarr_1d), kwargs...)
+end
+shmem_randn(I::Int...; kwargs...) = shmem_randn(I; kwargs...)
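+
+# Usage sketch for the convenience constructors above (illustrative only; it assumes
+# worker processes have already been added with `addprocs`):
+#
+#   A = shmem_fill(1.0, (10,10))     # shared 10x10 array filled with 1.0
+#   B = shmem_rand(1:100, (20,20))   # shared 20x20 array of random integers from 1:100
+#   C = shmem_randn(100)             # shared length-100 vector of standard normals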
+
+
+
+function print_shmem_limits(slen)
+    try
+        @linux_only pfx = "kernel"
+        @osx_only pfx = "kern.sysv"
+
+        shmmax_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmmax`)[1]))[end]), 1024*1024)
+        page_size = int(split(readall(readsfrom(`getconf PAGE_SIZE`)[1]))[end])
+        shmall_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmall`)[1]))[end]) * page_size, 1024*1024)
+
+        println("System max size of single shmem segment(MB) : ", shmmax_MB,
+                "\nSystem max size of all shmem segments(MB) : ", shmall_MB,
+                "\nRequested size(MB) : ", div(slen, 1024*1024),
+                "\nPlease ensure requested size is within system limits.",
+                "\nIf not, increase system limits and try again."
+        )
+    catch e
+        nothing # Ignore any errors in this...
+    end
+end
+
+# utilities
+function shm_mmap_array(T, dims, shm_seg_name, mode)
+    local s = nothing
+    local A = nothing
+    try
+        fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR)
+        if !(fd_mem > 0)
+            error("shm_open() failed")
+        end
+
+        s = fdio(fd_mem, true)
+
+        # On OSX, ftruncate must be used to set the size of the segment (lseek alone does not work),
+        # and only at creation time
+        if (mode & JL_O_CREAT) == JL_O_CREAT
+            rc = ccall(:ftruncate, Int, (Int, Int), fd_mem, prod(dims)*sizeof(T))
+            if rc != 0
+                ec = errno()
+                error("ftruncate() failed, errno : ", ec)
+            end
+        end
+
+        A = mmap_array(T, dims, s, 0, grow=false)
+    catch e
+        print_shmem_limits(prod(dims)*sizeof(T))
+        rethrow(e)
+
+    finally
+        if s != nothing
+            close(s)
+        end
+    end
+    A
+end
+
+@unix_only shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name)
+@unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Int, (Ptr{Uint8}, Int, Int), shm_seg_name, oflags, permissions)
+
+
+function assert_same_host(procs)
+    resp = Array(Any, length(procs))
+
+    @sync begin
+        for (i, p) in enumerate(procs)
+            @async resp[i] = remotecall_fetch(p, () -> getipaddr())
+        end
+    end
+
+    if !all(x->x==resp[1], resp)
+        error("SharedArray requires all requested processes to be on the same machine.")
+    end
+
+    return (resp[1] != getipaddr()) ? false : true
+end
diff --git a/base/sysimg.jl b/base/sysimg.jl
index c142a56922c27..0bb54032ab8a5 100644
--- a/base/sysimg.jl
+++ b/base/sysimg.jl
@@ -160,6 +160,7 @@ include("combinatorics.jl")
 # distributed arrays and memory-mapped arrays
 include("darray.jl")
 include("mmap.jl")
+include("sharedarray.jl")
 
 # utilities - version, timing, help, edit, metaprogramming
 include("sysinfo.jl")
diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst
index 8944977c973cc..9ade1e2e32d90 100644
--- a/doc/manual/parallel-computing.rst
+++ b/doc/manual/parallel-computing.rst
@@ -541,6 +541,28 @@ is ``DArray``\ -specific, but we list it here for completeness::
     end
 
 
 
+
+Shared Arrays (EXPERIMENTAL FEATURE)
+------------------------------------
+
+Shared Arrays use system shared memory to map the same array across many processes.
+
+The constructor for a shared array is of the form
+``SharedArray(T::Type, dims::NTuple; init=false, pids=workers())``,
+which creates a shared array of type ``T`` and size ``dims`` across the processes
+specified by ``pids``; all of them must be on the same host.
+
+If an ``init`` function of the form ``initfn(S::SharedArray)`` is specified,
+it is called on all the participating workers.
+
+Unlike distributed arrays, a shared array is accessible only from the participating workers
+specified by the ``pids`` named argument (and from the creating process, if it is on the same host).
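+
+For example, the following sketch (assuming worker processes have already been added
+with ``addprocs``) fills each worker's local part of the array with its own process id
+and then reads a value back from another process::
+
+    S = SharedArray(Int, (3,4); init = S -> fill!(S.loc_subarr_1d, myid()))
+    S[3,2] = 7                                        # writes are visible to all participants
+    remotecall_fetch(workers()[1], D -> D[3,2], S)    # returns 7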
+
+Indexing into a shared array (both for setting and for accessing values) works the same as
+for a regular array.
+
+
+
 ClusterManagers
 ---------------
diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst
index 6e60c423a8168..c34f9cb54c3c1 100644
--- a/doc/stdlib/base.rst
+++ b/doc/stdlib/base.rst
@@ -4500,6 +4500,37 @@ Distributed Arrays
 
    Get the vector of processors storing pieces of ``d``
 
+
+Shared Arrays (EXPERIMENTAL FEATURE)
+------------------------------------
+
+.. function:: SharedArray(T::Type, dims::NTuple; init=false, pids=workers())
+
+   Construct a SharedArray of type ``T`` and size ``dims`` across the processes
+   specified by ``pids``; all of them must be on the same host.
+
+   If an ``init`` function of the form ``initfn(S::SharedArray)`` is specified,
+   it is called on all the participating workers.
+
+   The following fields of type ``SharedArray`` are initialized appropriately on each
+   participating process:
+
+   ``loc_shmarr::Array{T,N}`` - the shared memory segment mapped into the current
+   process. Note: indexed access does NOT require this field; a ``SharedArray``
+   object can be used just like a regular array.
+
+   ``loc_pididx::Int`` - index of the current process into the ``pids`` vector. Can be
+   used when distributing computational work across participating workers.
+
+   ``loc_subarr_1d`` - a 1-d subarray of the entire array, when equally partitioned
+   across participating workers. Can be used as a simple work partitioning scheme.
+
+
+.. function:: procs(sa::SharedArray)
+
+   Get the vector of processes that have mapped the shared array.
+
+
 System
 ------
diff --git a/test/parallel.jl b/test/parallel.jl
index 4410ebfc3e9b9..d664e2db55908 100644
--- a/test/parallel.jl
+++ b/test/parallel.jl
@@ -20,6 +20,48 @@ a = convert(Array, d)
 
 @test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1]
 @test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101]
+
+@unix_only begin
+# SharedArray tests
+dims = (20,20,20)
+d = Base.shmem_rand(1:100, dims)
+a = convert(Array, d)
+
+partsums = Array(Int, length(procs(d)))
+@sync begin
+    for (i, p) in enumerate(procs(d))
+        @async partsums[i] = remotecall_fetch(p, D->sum(D.loc_subarr_1d), d)
+    end
+end
+@test sum(a) == sum(partsums)
+
+d = Base.shmem_rand(dims)
+for p in procs(d)
+    idxes_in_p = remotecall_fetch(p, D -> parentindexes(D.loc_subarr_1d)[1], d)
+    idxf = first(idxes_in_p)
+    idxl = last(idxes_in_p)
+    d[idxf] = float64(idxf)
+    rv = remotecall_fetch(p, (D,idxf,idxl) -> begin assert(D[idxf] == float64(idxf)); D[idxl] = float64(idxl); D[idxl]; end, d, idxf, idxl)
+    @test d[idxl] == rv
+end
+
+@test ones(10, 10, 10) == Base.shmem_fill(1.0, (10,10,10))
+@test zeros(Int32, 10, 10, 10) == Base.shmem_fill(0, (10,10,10))
+
+d = SharedArray(Int, dims; init = D->fill!(D.loc_subarr_1d, myid()))
+for p in procs(d)
+    idxes_in_p = remotecall_fetch(p, D -> parentindexes(D.loc_subarr_1d)[1], d)
+    idxf = first(idxes_in_p)
+    idxl = last(idxes_in_p)
+    @test d[idxf] == p
+    @test d[idxl] == p
+end
+
+
+end # @unix_only (SharedArray tests)
+
+
+
 # Test @parallel load balancing - all processors should get either M or M+1
 # iterations out of the loop range for some M.
 if nprocs() < 4