Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SharedArray - take 2 #5380

Merged
merged 1 commit into from
Jan 20, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export
RoundUp,
Schur,
Set,
SharedArray,
SparseMatrixCSC,
StatStruct,
StridedArray,
Expand Down
4 changes: 2 additions & 2 deletions base/mmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ function mmap_stream_settings(s::IO)
end

# Mmapped-array constructor
function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset)
function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset; grow::Bool=true)
prot, flags, iswrite = mmap_stream_settings(s)
len = prod(dims)*sizeof(T)
if len > typemax(Int)
error("file is too large to memory-map on this platform")
end
if iswrite
if iswrite && grow
pmap, delta = mmap_grow(len, prot, flags, fd(s), offset)
else
pmap, delta = mmap(len, prot, flags, fd(s), offset)
Expand Down
260 changes: 260 additions & 0 deletions base/sharedarray.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
# An N-dimensional array of element type T whose storage is a shared memory segment
# mapped into every participating process.
#
# Only the first three fields are set by the inner constructor and sent over the wire;
# the `loc_*` fields describe the mapping in the *current* process and are filled in
# by `init_loc_flds` (after construction and after deserialization).
type SharedArray{T,N} <: AbstractArray{T,N}
# Size of the array along each dimension (returned by `size`).
dims::NTuple{N,Int}
# Ids of the participating processes (returned by `procs`).
pids::Vector{Int}
# One RemoteRef per entry of `pids`; each refers to the result of mapping the
# segment on that process.
refs::Array{RemoteRef}

# Fields below are not to be serialized
# Local shmem map.
loc_shmarr::Array{T,N}

# idx of current workers pid into the pids vector, 0 if this shared array is not mapped locally.
loc_pididx::Int

# the local partition into the array when viewed as a single dimensional array.
loc_subarr_1d

# Inner constructor: sets dims, pids and refs only; the loc_* fields are left unset.
SharedArray(d,p,r) = new(d,p,r)
end

# Construct a SharedArray with element type `T` and size `dims`, mapped on the
# processes listed in `pids`.
#
# Arguments:
#   T     - element type; must satisfy `isbits(T)`.
#   dims  - tuple giving the size of the array.
#   init  - optional function `init(S::SharedArray)`; when a Function is given it is
#           run on every process in `pids` and awaited before returning.
#   pids  - participating processes (defaults to `workers()`). If there are more
#           pids than elements, only the first `prod(dims)` pids are used.
#
# Errors if `T` is not a bits type, on Windows, or (via `assert_same_host`) if the
# requested processes are not all on the same machine.
#
# The named segment is created by one process, mapped by all participants, and then
# unlinked right away; the `finally` clause unlinks it if anything failed in between.
function SharedArray(T::Type, dims::NTuple; init=false, pids=workers())
    N = length(dims)

    isbits(T) || error("Type of Shared Array elements must be bits types")
    @windows_only error(" SharedArray is not supported on Windows yet.")

    # Never use more processes than there are elements.
    len_sa = prod(dims)
    if length(pids) > len_sa
        pids = pids[1:len_sa]
    end

    onlocalhost = assert_same_host(pids)

    local shm_seg_name = ""
    local loc_shmarr
    local sa = nothing
    local shmmem_create_pid
    try
        # On OSX, the shm_seg_name length must be < 32 characters
        shm_seg_name = string("/jl", getpid(), int64(time() * 10^9))
        if onlocalhost
            shmmem_create_pid = myid()
            loc_shmarr = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR)
        else
            # The shared array is being created on a remote machine....
            shmmem_create_pid = pids[1]
            # Wait for the segment to exist before asking anyone to map it: the
            # mapping calls below open it without JL_O_CREAT, and calls sent to
            # different workers are not ordered with respect to each other.
            remotecall_wait(pids[1], () -> begin shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end)
        end

        func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR)

        refs = Array(RemoteRef, length(pids))
        for (i, p) in enumerate(pids)
            refs[i] = remotecall(p, func_mapshmem)
        end

        # Wait till all the workers have mapped the segment
        for i in 1:length(refs)
            wait(refs[i])
        end

        # All good, immediately unlink the segment.
        remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end)
        # An empty name tells the finally clause that there is nothing left to unlink.
        shm_seg_name = ""

        sa = SharedArray{T,N}(dims, pids, refs)
        if onlocalhost
            init_loc_flds(sa)

            # In the event that myid() is not part of pids, loc_shmarr will not be set
            # in the init function above, hence setting it here if available.
            sa.loc_shmarr = loc_shmarr
        else
            sa.loc_pididx = 0
        end

        # if present init function is called on each of the parts
        @sync begin
            if isa(init, Function)
                for p in pids
                    @async remotecall_wait(p, init, sa)
                end
            end
        end

    finally
        # Only reached with a non-empty name when something failed before the
        # regular unlink above.
        if shm_seg_name != ""
            remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end)
        end
    end
    sa
