Skip to content

Commit beeb195

Browse files
committed
Merge pull request #3649 from amitmurthy/amitm/cluster
Introduced ClusterManager. Externalized support for new cluster types.
2 parents fdbb2ad + b49faa3 commit beeb195

File tree

3 files changed

+140
-154
lines changed

3 files changed

+140
-154
lines changed

base/deprecated.jl

+10
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,13 @@ function amap(f::Function, A::AbstractArray, axis::Integer)
274274

275275
return R
276276
end
277+
278+
function addprocs_scyld(np::Integer)
279+
error("Base.addprocs_scyld is deprecated - add package ClusterManagers and then use ClusterManagers.addprocs_scyld instead.")
280+
end
281+
export addprocs_scyld
282+
283+
function addprocs_sge(np::Integer)
284+
error("Base.addprocs_sge is deprecated - add package ClusterManagers and then use ClusterManagers.addprocs_sge instead.")
285+
end
286+
export addprocs_sge

base/exports.jl

+1-2
Original file line numberDiff line numberDiff line change
@@ -1092,8 +1092,6 @@ export
10921092

10931093
# multiprocessing
10941094
addprocs,
1095-
addprocs_scyld,
1096-
addprocs_sge,
10971095
fetch,
10981096
isready,
10991097
yield,
@@ -1110,6 +1108,7 @@ export
11101108
remotecall_wait,
11111109
take,
11121110
wait,
1111+
ClusterManager,
11131112

11141113
# distributed arrays
11151114
distribute,

base/multi.jl

+129-152
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
## julia starts with one process, and processors can be added using:
44
## addprocs(n) using exec
55
## addprocs({"host1","host2",...}) using remote execution
6-
## addprocs_scyld(n) using Scyld ClusterWare
7-
## addprocs_sge(n) using Sun Grid Engine batch queue
86
##
97
## remotecall(w, func, args...) -
108
## tell a worker to call a function on the given arguments.
@@ -210,7 +208,7 @@ function add_workers(pg::ProcessGroup, w::Array{Any,1})
210208
for i=1:length(w)
211209
send_msg_now(w[i], :join_pgrp, w[i].id, all_locs)
212210
end
213-
:ok
211+
[w[i].id for i in 1:length(w)]
214212
end
215213

216214
myid() = LPROC.id
@@ -860,7 +858,7 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
860858
end)
861859
end
862860

863-
function disable_parallel_libs()
861+
function disable_threaded_libs()
864862
blas_set_num_threads(1)
865863
end
866864

@@ -879,7 +877,7 @@ function start_worker(out::IO)
879877
# close STDIN; workers will not use it
880878
#close(STDIN)
881879

882-
disable_parallel_libs()
880+
disable_threaded_libs()
883881

884882
ccall(:jl_install_sigint_handler, Void, ())
885883

@@ -896,42 +894,86 @@ function start_worker(out::IO)
896894
exit(0)
897895
end
898896

