diff --git a/base/client.jl b/base/client.jl index a6137abea69ed..8c3a0be396a75 100644 --- a/base/client.jl +++ b/base/client.jl @@ -189,7 +189,7 @@ function init_bind_addr(args::Vector{UTF8String}) btoidx = findfirst(args, "--bind-to") if btoidx > 0 bind_to = split(args[btoidx+1], ":") - bind_addr = parseip(bind_to[1]) + bind_addr = string(parseip(bind_to[1])) if length(bind_to) > 1 bind_port = parseint(bind_to[2]) else @@ -198,11 +198,11 @@ function init_bind_addr(args::Vector{UTF8String}) else bind_port = 0 try - bind_addr = getipaddr() + bind_addr = string(getipaddr()) catch # All networking is unavailable, initialize bind_addr to the loopback address # Will cause an exception to be raised only when used. - bind_addr = ip"127.0.0.1" + bind_addr = "127.0.0.1" end end global LPROC @@ -355,11 +355,12 @@ function load_juliarc() end function load_machine_file(path::AbstractString) - machines = AbstractString[] + machines = [] for line in split(readall(path),'\n'; keep=false) - s = split(line,'*'; keep=false) + s = map!(strip, split(line,'*'; keep=false)) if length(s) > 1 - append!(machines,fill(s[2],int(s[1]))) + cnt = isnumber(s[1]) ? int(s[1]) : symbol(s[1]) + push!(machines,(s[2], cnt)) else push!(machines,line) end diff --git a/base/exports.jl b/base/exports.jl index f9cad968ff2b0..807b98fbdc7ff 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -115,6 +115,7 @@ export WeakKeyDict, WeakRef, Woodbury, + WorkerConfig, WString, Zip, diff --git a/base/multi.jl b/base/multi.jl index f8ac15ca1d5dc..c9a0abb1ddc40 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -79,52 +79,62 @@ end abstract ClusterManager +type WorkerConfig + # Common fields relevant to all cluster managers + io::Nullable{IO} + host::Nullable{AbstractString} + port::Nullable{Integer} + + # Used when launching additional workers at a host + count::Nullable{Union(Int, Symbol)} + exeflags::Nullable{Cmd} + + # External cluster managers can use this to store information at a per-worker level + # Can be a dict if multiple fields need to be stored. + userdata::Nullable{Any} + + # SSHManager / SSH tunnel connections to workers + tunnel::Nullable{Bool} + bind_addr::Nullable{AbstractString} + sshflags::Nullable{Cmd} + max_parallel::Nullable{Integer} + + # Used by Local/SSH managers + connect_at::Nullable{Any} + + process::Nullable{Process} + ospid::Nullable{Integer} + + # Private dictionary used to store temporary information by Local/SSH managers. + environ::Nullable{Dict} + + function WorkerConfig() + wc=new() + for n in names(WorkerConfig) + T = eltype(fieldtype(WorkerConfig, n)) + setfield!(wc, n, Nullable{T}()) + end + wc + end +end + type Worker - host::ByteString - port::UInt16 - socket::TCPSocket + id::Int + r_stream::AsyncStream + w_stream::AsyncStream + manager::ClusterManager + config::WorkerConfig + sendbuf::IOBuffer del_msgs::Array{Any,1} add_msgs::Array{Any,1} - id::Int gcflag::Bool - bind_addr::IPAddr - manager::ClusterManager - config::Dict - Worker(host::AbstractString, port::Integer, sock::TCPSocket, id::Int) = - new(bytestring(host), uint16(port), sock, IOBuffer(), [], [], id, false) -end -Worker(host::AbstractString, port::Integer, sock::TCPSocket) = - Worker(host, port, sock, 0) -function Worker(host::AbstractString, port::Integer) - # Connect to the loopback port if requested host has the same ipaddress as self. 
- if host == string(LPROC.bind_addr) - w = Worker(host, port, connect("127.0.0.1", uint16(port))) - else - w = Worker(host, port, connect(host, uint16(port))) - end - # Avoid calling getaddrinfo if possible - involves a DNS lookup - # host may be a stringified ipv4 / ipv6 address or a dns name - if host == "localhost" - w.bind_addr = parseip("127.0.0.1") - else - try - w.bind_addr = parseip(host) - catch - w.bind_addr = getaddrinfo(host) - end - end - w -end -function Worker(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags) - w = Worker(host, port, - connect("localhost", - ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags))) - w.bind_addr = parseip(bind_addr) - w + Worker(id, r_stream, w_stream, manager, config) = + new(id, r_stream, w_stream, manager, config, IOBuffer(), [], [], false) end +Worker(id, r_stream, w_stream, manager) = Worker(id, r_stream, w_stream, manager, WorkerConfig()) function send_msg_now(w::Worker, kind, args...) send_msg_(w, kind, args, true) @@ -151,7 +161,7 @@ function flush_gc_msgs(w::Worker) end #TODO: Move to different Thread -function enq_send_req(sock::TCPSocket, buf, now::Bool) +function enq_send_req(sock::AsyncStream, buf, now::Bool) arr=takebuf_array(buf) write(sock,arr) #TODO implement "now" @@ -168,7 +178,7 @@ function send_msg_(w::Worker, kind, args, now::Bool) if !now && w.gcflag flush_gc_msgs(w) else - enq_send_req(w.socket,buf,now) + enq_send_req(w.w_stream, buf, now) end end @@ -187,7 +197,7 @@ end type LocalProcess id::Int - bind_addr::IPAddr + bind_addr::AbstractString bind_port::UInt16 LocalProcess() = new(1) end @@ -219,16 +229,14 @@ end const PGRP = ProcessGroup([]) get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) -function get_bind_addr(w::Union(Worker, LocalProcess)) - if !isdefined(w, :bind_addr) +get_bind_addr(w::LocalProcess) = LPROC.bind_addr +function get_bind_addr(w::Worker) + if isnull(w.config.bind_addr) if w.id != myid() - w.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id) - else - error("LPROC.bind_addr not defined") # Should never happend since LPROC.bind_addr - # is defined early on during process init. + w.config.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id) end end - w.bind_addr + w.config.bind_addr end function add_worker(pg::ProcessGroup, w) @@ -236,17 +244,16 @@ function add_worker(pg::ProcessGroup, w) # has the full list of address:port assert(LPROC.id == 1) rr_join = RemoteRef() - w.id = get_next_pid() register_worker(w) - create_message_handler_loop(w.socket; ntfy_join_complete=rr_join) + process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join) - all_locs = map(x -> isa(x, Worker) ? (string(x.bind_addr), x.port, x.id, isa(x.manager, LocalManager)) : ("", 0, x.id, true), pg.workers) + all_locs = map(x -> isa(x, Worker) ? 
(get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), pg.workers) send_msg_now(w, :join_pgrp, w.id, all_locs, isa(w.manager, LocalManager)) @schedule manage(w.manager, w.id, w.config, :register) - (w.id, rr_join) + rr_join end myid() = LPROC.id @@ -260,8 +267,8 @@ end procs() = Int[x.id for x in PGRP.workers] function procs(pid::Integer) if myid() == 1 - if (pid == 1) || isa(map_pid_wrkr[pid].manager, LocalManager) - Int[x.id for x in filter(w -> (w.id==1) || isa(w.manager, LocalManager), PGRP.workers)] + if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager)) + Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), PGRP.workers)] else ipatpid = get_bind_addr(pid) Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)] @@ -336,7 +343,7 @@ end function worker_id_from_socket(s) w = get(map_sock_wrkr, s, nothing) if isa(w,Worker) - if is(s, w.socket) || is(s, w.sendbuf) + if is(s, w.r_stream) || is(s, w.w_stream) || is(s, w.sendbuf) return w.id end end @@ -347,13 +354,13 @@ function worker_id_from_socket(s) return -1 end - register_worker(w) = register_worker(PGRP, w) function register_worker(pg, w) push!(pg.workers, w) map_pid_wrkr[w.id] = w if isa(w, Worker) - map_sock_wrkr[w.socket] = w + map_sock_wrkr[w.r_stream] = w + map_sock_wrkr[w.w_stream] = w map_sock_wrkr[w.sendbuf] = w end end @@ -363,7 +370,10 @@ function deregister_worker(pg, pid) pg.workers = filter(x -> !(x.id == pid), pg.workers) w = pop!(map_pid_wrkr, pid, nothing) if isa(w, Worker) - pop!(map_sock_wrkr, w.socket) + pop!(map_sock_wrkr, w.r_stream) + if w.r_stream != w.w_stream + pop!(map_sock_wrkr, w.w_stream) + end pop!(map_sock_wrkr, w.sendbuf) # Notify the cluster manager of this workers death @@ -793,57 +803,71 @@ function accept_handler(server::TCPServer, status::Int32) error("an error occured during the creation of the server") end client = accept_nonblock(server) - create_message_handler_loop(client) + process_messages(client, client) end -function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothing) #returns immediately +function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...) + @schedule begin + disable_nagle(r_stream) + start_reading(r_stream) + wait_connected(r_stream) + if r_stream != w_stream + disable_nagle(w_stream) + wait_connected(w_stream) + end + create_message_handler_loop(r_stream, w_stream; kwargs...) + end +end + +function process_messages(r_stream::AsyncStream, w_stream::AsyncStream; kwargs...) + create_message_handler_loop(r_stream, w_stream; kwargs...) 
+end + +function create_message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream; ntfy_join_complete=nothing) #returns immediately @schedule begin global PGRP - #println("message_handler_loop") - disable_nagle(sock) - start_reading(sock) - wait_connected(sock) + global cluster_manager try while true - msg = deserialize(sock) - # println("got msg: ",msg) + msg = deserialize(r_stream) + #println("got msg: ",msg) # handle message if is(msg, :call) - id = deserialize(sock) + id = deserialize(r_stream) #print("$(myid()) got id $id\n") - f0 = deserialize(sock) + f0 = deserialize(r_stream) #print("$(myid()) got call $f0\n") - args0 = deserialize(sock) + args0 = deserialize(r_stream) #print("$(myid()) got args $args0\n") let f=f0, args=args0 schedule_call(id, ()->f(args...)) end elseif is(msg, :call_fetch) - id = deserialize(sock) - f = deserialize(sock) - args = deserialize(sock) + id = deserialize(r_stream) + f = deserialize(r_stream) + args = deserialize(r_stream) let f=f, args=args, id=id, msg=msg @schedule begin v = run_work_thunk(()->f(args...)) - deliver_result(sock, msg, id, v) + deliver_result(w_stream, msg, id, v) v end end elseif is(msg, :call_wait) - id = deserialize(sock) - notify_id = deserialize(sock) - f = deserialize(sock) - args = deserialize(sock) + id = deserialize(r_stream) + notify_id = deserialize(r_stream) + f = deserialize(r_stream) + args = deserialize(r_stream) let f=f, args=args, id=id, msg=msg, notify_id=notify_id @schedule begin rv = schedule_call(id, ()->f(args...)) - deliver_result(sock, msg, notify_id, wait_full(rv)) + deliver_result(w_stream, msg, notify_id, wait_full(rv)) end end elseif is(msg, :do) - f = deserialize(sock) - args = deserialize(sock) + f = deserialize(r_stream) + args = deserialize(r_stream) #print("got args: $args\n") let f=f, args=args @schedule begin @@ -852,39 +876,32 @@ function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothi end elseif is(msg, :result) # used to deliver result of wait or fetch - oid = deserialize(sock) + oid = deserialize(r_stream) #print("$(myid()) got $msg $oid\n") - val = deserialize(sock) + val = deserialize(r_stream) put!(lookup_ref(oid), val) elseif is(msg, :identify_socket) - otherid = deserialize(sock) - register_worker(Worker("", 0, sock, otherid)) + otherid = deserialize(r_stream) + register_worker(Worker(otherid, r_stream, w_stream, cluster_manager)) elseif is(msg, :join_pgrp) - # first connection; get process group info from client - self_pid = LPROC.id = deserialize(sock) - locs = deserialize(sock) - self_is_local = deserialize(sock) - #print("\nLocation: ",locs,"\nId:",myid(),"\n") - # joining existing process group - - controller = Worker("", 0, sock, 1) + self_pid = LPROC.id = deserialize(r_stream) + locs = deserialize(r_stream) + self_is_local = deserialize(r_stream) + controller = Worker(1, r_stream, w_stream, cluster_manager) register_worker(controller) register_worker(LPROC) - for (rhost, rport, rpid, r_is_local) in locs + for (connect_at, rpid, r_is_local) in locs if (rpid < self_pid) && (!(rpid == 1)) # Connect to them - if self_is_local && r_is_local - # If on localhost, use the loopback address - this addresses - # the special case of system suspend wherein the local ip - # may be changed upon system awake. 
- w = Worker("127.0.0.1", rport) - else - w = Worker(rhost, rport) - end - w.id = rpid + wconfig = WorkerConfig() + wconfig.connect_at = connect_at + wconfig.environ = AnyDict(:self_is_local=>self_is_local, :r_is_local=>r_is_local) + + (r_s, w_s) = connect(cluster_manager, rpid, wconfig) + w = Worker(rpid, r_s, w_s, cluster_manager, wconfig) register_worker(w) - create_message_handler_loop(w.socket) + process_messages(w.r_stream, w.w_stream) send_msg_now(w, :identify_socket, self_pid) else # Others will connect to us. Don't do anything just yet @@ -892,19 +909,27 @@ function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothi end end - send_msg_now(controller, :join_complete) + send_msg_now(controller, :join_complete, Sys.CPU_CORES, getpid()) elseif is(msg, :join_complete) - put!(ntfy_join_complete, :join_complete) + w = map_sock_wrkr[r_stream] + + environ = get(w.config.environ, Dict()) + environ[:cpu_cores] = deserialize(r_stream) + w.config.environ = environ + + w.config.ospid = deserialize(r_stream) + + put!(ntfy_join_complete, w.id) ntfy_join_complete = nothing # so that it gets gc'ed end end # end of while catch e - iderr = worker_id_from_socket(sock) + iderr = worker_id_from_socket(r_stream) # If error occured talking to pid 1, commit harakiri if iderr == 1 - if isopen(sock) + if isopen(w_stream) print(STDERR, "fatal error on ", myid(), ": ") display_error(e, catch_backtrace()) end @@ -916,7 +941,8 @@ function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothi # to each other on unhandled errors deregister_worker(iderr) - if isopen(sock) close(sock) end + if isopen(r_stream) close(r_stream) end + if isopen(w_stream) close(w_stream) end if (myid() == 1) global rmprocset @@ -951,6 +977,7 @@ function start_worker(out::IO) # exit when process 1 shut down. Don't yet know why. 
#redirect_stderr(STDOUT) + init_worker() if LPROC.bind_port == 0 (actual_port,sock) = listenany(uint16(9009)) LPROC.bind_port = actual_port @@ -966,7 +993,6 @@ function start_worker(out::IO) # close STDIN; workers will not use it #close(STDIN) - disable_threaded_libs() disable_nagle(sock) try @@ -980,48 +1006,147 @@ function start_worker(out::IO) exit(0) end -function read_cb_response(io::IO, config::Dict) - (host, port) = read_worker_host_port(io) - return (io, host, port, host, config) -end +function start_cluster_workers(manager, params, resp_arr, launched_ntfy) + # Get the cluster manager to launch the instance + #print("start_cluster_workers\n") + instances_ntfy = Condition() -function read_cb_response(io::IO, host::AbstractString, config::Dict) - (bind_addr, port) = read_worker_host_port(io) - return (io, bind_addr, port, host, config) -end + launched = WorkerConfig[] + t = @schedule try + launch(manager, params, launched, instances_ntfy) + catch e + print(STDERR, "Error launching workers with $T : $e\n") + end -read_cb_response(io::IO, host::AbstractString, port::Integer, config::Dict) = (io, host, port, host, config) + @sync begin + while true + if length(launched) == 0 + if istaskdone(t) + break + end + @schedule (sleep(1); notify(instances_ntfy)) + wait(instances_ntfy) + end + + if length(launched) > 0 + wconfig = shift!(launched) + rr = connect_n_create_worker(manager, get_next_pid(), wconfig) + let rr=rr + @async launch_additional(worker_from_id(fetch(rr)), resp_arr, launched_ntfy) + end -read_cb_response(host::AbstractString, port::Integer, config::Dict) = (nothing, host, port, host, config) + push!(resp_arr, rr) + notify(launched_ntfy) + end + end + end + notify(launched_ntfy) +end -function start_cluster_workers(np::Integer, config::Dict, manager::ClusterManager, resp_arr::Array, launched_ntfy::Condition) - # Get the cluster manager to launch the instance - instance_sets = [] - instances_ntfy = Condition() +function launch_additional(w::Worker, resp_arr::Array, launched_ntfy::Condition) + cnt = get(w.config.count, 1) + if cnt == :auto + cnt = get(w.config.environ)[:cpu_cores] + end + cnt = cnt - 1 # Removing self from the requested number - t = @schedule launch(manager, np, config, instance_sets, instances_ntfy) + if cnt > 0 + npids = [get_next_pid() for x in 1:cnt] + new_workers = remotecall_fetch(w.id, launch_additional, cnt, npids, get(w.config.exeflags, ``)) - while true - if (length(instance_sets) == 0) - istaskdone(t) && break - @schedule (sleep(1); notify(instances_ntfy)) - wait(instances_ntfy) + # keyword argument max_parallel is only relevant for concurrent ssh connections to a unique host + # Post launch, ssh from master to workers is used only if tunnel is true + num_new_w = length(new_workers) + tunnel = get(w.config.tunnel, false) + maxp = get(w.config.max_parallel, 0) + + if tunnel && (maxp > 0) + num_in_p = min(maxp, num_new_w) + control_rrs = [RemoteRef() for i in 1:num_in_p] + else + num_in_p = 0 # Do not rate-limit connect + control_rrs = [] end - if length(instance_sets) > 0 - instances = shift!(instance_sets) - for inst in instances - (io, bind_addr, port, pubhost, wconfig) = read_cb_response(inst...) 
- push!(resp_arr, create_worker(bind_addr, port, pubhost, io, wconfig, manager)) - notify(launched_ntfy) + @sync for (i, address) in enumerate(new_workers) + (pid, bind_addr, port) = address + + wconfig = WorkerConfig() + for x in [:host, :tunnel, :sshflags, :exeflags] + setfield!(wconfig, x, getfield(w.config, x)) + end + wconfig.bind_addr = bind_addr + wconfig.port = port + + rridx = num_in_p > 0 ? (num_new_w % num_in_p) + 1 : 0 + let pid=pid, wconfig=wconfig, rridx=rridx + @async try + (rridx > 0) && take!(control_rrs[rridx]) + rr = connect_n_create_worker(w.manager, pid, wconfig) + (rridx > 0) && put!(control_rrs[rridx], :OK) + + push!(resp_arr, rr) + notify(launched_ntfy) + catch e + print(STDERR, "Error connecting to additional worker : $(e)\n") + end end end + for rr in control_rrs + put(rr, :OK) + end end +end - notify(launched_ntfy) +function connect_n_create_worker(manager, pid, wconfig) + (r_s, w_s) = connect(manager, pid, wconfig) + + w = Worker(pid, r_s, w_s, manager, wconfig) + # install a finalizer to perform cleanup if necessary + finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end) + + # performs initial handshake with new worker. Returns a remoteref we can wait on for completion. + rr = add_worker(PGRP, w) +end + + +function launch_additional(np::Integer, pids::Array, exeflags::Cmd) + assert(np == length(pids)) + + io_objs = cell(np) + addresses = cell(np) + + for i in 1:np + io, pobj = open(detach(`$(JULIA_HOME)/$(exename()) $exeflags`), "r") + io_objs[i] = io + end + + for (i,io) in enumerate(io_objs) + (host, port) = read_worker_host_port(io) + addresses[i] = (pids[i], host, port) + + let io=io, pid=pids[i] + redirect_worker_output("$pid", io) + end + end + + addresses end +function redirect_worker_output(ident, stream) + @schedule while !eof(stream) + line = readline(stream) + if beginswith(line, "\tFrom worker ") + print(line) + else + print("\tFrom worker $(ident):\t$line") + end + end +end + + + function read_worker_host_port(io::IO) io.line_buffered = true while true @@ -1033,8 +1158,95 @@ function read_worker_host_port(io::IO) end end -function create_worker(bind_addr, port, pubhost, stream, config, manager) - tunnel = config[:tunnel] +function parse_connection_info(str) + m = match(r"^julia_worker:(\d+)#(.*)", str) + if m != nothing + (m.captures[2], parseint(Int16, m.captures[1])) + else + ("", int16(-1)) + end +end + +let tunnel_port = 9201 + global next_tunnel_port + function next_tunnel_port() + retval = tunnel_port + if tunnel_port > 32000 + tunnel_port = 9201 + else + tunnel_port += 1 + end + retval + end +end + + + +# establish an SSH tunnel to a remote worker +# returns P such that localhost:P connects to host:port +function ssh_tunnel(user, host, bind_addr, port, sshflags) + localp = next_tunnel_port() + ntries = cnt = 100 + while !success(detach(`ssh -T -a -x -o ExitOnForwardFailure=yes -f $sshflags $(user)@$host -L $localp:$bind_addr:$(int(port)) sleep 60`)) && cnt > 0 + localp = next_tunnel_port() + cnt -= 1 + end + (cnt == 0) && error("Unable to create SSH tunnel after $cnt tries. 
No free port?") + + localp +end + +immutable LocalManager <: ClusterManager + np::Integer +end + +function init_worker(manager::ClusterManager=DefaultClusterManager()) + global cluster_manager + cluster_manager = manager + disable_threaded_libs() +end + +show(io::IO, manager::LocalManager) = println(io, "LocalManager()") + +function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition) + dir = params[:dir] + exename = params[:exename] + exeflags = params[:exeflags] + + for i in 1:manager.np + io, pobj = open(detach(`$(dir)/$(exename) $exeflags --bind-to $(LPROC.bind_addr) --worker`), "r") + wconfig = WorkerConfig() + wconfig.process = pobj + wconfig.io = io + push!(launched, wconfig) + end + + notify(c) +end + +function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol) + if op == :interrupt + kill(get(config.process), 2) + end +end + +function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) + if !isnull(config.connect_at) + # this is a worker-to-worker setup call. + return connect_w2w(pid, config) + end + + # master connecting to workers + if !isnull(config.io) + (bind_addr, port) = read_worker_host_port(get(config.io)) + pubhost=get(config.host, bind_addr) + else + pubhost=get(config.host) + port=get(config.port) + bind_addr=get(config.bind_addr, pubhost) + end + + tunnel = get(config.tunnel, false) s = split(pubhost,'@') user = "" @@ -1050,101 +1262,94 @@ function create_worker(bind_addr, port, pubhost, stream, config, manager) end if tunnel - sshflags = config[:sshflags] - w = Worker(pubhost, bind_addr, port, user, sshflags) + sshflags = get(config.sshflags) + (s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags) else - w = Worker(bind_addr, port) + (s, bind_addr) = connect_to_worker(bind_addr, port) end - w.config = config - w.manager = manager + config.host = pubhost + config.port = port + config.bind_addr = bind_addr - if isa(stream, AsyncStream) - let wrker = w - # redirect console output from workers to the client's stdout: - @async begin - while !eof(stream) - line = readline(stream) - print("\tFrom worker $(wrker.id):\t$line") - end - end + # write out a subset of the connect_at required for further worker-worker connection setups + config.connect_at = (bind_addr, port) + + if !isnull(config.io) + let pid = pid + redirect_worker_output(pid, get(config.io)) end end - # install a finalizer to perform cleanup if necessary - finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end) - - w + (s, s) end - -function parse_connection_info(str) - m = match(r"^julia_worker:(\d+)#(.*)", str) - if m != nothing - (m.captures[2], parseint(Int16, m.captures[1])) +function connect_w2w(pid::Int, config::WorkerConfig) + (rhost, rport) = get(config.connect_at) + config.host = rhost + config.port = rport + if get(get(config.environ), :self_is_local, false) && get(get(config.environ), :r_is_local, false) + # If on localhost, use the loopback address - this addresses + # the special case of system suspend wherein the local ip + # may be changed upon system awake. 
+ (s, bind_addr) = connect_to_worker("127.0.0.1", rport) else - ("", int16(-1)) + (s, bind_addr)= connect_to_worker(rhost, rport) end + + (s,s) end -tunnel_port = 9201 -# establish an SSH tunnel to a remote worker -# returns P such that localhost:P connects to host:port -function ssh_tunnel(user, host, bind_addr, port, sshflags) - global tunnel_port - localp = tunnel_port::Int - while !success(detach(`ssh -T -a -x -o ExitOnForwardFailure=yes -f $sshflags $(user)@$host -L $localp:$bind_addr:$(int(port)) sleep 60`)) && localp < 10000 - localp += 1 - end - if localp >= 10000 - error("unable to assign a local tunnel port between 9201 and 10000") +function connect_to_worker(host::AbstractString, port::Integer) + # Connect to the loopback port if requested host has the same ipaddress as self. + if host == string(LPROC.bind_addr) + s = connect("127.0.0.1", uint16(port)) + else + s = connect(host, uint16(port)) end - tunnel_port = localp+1 - localp + # Avoid calling getaddrinfo if possible - involves a DNS lookup + # host may be a stringified ipv4 / ipv6 address or a dns name + if host == "localhost" + bind_addr = "127.0.0.1" + else + try + bind_addr = string(parseip(host)) + catch + bind_addr = string(getaddrinfo(host)) + end + end + (s, bind_addr) end -immutable LocalManager <: ClusterManager +function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags) + s = connect("localhost", ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags)) + (s, bind_addr) end -show(io::IO, manager::LocalManager) = println(io, "LocalManager()") - -function launch(manager::LocalManager, np::Integer, config::Dict, resp_arr::Array, c::Condition) - dir = config[:dir] - exename = config[:exename] - exeflags = config[:exeflags] - - io_objs = cell(np) - configs = cell(np) - - # start the processes first... - for i in 1:np - io, pobj = open(detach(`$(dir)/$(exename) $exeflags --bind-to $(LPROC.bind_addr)`), "r") - io_objs[i] = io - configs[i] = merge(config, AnyDict(:process => pobj)) - end - - # ...and then read the host:port info. This optimizes overall start times. - push!(resp_arr, collect(zip(io_objs, configs))) - notify(c) -end - -function manage(manager::LocalManager, id::Integer, config::Dict, op::Symbol) - if op == :interrupt - kill(config[:process], 2) - end -end immutable SSHManager <: ClusterManager machines::Dict - function SSHManager(; machines=[]) + function SSHManager(machines) mhist = Dict() for m in machines - cnt = get(mhist, m, 0) - mhist[m] = cnt + 1 + if isa(m, Tuple) + host=m[1] + cnt=m[2] + else + host=m + cnt=1 + end + current_cnt = get(mhist, host, 0) + + if isa(cnt, Number) + mhist[host] = isa(current_cnt, Number) ? current_cnt + Int(cnt) : Int(cnt) + else + mhist[host] = cnt + end end new(mhist) end @@ -1152,50 +1357,46 @@ end show(io::IO, manager::SSHManager) = println(io, "SSHManager(machines=", manager.machines, ")") -function launch(manager::SSHManager, np::Integer, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition) - # Launch on each unique host in parallel. +function launch(manager::SSHManager, params::Dict, launched::Array, machines_launch_ntfy::Condition) + # Launch one worker on each unique host in parallel. Additional workers are launched later. # Wait for all launches to complete. 
- plaunch_ntfy = Condition() launch_tasks = cell(length(manager.machines)) for (i,(machine, cnt)) in enumerate(manager.machines) - launch_tasks[i] = @schedule launch_on_machine(manager, config, resp_arr, machines_launch_ntfy, machine, cnt, plaunch_ntfy) + let machine=machine, cnt=cnt + launch_tasks[i] = @schedule try + launch_on_machine(manager, machine, cnt, params, launched, machines_launch_ntfy) + catch e + print(STDERR, "exception launching on machine $(machine) : $(e)\n") + end + end end - while length(launch_tasks) > 0 - if istaskdone(launch_tasks[1]) - shift!(launch_tasks) - else - wait(plaunch_ntfy) - end + for t in launch_tasks + wait(t) end notify(machines_launch_ntfy) end -function launch_on_machine(manager::SSHManager, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition, - machine::AbstractString, cnt::Integer, plaunch_ntfy::Condition) - dir = config[:dir] - exename = config[:exename] - exeflags_base = config[:exeflags] - - thisconfig = copy(config) # config for this worker +function launch_on_machine(manager::SSHManager, machine, cnt, params, launched, machines_launch_ntfy::Condition) + dir = params[:dir] + exename = params[:exename] + exeflags = params[:exeflags] # machine could be of the format [user@]host[:port] bind_addr[:bind_port] machine_bind = split(machine) if length(machine_bind) > 1 - exeflags = `--bind-to $(machine_bind[2]) $exeflags_base` - else - exeflags = exeflags_base + exeflags = `--bind-to $(machine_bind[2]) $exeflags` end - machine_def = machine_bind[1] + exeflags = `$exeflags --worker` + machine_def = machine_bind[1] machine_def = split(machine_def, ':') portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : `` - sshflags = `$(config[:sshflags]) $portopt` - thisconfig[:sshflags] = sshflags + sshflags = `$(params[:sshflags]) $portopt` host = machine_def[1] @@ -1204,110 +1405,64 @@ function launch_on_machine(manager::SSHManager, config::Dict, resp_arr::Array, m cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch - thisconfig[:machine] = host - - # start the processes first... - maxp = config[:max_parallel] - - if config[:tunnel] - maxp = div(maxp,2) + 1 # Since the tunnel will also take up one ssh connection - end - - ios_to_check = [] - - t_check=time() - while cnt > 0 - ios_to_check2 = [] - for io in ios_to_check - if nb_available(io) == 0 - push!(ios_to_check2, io) - end - end - ios_to_check=ios_to_check2 - - maxp_in_loop = maxp - length(ios_to_check) - if maxp_in_loop == 0 - # wait for sometime and check again - sleep(0.1) - if (time() - t_check) > 50 - error("Timed out waiting for launched worker") - end - continue - end - lc = cnt > maxp_in_loop ? maxp_in_loop : cnt - - io_objs = cell(lc) - configs = cell(lc) + # launch + io, pobj = open(detach(cmd), "r") + wconfig = WorkerConfig() - for i in 1:lc - io, pobj = open(detach(cmd), "r") - io_objs[i] = io - push!(ios_to_check, io) - end - - cnt = cnt - lc + wconfig.io = io + wconfig.host = host + wconfig.sshflags = sshflags + wconfig.exeflags = exeflags + wconfig.count = cnt + wconfig.max_parallel = get(params, :max_parallel, Nullable{Integer}()) - # ...and then read the host:port info. This optimizes overall start times. - # For ssh, the tunnel connection, if any, has to be with the specified machine name. 
- # but the port needs to be forwarded to the bound hostname/ip-address - push!(resp_arr, collect(zip(io_objs, fill(host, lc), fill(thisconfig, lc)))) - notify(machines_launch_ntfy) - - t_check=time() - end - - notify(plaunch_ntfy) + push!(launched, wconfig) + notify(machines_launch_ntfy) end -function manage(manager::SSHManager, id::Integer, config::Dict, op::Symbol) + +function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symbol) if op == :interrupt - if haskey(config, :ospid) - machine = config[:machine] - if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $(config[:sshflags]) $machine "kill -2 $(config[:ospid])"`) + ospid = get(config.ospid, 0) + if ospid > 0 + host = get(config.host) + sshflags = get(config.sshflags) + if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`) println("Error sending a Ctrl-C to julia worker $id on $machine") end else # This state can happen immediately after an addprocs println("Worker $id cannot be presently interrupted.") end - elseif op == :register - config[:ospid] = remotecall_fetch(id, getpid) end end -# start and connect to processes via SSH. -# optionally through an SSH tunnel. -# the tunnel is only used from the head (process 1); the nodes are assumed -# to be mutually reachable without a tunnel, as is often the case in a cluster. -# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config -function addprocs_internal(np::Integer; - tunnel=false, dir=JULIA_HOME, - exename=(ccall(:jl_is_debugbuild,Cint,())==0?"./julia":"./julia-debug"), - sshflags::Cmd=``, manager=LocalManager(), exeflags=``, max_parallel=10) - - config = AnyDict(:dir=>dir, :exename=>exename, :exeflags=>`$exeflags --worker`, :tunnel=>tunnel, :sshflags=>sshflags, :max_parallel=>max_parallel) +function addprocs(manager::ClusterManager; kwargs...) + params = merge(default_addprocs_params(), AnyDict(kwargs)) disable_threaded_libs() - ret = Array(Int, 0) rr_join = Array(RemoteRef, 0) - resp_arr = [] + resp_arr = RemoteRef[] c = Condition() - t = @schedule start_cluster_workers(np, config, manager, resp_arr, c) + t = @schedule try + start_cluster_workers(manager, params, resp_arr, c) + catch e + print(STDERR, "Error starting cluster workers : $(e)\n") + end while true if length(resp_arr) == 0 - istaskdone(t) && break + if istaskdone(t) + break + end @schedule (sleep(1); notify(c)) wait(c) end if length(resp_arr) > 0 - w = shift!(resp_arr) - id, rr = add_worker(PGRP, w) - push!(ret, id) - push!(rr_join, rr) + push!(rr_join, shift!(resp_arr)) end end @@ -1315,21 +1470,30 @@ function addprocs_internal(np::Integer; wait(rr) end - assert(length(ret) == np) - ret + new_w = sort!([fetch(rr) for rr in rr_join]) end -addprocs(np::Integer; kwargs...) = addprocs_internal(np; kwargs...) +immutable DefaultClusterManager <: ClusterManager +end -function addprocs(machines::AbstractVector; kwargs...) - manager_defined = any(x -> begin k,v = x; k==:manager end, kwargs) - if manager_defined - error("custom cluster managers unsupported on the ssh interface") - else - addprocs_internal(length(machines); manager=SSHManager(machines=machines), kwargs...) - end +addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...) +addprocs(np::Integer; kwargs...) = addprocs(LocalManager(np); kwargs...) + +# start and connect to processes via SSH, optionally through an SSH tunnel. 
+# the tunnel is only used from the head (process 1); the nodes are assumed +# to be mutually reachable without a tunnel, as is often the case in a cluster. +# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config +# A machine is either a or a tuple of (, count) +function addprocs(machines::AbstractVector; tunnel=false, sshflags=``, max_parallel=10, kwargs...) + addprocs(SSHManager(machines); tunnel=tunnel, sshflags=sshflags, max_parallel=max_parallel, kwargs...) end +default_addprocs_params() = AnyDict( + :dir=>JULIA_HOME, + :exename=>exename(), + :exeflags=>``) + +exename() = ccall(:jl_is_debugbuild,Cint,())==0 ? "./julia" : "./julia-debug" ## higher-level functions: spawn, pmap, pfor, etc. ## @@ -1641,6 +1805,7 @@ function interrupt(pids::AbstractVector=workers()) end end + function disable_nagle(sock) # disable nagle on all OSes ccall(:uv_tcp_nodelay, Cint, (Ptr{Void}, Cint), sock.handle, 1) @@ -1659,11 +1824,11 @@ function check_same_host(pids) # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. This handles the special case # where the local ip address may change - as during a system sleep/awake - if all(p -> (p==1) || isa(map_pid_wrkr[p].manager, LocalManager), pids) + if all(p -> (p==1) || (isa(map_pid_wrkr[p].manager, LocalManager)), pids) return true else - first_bind_addr = map_pid_wrkr[pids[1]].bind_addr - return all(p -> (p != 1) && (map_pid_wrkr[p].bind_addr == first_bind_addr), pids[2:end]) + first_bind_addr = get(map_pid_wrkr[pids[1]].config.bind_addr) + return all(p -> (p != 1) && (get(map_pid_wrkr[p].config.bind_addr) == first_bind_addr), pids[2:end]) end end end @@ -1686,3 +1851,4 @@ function terminate_all_workers() end end end + diff --git a/base/sysimg.jl b/base/sysimg.jl index a33c5a37a52e6..c6d9553416394 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -138,16 +138,6 @@ include("multidimensional.jl") include("primes.jl") -# concurrency and parallelism -include("serialize.jl") -include("multi.jl") - -# Polling (requires multi.jl) -include("poll.jl") - -# code loading -include("loading.jl") - begin SOURCE_PATH = "" include = function(path) @@ -196,6 +186,23 @@ include("dSFMT.jl") include("random.jl") importall .Random +# (s)printf macros +include("printf.jl") +importall .Printf + +# nullable types +include("nullable.jl") + +# concurrency and parallelism +include("serialize.jl") +include("multi.jl") + +# code loading +include("loading.jl") + +# Polling (requires multi.jl) +include("poll.jl") + # distributed arrays and memory-mapped arrays include("darray.jl") include("mmap.jl") @@ -229,10 +236,6 @@ include("docs.jl") using .Docs using .Markdown -# (s)printf macros -include("printf.jl") -importall .Printf - # misc useful functions & macros include("util.jl") @@ -283,9 +286,6 @@ importall .Profile include("Dates.jl") import .Dates: Date, DateTime, now -# nullable types -include("nullable.jl") - # Some basic documentation include("basedocs.jl") diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index 23aa88a531907..4713d907412d0 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -1,7 +1,7 @@ .. 
_man-parallel-computing: ******************** - Parallel Computing + Parallel Computing ******************** Most modern computers possess more than one CPU, and several computers @@ -137,7 +137,7 @@ type the following into the julia prompt:: julia> @spawn rand2(2,2) RemoteRef(2,1,2) - julia> exception on 2: in anonymous: rand2 not defined + julia> exception on 2: in anonymous: rand2 not defined Process 1 knew about the function ``rand2``, but process 2 did not. @@ -186,7 +186,7 @@ Consequently, an easy way to load *and* use a package on all processes is:: A file can also be preloaded on multiple processes at startup, and a driver script can be used to drive the computation:: julia -p -L file1.jl -L file2.jl driver.jl - + Each process has an associated identifier. The process providing the interactive julia prompt always has an id equal to 1, as would the julia process running the driver script in the example above. @@ -194,15 +194,15 @@ The processes used by default for parallel operations are referred to as ``worke When there is only one process, process 1 is considered a worker. Otherwise, workers are considered to be all processes other than process 1. -The base Julia installation has in-built support for two types of clusters: +The base Julia installation has in-built support for two types of clusters: + + - A local cluster specified with the ``-p`` option as shown above. - - A local cluster specified with the ``-p`` option as shown above. - - - A cluster spanning machines using the ``--machinefile`` option. This uses a passwordless + - A cluster spanning machines using the ``--machinefile`` option. This uses a passwordless ``ssh`` login to start julia worker processes (from the same path as the current host) on the specified machines. - -Functions ``addprocs``, ``rmprocs``, ``workers``, and others are available as a programmatic means of + +Functions ``addprocs``, ``rmprocs``, ``workers``, and others are available as a programmatic means of adding, removing and querying the processes in a cluster. Other types of clusters can be supported by writing your own custom @@ -410,7 +410,7 @@ it completes its current task. This can be seen in the implementation of nextidx() = (idx=i; i+=1; idx) @sync begin for p=1:np - if p != myid() || np == 1 + if p != myid() || np == 1 @async begin while true idx = nextidx() @@ -580,7 +580,7 @@ is ``DArray``\ -specific, but we list it here for completeness:: end - + Shared Arrays (Experimental) ----------------------------------------------- @@ -611,7 +611,7 @@ across the processes specified by ``pids``. Unlike distributed arrays, a shared array is accessible only from those participating workers specified by the ``pids`` named argument (and the creating process too, if it is on the same host). - + If an ``init`` function, of signature ``initfn(S::SharedArray)``, is specified, it is called on all the participating workers. You can arrange it so that each worker runs the ``init`` function on a @@ -675,7 +675,7 @@ ClusterManagers Julia worker processes can also be spawned on arbitrary machines, enabling Julia's natural parallelism to function quite transparently in a cluster environment. The ``ClusterManager`` interface provides a -way to specify a means to launch and manage worker processes. +way to specify a means to launch and manage worker processes. 
Thus, a custom cluster manager would need to: @@ -683,63 +683,101 @@ Thus, a custom cluster manager would need to: - implement ``launch``, a method responsible for launching new workers - implement ``manage``, which is called at various events during a worker's lifetime -As an example let us see how the ``LocalManager``, the manager responsible for +Julia provides two in-built cluster managers: + +- ``LocalManager`` used when ``addprocs()`` or ``addprocs(::Integer)`` are called +- ``SSHManager`` used when ``addprocs(::Array)`` is called with a list of hostnames + +``addprocs(manager::FooManager)`` requires ``FooManager`` to implement:: + + function launch(manager::FooManager, params::Dict, launched::Array, c::Condition) + ... + end + + function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) + ... + end + + +As an example let us see how the ``LocalManager``, the manager responsible for starting workers on the same host, is implemented:: immutable LocalManager <: ClusterManager + np::Integer end - function launch(manager::LocalManager, np::Integer, config::Dict, - resp_arr::Array, c::Condition) + function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition) ... end - function manage(manager::LocalManager, id::Integer, config::Dict, op::Symbol) + function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol) ... end The ``launch`` method takes the following arguments: - - ``manager::LocalManager`` - used to dispatch the call to the appropriate implementation - - ``np::Integer`` - number of workers to be launched - - ``config::Dict`` - all the keyword arguments provided as part of the ``addprocs`` call - - ``resp_arr::Array`` - the array to append one or more worker information tuples too - - ``c::Condition`` - the condition variable to be notified as and when workers are launched. - -The ``launch`` method is called asynchronously in a separate task. The termination of this task -signals that all requested workers have been launched. Hence the ``launch`` function MUST exit as soon + - ``manager::ClusterManager`` - the cluster manager ``addprocs`` is called with + - ``params::Dict`` - all the keyword arguments passed to ``addprocs`` + - ``launched::Array`` - the array to append one or more ``WorkerConfig`` objects to + - ``c::Condition`` - the condition variable to be notified as and when workers are launched + +The ``launch`` method is called asynchronously in a separate task. The termination of this task +signals that all requested workers have been launched. Hence the ``launch`` function MUST exit as soon as all the requested workers have been launched. The julia worker MUST be launched with a ``--worker`` -argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to enable other workers -to connect to it only at the specified ``bind_addr`` and ``port``. +argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to enable other workers +to connect to it at the specified ``bind_addr`` and ``port``. Useful for multi-homed hosts. 
+ + +For every worker launched, the ``launch`` method must add a ``WorkerConfig``object with appropriate +fields initialized to ``launched``:: + +type WorkerConfig + # Common fields relevant to all cluster managers + io::Nullable{IO} + host::Nullable{AbstractString} + port::Nullable{Integer} + + # Used when launching additional workers at a host + count::Nullable{Union(Int, Symbol)} + exeflags::Nullable{Cmd} + + # External cluster managers can use this to store information at a per-worker level + # Can be a dict if multiple fields need to be stored. + userdata::Nullable{Any} + + # SSHManager / SSH tunnel connections to workers + tunnel::Nullable{Bool} + bind_addr::Nullable{AbstractString} + sshflags::Nullable{Cmd} + max_parallel::Nullable{Integer} + + ..... +end + +Most of the fields in ``WorkerConfig`` are used by the inbuilt managers. +Custom cluster managers would typically specify only ``io`` or ``host`` / ``port``: + +If ``io`` is specified, it is used to read host/port information. A julia worker prints out +its bind address and port at startup. This allows julia workers to listen on any free port +available instead of requiring worker ports to be configured manually. +If ``io`` is not specfied, ``host`` and ``port`` are used to connect. -Arrays of worker information tuples that are appended to ``resp_arr`` can take any one of -the following forms:: +``count`` and ``exeflags`` are relevant for launching additional workers from a worker. +For example, a cluster manager may launch a single worker per node, and use that to launch +additional workers. ``count`` with an integer value ``n`` will launch a total of ``n`` workers, +while a value of ``:auto`` will launch as many workers as cores on that machine. ``exeflags`` +should be set to the required command line arguments for new workers. - (io::IO, config::Dict) - - (io::IO, host::AbstractString, config::Dict) - - (io::IO, host::AbstractString, port::Integer, config::Dict) - - (host::AbstractString, port::Integer, config::Dict) +``tunnel``, ``bind_addr``, ``sshflags`` and ``max_parallel`` are used when a ssh tunnel is +required to connect to the workers from the master process. -where: +``userdata`` is provided for custom cluster managers to store their own worker specific information. - - ``io::IO`` is the output stream of the worker. - - ``host::AbstractString`` and ``port::Integer`` are the host:port to connect to. If not provided - they are read from the ``io`` stream provided. - - ``config::Dict`` is the configuration dictionary for the worker. The ``launch`` - function can add/modify any data that may be required for managing - the worker. -The ``manage`` method takes the following arguments: - - ``manager::ClusterManager`` - used to dispatch the call to the appropriate implementation - - ``id::Integer`` - The Julia process id - - ``config::Dict`` - configuration dictionary for the worker. The data may have been modified by the ``launch`` method - - ``op::Symbol`` - one of ``:register``, ``:deregister``, ``:interrupt`` or ``:finalize``. - The ``manage`` method is called at different times during the worker's lifetime: +``manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)`` is called at different +times during the worker's lifetime with different ``op`` values: - with ``:register``/``:deregister`` when a worker is added / removed from the Julia worker pool. 
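For concreteness, here is a minimal sketch of a custom manager following the same pattern as the
in-built ``LocalManager`` in this patch (the ``MyManager`` name and the choice to simply spawn
local ``julia`` processes are illustrative assumptions, not part of the patch)::

    immutable MyManager <: ClusterManager
        np::Integer
    end

    function launch(manager::MyManager, params::Dict, launched::Array, c::Condition)
        dir = params[:dir]
        exename = params[:exename]
        exeflags = params[:exeflags]
        for i in 1:manager.np
            # Workers must be started with --worker; the master reads the bind
            # address and port from the worker's stdout via `wconfig.io`.
            io, pobj = open(detach(`$(dir)/$(exename) $exeflags --worker`), "r")
            wconfig = WorkerConfig()
            wconfig.io = io
            wconfig.process = pobj
            push!(launched, wconfig)
        end
        notify(c)
    end

    function manage(manager::MyManager, id::Integer, config::WorkerConfig, op::Symbol)
        if op == :interrupt
            # `process` was stored in the config at launch time above.
            kill(get(config.process), 2)
        end
    end

With such a manager defined, ``addprocs(MyManager(4))`` would launch four workers and connect to
them through the generic ``connect(manager::ClusterManager, pid, config)`` fallback added in this
patch, which reads the host/port information from ``config.io``.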
diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index 0288bc8bffb00..acf8c40490dbc 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -5319,37 +5319,57 @@ some built-in integration support in Julia. Parallel Computing ------------------ -.. function:: addprocs(n; manager::ClusterManager=LocalManager()) -> List of process identifiers +.. function:: addprocs(n::Integer; exeflags=``) -> List of process identifiers - ``addprocs(4)`` will add 4 processes on the local machine. This can be used to take - advantage of multiple cores. + Launches workers using the in-built ``LocalManager`` which only launches workers on the local host. + This can be used to take advantage of multiple cores. `addprocs(4)`` will add 4 processes on the local machine. - Keyword argument ``manager`` can be used to provide a custom cluster manager to start workers. - For example Beowulf clusters are supported via a custom cluster manager implemented - in package ``ClusterManagers``. +.. function:: addprocs() -> List of process identifiers - See the documentation for package ``ClusterManagers`` for more information on how to - write a custom cluster manager. + Equivalent to ``addprocs(CPU_CORES)`` -.. function:: addprocs(machines; tunnel=false, dir=JULIA_HOME, sshflags::Cmd=``) -> List of process identifiers +.. function:: addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, exeflags=``) -> List of process identifiers Add processes on remote machines via SSH. Requires julia to be installed in the same location on each node, or to be available via a shared file system. - ``machines`` is a vector of host definitions of the form ``[user@]host[:port] [bind_addr[:port]]``. ``user`` defaults - to current user, ``port`` to the standard ssh port. A worker is started at each host definition. - If the optional ``[bind_addr[:port]]`` is specified, other workers will connect to this worker at the - specified ``bind_addr`` and ``port``. + ``machines`` is a vector of machine specifications. Worker are started for each specification. - Keyword arguments: + A machine specification is either a string ``machine_spec`` or a tuple - ``(machine_spec, count)`` + + ``machine_spec`` is a string of the form ``[user@]host[:port] [bind_addr[:port]]``. ``user`` defaults + to current user, ``port`` to the standard ssh port. If ``[bind_addr[:port]]`` is specified, other + workers will connect to this worker at the specified ``bind_addr`` and ``port``. - ``tunnel`` : if ``true`` then SSH tunneling will be used to connect to the worker. + ``count`` is the number of workers to be launched on the specified host. If specified as ``"auto"`` + or ``:auto`` it will launch as many workers as the number of cores on the specific host. - ``dir`` : specifies the location of the julia binaries on the worker nodes. + + Keyword arguments: + + ``tunnel`` : if ``true`` then SSH tunneling will be used to connect to the worker from the master process. ``sshflags`` : specifies additional ssh options, e.g. :literal:`sshflags=\`-i /home/foo/bar.pem\`` . - ``max_parallel`` : specifies the maximum number of workers being launched in parallel at a host. Defaults to 10. + ``max_parallel`` : specifies the maximum number of workers connected to in parallel at a host. Defaults to 10. + + ``dir`` : specifies the location of the julia binaries on the worker nodes. Defaults to JULIA_HOME. + + ``exename`` : name of the julia executable. Defaults to "./julia" or "./julia-debug" as the case may be. 
+ + ``exeflags`` : additional flags passed to the worker processes. + + +.. function:: addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers + + Launches worker processes via the specified cluster manager. + + For example Beowulf clusters are supported via a custom cluster manager implemented + in package ``ClusterManagers``. + + See the documentation for package ``ClusterManagers`` for more information on how to + write a custom cluster manager. + .. function:: nprocs()
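As a usage illustration of the ``addprocs`` variants documented above (the hostnames are
hypothetical)::

    addprocs()        # one worker per core on the local host, via LocalManager
    addprocs(4)       # four workers on the local host

    # Two workers on node1, one on node2, and one per core on node3, via SSHManager,
    # with the master's connections tunnelled over ssh:
    addprocs([("node1", 2), "node2", ("node3", :auto)];
             tunnel=true, sshflags=`-i /home/foo/bar.pem`)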