end

# Varargs convenience form: the dimensions given as separate integers are forwarded
# as a tuple, together with any keyword arguments, to the tuple-based constructor.
function SharedArray(T, I::Int...; kwargs...)
    return SharedArray(T, I; kwargs...)
end


# Total number of elements: the product of the stored dimensions.
function length(sa::SharedArray)
    return prod(sa.dims)
end

# Dimensions tuple the array was created with.
function size(sa::SharedArray)
    return sa.dims
end

# Ids of the processes participating in this shared array.
function procs(sa::SharedArray)
    return sa.pids
end



# Linear index range of the `n`-th partition when the array, viewed as a 1-d vector,
# is split into `length(sa.pids)` chunks of `div(length(sa), nparts)` elements each.
# The last partition additionally absorbs the remainder, so it runs to the final element.
function range_1dim(sa::SharedArray, n)
    total = length(sa)
    nparts = length(sa.pids)
    chunk = div(total, nparts)

    first_idx = ((n-1) * chunk) + 1
    last_idx = (n == nparts) ? total : (n*chunk)
    return first_idx:last_idx
end

# Sub-array of the locally mapped array covering the linear range of partition `n`.
function sub_1dim(sa::SharedArray, n)
    part = range_1dim(sa, n)
    return sub(sa.loc_shmarr, part)
end

# Fill in the process-local fields of `sa` for the current process.
# If this process is not listed in `sa.pids`, only `loc_pididx` is set (to 0).
# Otherwise `loc_pididx` becomes this process's position in `sa.pids`, `loc_shmarr`
# is fetched from the matching entry of `sa.refs`, and `loc_subarr_1d` is set to
# this process's 1-d partition.
function init_loc_flds(sa)
    me = myid()
    if !(me in sa.pids)
        sa.loc_pididx = 0
    else
        idx = findfirst(sa.pids, me)
        sa.loc_pididx = idx
        sa.loc_shmarr = fetch(sa.refs[idx])
        sa.loc_subarr_1d = sub_1dim(sa, idx)
    end
end


# Don't serialize loc_shmarr (it is the complete array) and
# pididx, which is relevant to the current process only
# Write `sa` to stream `s` field by field. The three process-local fields
# (`loc_shmarr`, `loc_pididx`, `loc_subarr_1d`) are emitted as undefined-reference
# tags instead of their values; every other field is serialized normally.
function serialize(s, sa::SharedArray)
    serialize_type(s, typeof(sa))
    fldnames = SharedArray.names
    serialize(s, length(fldnames))
    for fld in fldnames
        if fld == :loc_shmarr || fld == :loc_pididx || fld == :loc_subarr_1d
            writetag(s, UndefRefTag)
        else
            serialize(s, getfield(sa, fld))
        end
    end
end

# Rebuild a SharedArray from stream `s` with the generic DataType deserializer, then
# set up the process-local fields. Errors if the receiving process is not one of
# the array's participants.
function deserialize{T,N}(s, t::Type{SharedArray{T,N}})
    sa = invoke(deserialize, (Any, DataType), s, t)
    init_loc_flds(sa)
    sa.loc_pididx == 0 && error("SharedArray cannot be used on a non-participating process")
    return sa
end

# Converting to `Array` hands back the locally mapped array itself (no copy is made here).
function convert(::Type{Array}, sa::SharedArray)
    return sa.loc_shmarr
end

# Specific single-argument methods, defined first to avoid ambiguity warnings.
function getindex(sa::SharedArray, x::Real)
    return getindex(sa.loc_shmarr, x)
end
function getindex(sa::SharedArray, x::AbstractArray)
    return getindex(sa.loc_shmarr, x)
end

# Indexing is forwarded to the locally mapped array, so - unlike DArrays - reads and
# writes always address the complete array.
function getindex(sa::SharedArray, args...)
    return getindex(sa.loc_shmarr, args...)
end