899-
900-
function start_remote_workers(machines, cmds, tunnel=false, sshflags=``)
901-
n = length(cmds)
902-
outs = cell(n)
897+
function start_cluster_workers(n, config)
903898
w = cell(n)
904-
for i=1:n
905-
outs[i],_ = readsfrom(cmds[i])
906-
outs[i].line_buffered = true
899+
cman = config[:cman]
900+
901+
# Get the cluster manager to launch the instance
902+
(insttype, instances) = cman.launch_cb(n, config)
903+
904+
905+
if insttype == :io_only
906+
read_cb_response(inst) =
907+
begin
908+
(host, port) = read_worker_host_port(inst)
909+
inst, host, port
910+
end
911+
elseif insttype == :io_host
912+
read_cb_response(inst) =
913+
begin
914+
io = inst[1]
915+
(_, port) = read_worker_host_port(io)
916+
io, inst[2], port
917+
end
918+
elseif insttype == :io_host_port
919+
read_cb_response(inst) = (inst[1], inst[2], inst[3])
920+
elseif insttype == :host_port
921+
read_cb_response(inst) = (nothing, inst[1], inst[2])
922+
elseif insttype == :cmd
923+
read_cb_response(inst) =
924+
begin
925+
io,_ = readsfrom(detach(inst))
926+
io.line_buffered = true
927+
(host, port) = read_worker_host_port(io)
928+
io, host, port
929+
end
930+
else
931+
error("Unsupported format from Cluster Manager callback")
907932
end
933+
908934
for i=1:n
909-
local hostname::String, port::Int16
910-
stream = outs[i]
911-
stream.line_buffered = true
912-
while true
913-
conninfo = readline(stream)
914-
private_hostname, port = parse_connection_info(conninfo)
915-
if private_hostname != ""
916-
break
917-
end
935+
(io, host, port) = read_cb_response(instances[i])
936+
w[i] = create_worker(host, port, io, config)
937+
end
938+
w
939+
end
940+
941+
function read_worker_host_port (io::IO)
942+
io.line_buffered = true
943+
while true
944+
conninfo = readline(io)
945+
private_hostname, port = parse_connection_info(conninfo)
946+
if private_hostname != ""
947+
return private_hostname, port
918948
end
919-
920-
s = split(machines[i],'@')
921-
if length(s) > 1
922-
user = s[1]
923-
hostname = s[2]
924-
else
949+
end
950+
end
951+
952+
function create_worker(hostname, port, stream, config)
953+
tunnel = config[:tunnel]
954+
955+
s = split(hostname,'@')
956+
if length(s) > 1
957+
user = s[1]
958+
hostname = s[2]
959+
else
960+
if haskey(ENV, "USER")
925961
user = ENV["USER"]
926-
hostname = s[1]
927-
end
928-
929-
if tunnel
930-
w[i] = Worker(hostname, port, user, sshflags)
931-
else
932-
w[i] = Worker(hostname, port)
962+
elseif tunnel
963+
error("USER must be specified either in the environment or as part of the hostname when tunnel option is used.")
933964
end
934-
let wrker = w[i]
965+
hostname = s[1]
966+
end
967+
968+
if tunnel
969+
sshflags = config[:sshflags]
970+
w = Worker(hostname, port, user, sshflags)
971+
else
972+
w = Worker(hostname, port)
973+
end
974+
975+
if isa(stream, AsyncStream)
976+
let wrker = w
935977
# redirect console output from workers to the client's stdout:
936978
start_reading(stream,function(stream::AsyncStream,nread::Int)
937979
if nread>0
@@ -950,6 +992,7 @@ function start_remote_workers(machines, cmds, tunnel=false, sshflags=``)
950992
w
951993
end
952994

995+
953996
function parse_connection_info(str)
954997
m = match(r"^julia_worker:(\d+)#(.*)", str)
955998
if m != nothing
@@ -977,135 +1020,69 @@ function ssh_tunnel(user, host, port, sshflags)
9771020
localp
9781021
end
9791022

980-
#function worker_ssh_cmd(host, key)
981-
# `ssh -i $key -n $host "sh -l -c \"cd $JULIA_HOME && ./julia-release-basic --worker\""`
982-
#end
9831023

984-
# start and connect to processes via SSH.
985-
# optionally through an SSH tunnel.
986-
# the tunnel is only used from the head (process 1); the nodes are assumed
987-
# to be mutually reachable without a tunnel, as is often the case in a cluster.
988-
function addprocs(machines::AbstractVector;
989-
tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``)
990-
add_workers(PGRP,
991-
start_remote_workers(machines,
992-
map(m->detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename --worker\""`),
993-
machines),
994-
tunnel, sshflags))
995-
end
996-
997-
#function addprocs_ssh(machines, keys)
998-
# if !(isa(keys, Array)) && isa(machines,Array)
999-
# key = keys
1000-
# keys = [ key for x = 1:length(machines)]
1001-
# cmdargs = { {machines[x],keys[x]} for x = 1:length(machines)}
1002-
# else
1003-
# cmdargs = {{machines,keys}}
1004-
# end #if/else
1005-
# add_workers(PGRP, start_remote_workers(machines, map(x->worker_ssh_cmd(x[1],x[2]), cmdargs)))
1006-
#end
1024+
abstract ClusterManager
10071025

1008-
worker_local_cmd() = `$JULIA_HOME/julia-release-basic --bind-to $bind_addr --worker`
1009-
1010-
function addprocs(np::Integer)
1011-
disable_parallel_libs()
1012-
add_workers(PGRP, start_remote_workers({ "localhost" for i=1:np },
1013-
{ worker_local_cmd() for i=1:np }))
1014-
end
1015-
1016-
function start_scyld_workers(np::Integer)
1017-
home = JULIA_HOME
1018-
beomap_cmd = `beomap --no-local --np $np`
1019-
out,beomap_proc = readsfrom(beomap_cmd)
1020-
wait(beomap_proc)
1021-
if !success(beomap_proc)
1022-
error("node availability inaccessible (could not run beomap)")
1023-
end
1024-
nodes = split(chomp(readline(out)),':')
1025-
outs = cell(np)
1026-
for (i,node) in enumerate(nodes)
1027-
cmd = detach(`bpsh $node sh -l -c "cd $home && ./julia-release-basic --worker"`)
1028-
outs[i],_ = readsfrom(cmd)
1029-
outs[i].line_buffered = true
1030-
end
1031-
workers = cell(np)
1032-
for (i,stream) in enumerate(outs)
1033-
local hostname::String, port::Int16
1034-
stream.line_buffered = true
1035-
while true
1036-
conninfo = readline(stream)
1037-
hostname, port = parse_connection_info(conninfo)
1038-
if hostname != ""
1039-
break
1040-
end
1041-
end
1042-
workers[i] = Worker(hostname, port)
1043-
let worker = workers[i]
1044-
# redirect console output from workers to the client's stdout:
1045-
start_reading(stream,function(stream::AsyncStream,nread::Int)
1046-
if(nread>0)
1047-
try
1048-
line = readbytes(stream.buffer, nread)
1049-
print("\tFrom worker $(worker.id):\t",line)
1050-
catch err
1051-
println(STDERR,"\tError parsing reply from worker $(worker.id):\t",err)
1052-
return false
1053-
end
1054-
end
1055-
true
1056-
end)
1026+
function launch_procs(n::Integer, config::Dict)
1027+
dir = config[:dir]
1028+
exename = config[:exename]
1029+
exeflags = config[:exeflags]
1030+
1031+
cman = config[:cman]
1032+
if cman.ssh
1033+
sshflags = config[:sshflags]
1034+
outs=cell(n)
1035+
for i in 1:n
1036+
m = cman.machines[i]
1037+
cmd = detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename $exeflags\""`)
1038+
io,_ = readsfrom(cmd)
1039+
io.line_buffered = true
1040+
local port::Int16
1041+
(_, port) = read_worker_host_port (io)
1042+
1043+
# We ignore the hostname printed by the worker, since the worker may be behind a NATed interface,
1044+
# we just use the hostname specified by the caller as part of the machine name
1045+
outs[i] = (io, m, port)
10571046
end
1047+
return (:io_host_port, outs)
1048+
1049+
else
1050+
worker_local_cmd = `$(dir)/$(exename) --bind-to $bind_addr $exeflags`
1051+
return (:cmd, {worker_local_cmd for i in 1:n})
10581052
end
1059-
workers
10601053
end
10611054

1062-
function addprocs_scyld(np::Integer)
1063-
disable_parallel_libs()
1064-
add_workers(PGRP, start_scyld_workers(np))
1055+
immutable RegularCluster <: ClusterManager
1056+
launch_cb::Function
1057+
ssh::Bool
1058+
machines
1059+
1060+
RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines)
10651061
end
10661062

1067-
function start_sge_workers(n)
1068-
home = JULIA_HOME
1069-
sgedir = joinpath(pwd(),"SGE")
1070-
run(`mkdir -p $sgedir`)
1071-
qsub_cmd = `echo $home/julia-release-basic --worker` |> `qsub -N JULIA -terse -cwd -j y -o $sgedir -t 1:$n`
1072-
out,qsub_proc = readsfrom(qsub_cmd)
1073-
if !success(qsub_proc)
1074-
error("batch queue not available (could not run qsub)")
1075-
end
1076-
id = chomp(split(readline(out),'.')[1])
1077-
println("job id is $id")
1078-
print("waiting for job to start");
1079-
workers = cell(n)
1080-
for i=1:n
1081-
# wait for each output stream file to get created
1082-
fname = "$sgedir/JULIA.o$(id).$(i)"
1083-
local fl, hostname, port
1084-
fexists = false
1085-
sleep(0.5)
1086-
while !fexists
1087-
try
1088-
fl = open(fname)
1089-
try
1090-
conninfo = readline(fl)
1091-
hostname, port = parse_connection_info(conninfo)
1092-
finally
1093-
close(fl)
1094-
end
1095-
fexists = (hostname != "")
1096-
catch
1097-
print(".");
1098-
sleep(0.5)
1099-
end
1063+
# start and connect to processes via SSH.
1064+
# optionally through an SSH tunnel.
1065+
# the tunnel is only used from the head (process 1); the nodes are assumed
1066+
# to be mutually reachable without a tunnel, as is often the case in a cluster.
1067+
function addprocs(instances::Union(AbstractVector, Integer);
1068+
tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``, cman=nothing)
1069+
1070+
config={:dir=>dir, :exename=>exename, :exeflags=>` --worker `, :tunnel=>tunnel, :sshflags=>sshflags}
1071+
disable_threaded_libs()
1072+
1073+
if isa(instances, AbstractVector) && (cman == nothing)
1074+
config[:cman] = RegularCluster(ssh=true, machines=instances)
1075+
return add_workers(PGRP, start_cluster_workers(length(instances), config))
1076+
else
1077+
if isa(cman, ClusterManager)
1078+
config[:cman] = cman
1079+
else
1080+
config[:cman] = RegularCluster()
11001081
end
1101-
#print("hostname=$hostname, port=$port\n")
1102-
workers[i] = Worker(hostname, port)
1082+
return add_workers(PGRP, start_cluster_workers(instances, config))
11031083
end
1104-
print("\n")
1105-
workers
11061084
end
11071085

1108-
addprocs_sge(n) = add_workers(PGRP, start_sge_workers(n))
11091086

11101087
## higher-level functions: spawn, pmap, pfor, etc. ##
11111088

0 commit comments

Comments
 (0)