# Writes go to the locally mapped array; the SharedArray itself is returned.
function setindex!(sa::SharedArray, args...)
    setindex!(sa.loc_shmarr, args...)
    return sa
end

# convenience constructors
# Create a SharedArray with element type `typeof(v)` in which every participating
# process fills its own 1-d partition with `v`. Extra keyword arguments are passed
# on to the SharedArray constructor.
function shmem_fill(v, dims; kwargs...)
    fill_local = S->fill!(S.loc_subarr_1d, v)
    return SharedArray(typeof(v), dims; init = fill_local, kwargs...)
end

# Varargs form: dimensions given as separate integers.
function shmem_fill(v, I::Int...; kwargs...)
    return shmem_fill(v, I; kwargs...)
end

# rand variant with range
# Create a SharedArray whose elements are drawn with `rand(TR)`; each participating
# process fills its own 1-d partition. When `TR` is a Range1 the element type is
# `Int`, otherwise `TR` itself is the element type.
function shmem_rand(TR::Union(DataType, Range1), dims; kwargs...)
    elty = isa(TR, Range1) ? Int : TR
    fill_random = S -> map!((x)->rand(TR), S.loc_subarr_1d)
    return SharedArray(elty, dims; init = fill_random, kwargs...)
end

# Single-integer and varargs dimension forms.
function shmem_rand(TR::Union(DataType, Range1), i::Int; kwargs...)
    return shmem_rand(TR, (i,); kwargs...)
end
function shmem_rand(TR::Union(DataType, Range1), I::Int...; kwargs...)
    return shmem_rand(TR, I; kwargs...)
end

# With no type or range given, Float64 is used.
function shmem_rand(dims; kwargs...)
    return shmem_rand(Float64, dims; kwargs...)
end
function shmem_rand(I::Int...; kwargs...)
    return shmem_rand(I; kwargs...)
end

# Create a Float64 SharedArray in which each participating process fills its own
# 1-d partition with values from `randn()`.
function shmem_randn(dims; kwargs...)
    fill_normal = S-> map!((x)->randn(), S.loc_subarr_1d)
    return SharedArray(Float64, dims; init = fill_normal, kwargs...)
end

# Varargs form: dimensions given as separate integers.
function shmem_randn(I::Int...; kwargs...)
    return shmem_randn(I; kwargs...)
end



# Best-effort diagnostic: print the shared memory limits reported by `sysctl`
# next to the requested size `slen` (in bytes). Any failure while gathering or
# printing the numbers is deliberately ignored.
function print_shmem_limits(slen)
    try
        @linux_only pfx = "kernel"
        @osx_only pfx = "kern.sysv"

        # Run a command and parse the last whitespace-separated token of its output.
        last_int = cmd -> int(split(readall(readsfrom(cmd)[1]))[end])
        MB = 1024*1024

        shmmax_MB = div(last_int(`sysctl $(pfx).shmmax`), MB)
        page_size = last_int(`getconf PAGE_SIZE`)
        # shmall is multiplied by the page size before converting to MB.
        shmall_MB = div(last_int(`sysctl $(pfx).shmall`) * page_size, MB)

        println("System max size of single shmem segment(MB) : ", shmmax_MB,
            "\nSystem max size of all shmem segments(MB) : ", shmall_MB,
            "\nRequested size(MB) : ", div(slen, MB),
            "\nPlease ensure requested size is within system limits.",
            "\nIf not, increase system limits and try again."
        )
    catch e
        nothing # Ignore any errors in this...
    end
end

# utilities
# Open the shared memory segment named `shm_seg_name` with open flags `mode` and
# memory-map it as an array of element type `T` and size `dims`.
#
# When `mode` includes JL_O_CREAT the segment is first sized to
# `prod(dims)*sizeof(T)` bytes. Returns the mapped array. On any failure the
# system shared memory limits are printed (best effort) and the error is rethrown.
# The descriptor's stream is closed before returning in every case.
function shm_mmap_array(T, dims, shm_seg_name, mode)
    local s = nothing
    local A = nothing
    nbytes = prod(dims)*sizeof(T)
    try
        fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR)
        # shm_open() signals failure with a negative value; descriptor 0 is valid.
        if fd_mem < 0
            error("shm_open() failed")
        end

        # `true`: the stream owns the descriptor, so close(s) below releases it.
        s = fdio(fd_mem, true)

        # On OSX, ftruncate must be used to set the size of the segment, just lseek
        # does not work, and only at creation time.
        if (mode & JL_O_CREAT) == JL_O_CREAT
            # ftruncate() takes an `int` descriptor and returns `int`.
            # NOTE(review): the length is still passed as Int, as before; this
            # presumably matches off_t on the supported platforms - confirm.
            rc = ccall(:ftruncate, Cint, (Cint, Int), fd_mem, nbytes)
            if rc != 0
                ec = errno()
                error("ftruncate() failed, errno : ", ec)
            end
        end

        # grow=false: the segment already has its final size.
        A = mmap_array(T, dims, s, 0, grow=false)
    catch e
        print_shmem_limits(nbytes)
        rethrow(e)

    finally
        if s != nothing
            close(s)
        end
    end
    A
end

# Thin wrappers around the C library's shared memory calls (unix only).

# Remove the name `shm_seg_name`; returns the C call's `int` result.
@unix_only shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name)

# Open (or create, depending on `oflags`) the segment `shm_seg_name` and return its
# file descriptor. shm_open() returns a C `int`; declaring it as such ensures a -1
# failure value is seen as negative by callers on 64-bit platforms.
# NOTE(review): `permissions` is passed as a C int; mode_t is an unsigned integer
# type of platform-dependent width - confirm this is adequate on all targets.
@unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Cint, (Ptr{Uint8}, Cint, Cint), shm_seg_name, oflags, permissions)


# Verify that every process in `procs` reports the same IP address (as returned by
# `getipaddr()` on that process) and raise an error otherwise.
#
# Returns `true` when that common address is also the address of the calling
# process, `false` when the processes all live on some other machine.
function assert_same_host(procs)
    # Without any process there is no address to compare against.
    isempty(procs) && error("SharedArray requires at least one participating process.")

    resp = Array(Any, length(procs))

    # Query all processes concurrently; @sync waits for every reply.
    @sync begin
        for (i, p) in enumerate(procs)
            @async resp[i] = remotecall_fetch(p, () -> getipaddr())
        end
    end

    if !all(x->x==resp[1], resp)
        error("SharedArray requires all requested processes to be on the same machine.")
    end

    return resp[1] == getipaddr()
end
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ include("combinatorics.jl")
# distributed arrays and memory-mapped arrays
include("darray.jl")
include("mmap.jl")
include("sharedarray.jl")

# utilities - version, timing, help, edit, metaprogramming
include("sysinfo.jl")
Expand Down
22 changes: 22 additions & 0 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,28 @@ is ``DArray``\ -specific, but we list it here for completeness::
end



Shared Arrays (EXPERIMENTAL FEATURE)
------------------------------------

Shared Arrays use system shared memory to map the same array across many processes.

The constructor for a shared array is of the form
``SharedArray(T::Type, dims::NTuple; init=false, pids=workers())``
which creates a shared array of type ``T`` and size ``dims`` across the processes
specified by ``pids`` - all of which have to be on the same host.

If an ``init`` function of the type ``initfn(S::SharedArray)`` is specified,
it is called on all the participating workers.

Unlike distributed arrays, a shared array is accessible only from those participating workers
specified by the ``pids`` named argument (and the creating process too, if it is on the same host).

Indexing into a shared array (both for setting as well as accessing values) is the same as
for a regular array.



ClusterManagers
---------------

Expand Down
31 changes: 31 additions & 0 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4500,6 +4500,37 @@ Distributed Arrays

Get the vector of processors storing pieces of ``d``


Shared Arrays (EXPERIMENTAL FEATURE)
------------------------------------

.. function:: SharedArray(T::Type, dims::NTuple; init=false, pids=workers())

Construct a SharedArray of type ``T`` and size ``dims`` across the processes
specified by ``pids`` - all of which have to be on the same host.

If an ``init`` function of the type ``initfn(S::SharedArray)`` is specified,
it is called on all the participating workers.

The following fields in type ``SharedArray`` are initialized appropriately on each
participating process.

``loc_shmarr::Array{T,N}`` - the shared memory segment mapped appropriately into
the current process. Note: For indexed access it is NOT required to use this field.
A ``SharedArray`` object can be used just like a regular array.

``loc_pididx::Int`` - index of the current process into the ``pids`` vector. Can be
used while distributing computational work across participating workers

``loc_subarr_1d`` - a 1-d subarray of the entire array, when equally partitioned
across participating workers. Can be used as a simple work partitioning scheme.


.. function:: procs(sa::SharedArray)

Get the vector of processes that have mapped the shared array


System
------

Expand Down
Loading