diff --git a/base/deprecated.jl b/base/deprecated.jl index 2049cf5ade283..d34057c4e4b0c 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -207,16 +207,17 @@ const MemoryError = OutOfMemoryError #9295 @deprecate push!(t::Associative, key, v) setindex!(t, v, key) -@deprecate (|>)(src::AbstractCmd, dest::AbstractCmd) pipe(src, dest) -@deprecate (.>)(src::AbstractCmd, dest::AbstractCmd) pipe(src, stderr=dest) -@deprecate (|>)(src::Redirectable, dest::AbstractCmd) pipe(src, dest) -@deprecate (|>)(src::AbstractCmd, dest::Redirectable) pipe(src, dest) -@deprecate (.>)(src::AbstractCmd, dest::Redirectable) pipe(src, stderr=dest) -@deprecate (|>)(src::AbstractCmd, dest::AbstractString) pipe(src, dest) -@deprecate (|>)(src::AbstractString, dest::AbstractCmd) pipe(src, dest) -@deprecate (.>)(src::AbstractCmd, dest::AbstractString) pipe(src, stderr=dest) -@deprecate (>>)(src::AbstractCmd, dest::AbstractString) pipe(src, stdout=dest, append=true) -@deprecate (.>>)(src::AbstractCmd, dest::AbstractString) pipe(src, stderr=dest, append=true) +@deprecate (|>)(src::AbstractCmd, dest::AbstractCmd) pipeline(src, dest) +@deprecate (.>)(src::AbstractCmd, dest::AbstractCmd) pipeline(src, stderr=dest) +@deprecate (|>)(src::Redirectable, dest::AbstractCmd) pipeline(src, dest) +@deprecate (|>)(src::AbstractCmd, dest::Redirectable) pipeline(src, dest) +@deprecate (.>)(src::AbstractCmd, dest::Redirectable) pipeline(src, stderr=dest) +@deprecate (|>)(src::AbstractCmd, dest::AbstractString) pipeline(src, dest) +@deprecate (|>)(src::AbstractString, dest::AbstractCmd) pipeline(src, dest) +@deprecate (.>)(src::AbstractCmd, dest::AbstractString) pipeline(src, stderr=dest) +@deprecate (>>)(src::AbstractCmd, dest::AbstractString) pipeline(src, stdout=dest, append=true) +@deprecate (.>>)(src::AbstractCmd, dest::AbstractString) pipeline(src, stderr=dest, append=true) +@deprecate pipe pipeline # 10314 @deprecate filter!(r::Regex, d::Dict) filter!((k,v)->ismatch(r,k), d) diff --git a/base/docs/helpdb.jl b/base/docs/helpdb.jl index a3b1391fb43cd..1255f1f2d03f2 100644 --- a/base/docs/helpdb.jl +++ b/base/docs/helpdb.jl @@ -2621,9 +2621,9 @@ doc""" Connect to the host `host` on port `port` - connect(path) -> Pipe + connect(path) -> PipeEndpoint -Connect to the Named Pipe/Domain Socket at `path` +Connect to the Named Pipe / Domain Socket at ``path`` connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream) @@ -9396,7 +9396,7 @@ Listen on port on the address specified by `addr`. By default this listens on lo listen(path) -> PipeServer -Listens on/Creates a Named Pipe/Domain Socket +Create and listen on a Named Pipe / Domain Socket """ listen diff --git a/base/exports.jl b/base/exports.jl index fb91694411578..9e69c1e00c45a 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1161,7 +1161,8 @@ export nb_available, ntoh, open, - pipe, + pipeline, + Pipe, PipeBuffer, poll_fd, poll_file, diff --git a/base/interactiveutil.jl b/base/interactiveutil.jl index 1d4c8c0f111c4..41b06768ad22a 100644 --- a/base/interactiveutil.jl +++ b/base/interactiveutil.jl @@ -87,7 +87,7 @@ end global _clipboardcmd _clipboardcmd !== nothing && return _clipboardcmd for cmd in (:xclip, :xsel) - success(pipe(`which $cmd`, DevNull)) && return _clipboardcmd = cmd + success(pipeline(`which $cmd`, DevNull)) && return _clipboardcmd = cmd end error("no clipboard command found, please install xsel or xclip") end @@ -165,7 +165,7 @@ function versioninfo(io::IO=STDOUT, verbose::Bool=false) println(io, " WORD_SIZE: ", Sys.WORD_SIZE) if verbose lsb = "" - @linux_only try lsb = readchomp(pipe(`lsb_release -ds`, stderr=DevNull)) end + @linux_only try lsb = readchomp(pipeline(`lsb_release -ds`, stderr=DevNull)) end @windows_only try lsb = strip(readall(`$(ENV["COMSPEC"]) /c ver`)) end if lsb != "" println(io, " ", lsb) @@ -343,7 +343,7 @@ downloadcmd = nothing global downloadcmd if downloadcmd === nothing for checkcmd in (:curl, :wget, :fetch) - if success(pipe(`which $checkcmd`, DevNull)) + if success(pipeline(`which $checkcmd`, DevNull)) downloadcmd = checkcmd break end diff --git a/base/multi.jl b/base/multi.jl index 58004f45d21ef..8064d7b56a4db 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -1032,7 +1032,6 @@ end # The master process uses this to connect to the worker and subsequently # setup a all-to-all network. function read_worker_host_port(io::IO) - io.line_buffered = true while true conninfo = readline(io) bind_addr, port = parse_connection_info(conninfo) diff --git a/base/pkg/entry.jl b/base/pkg/entry.jl index 036f7d24d95bb..6b17f0a28db92 100644 --- a/base/pkg/entry.jl +++ b/base/pkg/entry.jl @@ -55,7 +55,7 @@ function add(pkg::AbstractString, vers::VersionSet) outdated = :yes else try - run(pipe(Git.cmd(`fetch -q --all`, dir="METADATA"),stdout=DevNull,stderr=DevNull)) + run(pipeline(Git.cmd(`fetch -q --all`, dir="METADATA"),stdout=DevNull,stderr=DevNull)) outdated = Git.success(`diff --quiet origin/$branch`, dir="METADATA") ? (:no) : (:yes) end diff --git a/base/pkg/generate.jl b/base/pkg/generate.jl index 831174c61aa33..3076878bdf642 100644 --- a/base/pkg/generate.jl +++ b/base/pkg/generate.jl @@ -11,7 +11,7 @@ github_user() = readchomp(ignorestatus(`git config --global --get github.user`)) function git_contributors(dir::AbstractString, n::Int=typemax(Int)) contrib = Dict() tty = @windows? "CON:" : "/dev/tty" - for line in eachline(pipe(tty, Git.cmd(`shortlog -nes`, dir=dir))) + for line in eachline(pipeline(tty, Git.cmd(`shortlog -nes`, dir=dir))) m = match(r"\s*(\d+)\s+(.+?)\s+\<(.+?)\>\s*$", line) m === nothing && continue commits, name, email = m.captures diff --git a/base/pkg/git.jl b/base/pkg/git.jl index 5959f674dffde..32e492f074dc1 100644 --- a/base/pkg/git.jl +++ b/base/pkg/git.jl @@ -21,7 +21,7 @@ function git(d) end cmd(args::Cmd; dir="") = `$(git(dir)) $args` -run(args::Cmd; dir="", out=STDOUT) = Base.run(pipe(cmd(args,dir=dir), out)) +run(args::Cmd; dir="", out=STDOUT) = Base.run(pipeline(cmd(args,dir=dir), out)) readall(args::Cmd; dir="") = Base.readall(cmd(args,dir=dir)) readchomp(args::Cmd; dir="") = Base.readchomp(cmd(args,dir=dir)) diff --git a/base/process.jl b/base/process.jl index 2034a3c5971e7..879e4288bf356 100644 --- a/base/process.jl +++ b/base/process.jl @@ -2,28 +2,35 @@ abstract AbstractCmd -type Cmd <: AbstractCmd +immutable Cmd <: AbstractCmd exec::Vector{ByteString} ignorestatus::Bool detach::Bool env::Union{Array{ByteString},Void} dir::UTF8String - Cmd(exec::Vector{ByteString}) = new(exec, false, false, nothing, "") + Cmd(exec::Vector{ByteString}) = + new(exec, false, false, nothing, "") + Cmd(cmd::Cmd, ignorestatus, detach, env, dir) = + new(cmd.exec, ignorestatus, detach, env, + dir === cmd.dir ? dir : cstr(dir)) + Cmd(cmd::Cmd; ignorestatus=cmd.ignorestatus, detach=cmd.detach, env=cmd.env, dir=cmd.dir) = + new(cmd.exec, ignorestatus, detach, env, + dir === cmd.dir ? dir : cstr(dir)) end -type OrCmds <: AbstractCmd +immutable OrCmds <: AbstractCmd a::AbstractCmd b::AbstractCmd OrCmds(a::AbstractCmd, b::AbstractCmd) = new(a, b) end -type ErrOrCmds <: AbstractCmd +immutable ErrOrCmds <: AbstractCmd a::AbstractCmd b::AbstractCmd ErrOrCmds(a::AbstractCmd, b::AbstractCmd) = new(a, b) end -type AndCmds <: AbstractCmd +immutable AndCmds <: AbstractCmd a::AbstractCmd b::AbstractCmd AndCmds(a::AbstractCmd, b::AbstractCmd) = new(a, b) @@ -50,7 +57,7 @@ function show(io::IO, cmd::Cmd) end function show(io::IO, cmds::Union{OrCmds,ErrOrCmds}) - print(io, "pipe(") + print(io, "pipeline(") show(io, cmds.a) print(io, ", ") print(io, isa(cmds, ErrOrCmds) ? "stderr=" : "stdout=") @@ -81,7 +88,15 @@ end immutable DevNullStream <: AsyncStream end const DevNull = DevNullStream() +isreadable(::DevNullStream) = false +iswritable(::DevNullStream) = true +isopen(::DevNullStream) = true +read{T<:DevNullStream}(::T, args...) = throw(EOFErorr()) +write{T<:DevNullStream}(::T, args...) = 0 +close(::DevNullStream) = nothing +flush(::DevNullStream) = nothing copy(::DevNullStream) = DevNull + uvhandle(::DevNullStream) = C_NULL uvhandle(x::Ptr) = x uvtype(::Ptr) = UV_STREAM @@ -93,14 +108,14 @@ uvtype(x::RawFD) = UV_RAW_FD typealias Redirectable Union{AsyncStream, FS.File, FileRedirect, DevNullStream, IOStream, RawFD} -type CmdRedirect <: AbstractCmd +immutable CmdRedirect <: AbstractCmd cmd::AbstractCmd handle::Redirectable stream_no::Int end function show(io::IO, cr::CmdRedirect) - print(io, "pipe(") + print(io, "pipeline(") show(io, cr.cmd) print(io, ", ") if cr.stream_no == STDOUT_NO @@ -115,9 +130,10 @@ function show(io::IO, cr::CmdRedirect) end -ignorestatus(cmd::Cmd) = (cmd.ignorestatus=true; cmd) -ignorestatus(cmd::Union{OrCmds,AndCmds}) = (ignorestatus(cmd.a); ignorestatus(cmd.b); cmd) -detach(cmd::Cmd) = (cmd.detach=true; cmd) +ignorestatus(cmd::Cmd) = Cmd(cmd, ignorestatus=true) +ignorestatus(cmd::Union{OrCmds,AndCmds}) = + typeof(cmd)(ignorestatus(cmd.a), ignorestatus(cmd.b)) +detach(cmd::Cmd) = Cmd(cmd, detach=true) # like bytestring(s), but throw an error if s contains NUL, since # libuv requires NUL-terminated strings @@ -128,10 +144,21 @@ function cstr(s) return bytestring(s) end -setenv{S<:ByteString}(cmd::Cmd, env::Array{S}; dir="") = (cmd.env = ByteString[cstr(x) for x in env]; setenv(cmd, dir=dir); cmd) -setenv(cmd::Cmd, env::Associative; dir="") = (cmd.env = ByteString[cstr(string(k)*"="*string(v)) for (k,v) in env]; setenv(cmd, dir=dir); cmd) -setenv{T<:AbstractString}(cmd::Cmd, env::Pair{T}...; dir="") = (cmd.env = ByteString[cstr(k*"="*string(v)) for (k,v) in env]; setenv(cmd, dir=dir); cmd) -setenv(cmd::Cmd; dir="") = (cmd.dir = cstr(dir); cmd) +function setenv{S<:ByteString}(cmd::Cmd, env::Array{S}; dir="") + byteenv = ByteString[cstr(x) for x in env] + return Cmd(cmd; env = byteenv, dir = dir) +end +function setenv(cmd::Cmd, env::Associative; dir="") + byteenv = ByteString[cstr(string(k)*"="*string(v)) for (k,v) in env] + return Cmd(cmd; env = byteenv, dir = dir) +end +function setenv{T<:AbstractString}(cmd::Cmd, env::Pair{T}...; dir="") + byteenv = ByteString[cstr(k*"="*string(v)) for (k,v) in env] + return Cmd(cmd; env = byteenv, dir = dir) +end +function setenv(cmd::Cmd; dir="") + return Cmd(cmd; dir = dir) +end (&)(left::AbstractCmd, right::AbstractCmd) = AndCmds(left, right) redir_out(src::AbstractCmd, dest::AbstractCmd) = OrCmds(src, dest) @@ -149,7 +176,7 @@ redir_err(src::AbstractCmd, dest::AbstractString) = CmdRedirect(src, FileRedirec redir_out_append(src::AbstractCmd, dest::AbstractString) = CmdRedirect(src, FileRedirect(dest, true), STDOUT_NO) redir_err_append(src::AbstractCmd, dest::AbstractString) = CmdRedirect(src, FileRedirect(dest, true), STDERR_NO) -function pipe(cmd::AbstractCmd; stdin=nothing, stdout=nothing, stderr=nothing, append::Bool=false) +function pipeline(cmd::AbstractCmd; stdin=nothing, stdout=nothing, stderr=nothing, append::Bool=false) if append && stdout === nothing && stderr === nothing error("append set to true, but no output redirections specified") end @@ -165,21 +192,21 @@ function pipe(cmd::AbstractCmd; stdin=nothing, stdout=nothing, stderr=nothing, a return cmd end -pipe(cmd::AbstractCmd, dest) = pipe(cmd, stdout=dest) -pipe(src::Union{Redirectable,AbstractString}, cmd::AbstractCmd) = pipe(cmd, stdin=src) +pipeline(cmd::AbstractCmd, dest) = pipeline(cmd, stdout=dest) +pipeline(src::Union{Redirectable,AbstractString}, cmd::AbstractCmd) = pipeline(cmd, stdin=src) -pipe(a, b, c, d...) = pipe(pipe(a,b), c, d...) +pipeline(a, b, c, d...) = pipeline(pipeline(a,b), c, d...) typealias RawOrBoxedHandle Union{UVHandle,AsyncStream,Redirectable,IOStream} typealias StdIOSet NTuple{3,RawOrBoxedHandle} -type Process +type Process <: AbstractPipe cmd::Cmd handle::Ptr{Void} in::AsyncStream out::AsyncStream err::AsyncStream - exitcode::Int32 + exitcode::Int64 termsignal::Int32 exitcb::Callback exitnotify::Condition @@ -195,20 +222,22 @@ type Process if !isa(err, AsyncStream) || err === DevNull err=DevNull end - this = new(cmd, handle, in, out, err, typemin(Int32), typemin(Int32), false, Condition(), false, Condition()) + this = new(cmd, handle, in, out, err, + typemin(fieldtype(Process, :exitcode)), + typemin(fieldtype(Process, :termsignal)), + false, Condition(), false, Condition()) finalizer(this, uvfinalize) this end end -type ProcessChain +immutable ProcessChain <: AbstractPipe processes::Vector{Process} in::Redirectable out::Redirectable err::Redirectable ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3]) end -typealias ProcessChainOrNot Union{Bool,ProcessChain} function _jl_spawn(cmd, argv, loop::Ptr{Void}, pp::Process, in, out, err) @@ -239,7 +268,7 @@ function uv_return_spawn(p::Ptr{Void}, exit_status::Int64, termsignal::Int32) data = ccall(:jl_uv_process_data, Ptr{Void}, (Ptr{Void},), p) data == C_NULL && return proc = unsafe_pointer_to_objref(data)::Process - proc.exitcode = Int32(exit_status) + proc.exitcode = exit_status proc.termsignal = termsignal if isa(proc.exitcb, Function) proc.exitcb(proc, exit_status, termsignal) end ccall(:jl_close_uv, Void, (Ptr{Void},), proc.handle) @@ -253,128 +282,147 @@ function _uv_hook_close(proc::Process) notify(proc.closenotify) end -function spawn(pc::ProcessChainOrNot, redirect::CmdRedirect, stdios::StdIOSet, exitcb::Callback, closecb::Callback) - spawn(pc, redirect.cmd, (redirect.stream_no == STDIN_NO ? redirect.handle : stdios[1], - redirect.stream_no == STDOUT_NO ? redirect.handle : stdios[2], - redirect.stream_no == STDERR_NO ? redirect.handle : stdios[3]), exitcb, closecb) +function spawn(redirect::CmdRedirect, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) + spawn(redirect.cmd, + (redirect.stream_no == STDIN_NO ? redirect.handle : stdios[1], + redirect.stream_no == STDOUT_NO ? redirect.handle : stdios[2], + redirect.stream_no == STDERR_NO ? redirect.handle : stdios[3]), + exitcb, closecb, chain=chain) end -function spawn(pc::ProcessChainOrNot, cmds::OrCmds, stdios::StdIOSet, exitcb::Callback, closecb::Callback) - out_pipe = box(Ptr{Void}, Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) - in_pipe = box(Ptr{Void}, Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) - #out_pipe = Libc.malloc(_sizeof_uv_named_pipe) - #in_pipe = Libc.malloc(_sizeof_uv_named_pipe) +function spawn(cmds::OrCmds, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) + out_pipe = Libc.malloc(_sizeof_uv_named_pipe) + in_pipe = Libc.malloc(_sizeof_uv_named_pipe) link_pipe(in_pipe, false, out_pipe, false) - if pc == false - pc = ProcessChain(stdios) + if isnull(chain) + chain = Nullable(ProcessChain(stdios)) end try - spawn(pc, cmds.a, (stdios[1], out_pipe, stdios[3]), exitcb, closecb) - spawn(pc, cmds.b, (in_pipe, stdios[2], stdios[3]), exitcb, closecb) - catch err + spawn(cmds.a, (stdios[1], out_pipe, stdios[3]), exitcb, closecb, chain=chain) + spawn(cmds.b, (in_pipe, stdios[2], stdios[3]), exitcb, closecb, chain=chain) + finally close_pipe_sync(out_pipe) close_pipe_sync(in_pipe) - rethrow(err) + Libc.free(out_pipe) + Libc.free(in_pipe) end - close_pipe_sync(out_pipe) - close_pipe_sync(in_pipe) - pc + get(chain) end -function spawn(pc::ProcessChainOrNot, cmds::ErrOrCmds, stdios::StdIOSet, exitcb::Callback, closecb::Callback) - out_pipe = box(Ptr{Void}, Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) - in_pipe = box(Ptr{Void}, Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) - #out_pipe = Libc.malloc(_sizeof_uv_named_pipe) - #in_pipe = Libc.malloc(_sizeof_uv_named_pipe) +function spawn(cmds::ErrOrCmds, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) + out_pipe = Libc.malloc(_sizeof_uv_named_pipe) + in_pipe = Libc.malloc(_sizeof_uv_named_pipe) link_pipe(in_pipe, false, out_pipe, false) - if pc == false - pc = ProcessChain(stdios) + if isnull(chain) + chain = Nullable(ProcessChain(stdios)) end try - spawn(pc, cmds.a, (stdios[1], stdios[2], out_pipe), exitcb, closecb) - spawn(pc, cmds.b, (in_pipe, stdios[2], stdios[3]), exitcb, closecb) - catch err + spawn(cmds.a, (stdios[1], stdios[2], out_pipe), exitcb, closecb, chain=chain) + spawn(cmds.b, (in_pipe, stdios[2], stdios[3]), exitcb, closecb, chain=chain) + finally close_pipe_sync(out_pipe) close_pipe_sync(in_pipe) - rethrow(err) - end - close_pipe_sync(out_pipe) - close_pipe_sync(in_pipe) - pc -end - - -macro setup_stdio() - esc( - quote - close_in,close_out,close_err = false,false,false - in,out,err = stdios - if isa(stdios[1], Pipe) - if stdios[1].handle == C_NULL - error("pipes passed to spawn must be initialized") - end - elseif isa(stdios[1], FileRedirect) - in = FS.open(stdios[1].filename, JL_O_RDONLY) - close_in = true - elseif isa(stdios[1], IOStream) - in = FS.File(RawFD(fd(stdios[1]))) - end - if isa(stdios[2], Pipe) - if stdios[2].handle == C_NULL - error("pipes passed to spawn must be initialized") - end - elseif isa(stdios[2], FileRedirect) - out = FS.open(stdios[2].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[2].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) - close_out = true - elseif isa(stdios[2], IOStream) - out = FS.File(RawFD(fd(stdios[2]))) - end - if isa(stdios[3], Pipe) - if stdios[3].handle == C_NULL - error("pipes passed to spawn must be initialized") - end - elseif isa(stdios[3], FileRedirect) - err = FS.open(stdios[3].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[3].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) - close_err = true - elseif isa(stdios[3], IOStream) - err = FS.File(RawFD(fd(stdios[3]))) + Libc.free(out_pipe) + Libc.free(in_pipe) + end + get(chain) +end + +function setup_stdio(stdio::PipeEndpoint, readable::Bool) + closeafter = false + if stdio.handle == C_NULL + io = Libc.malloc(_sizeof_uv_named_pipe) + if readable + link_pipe(io, false, stdio, true) + else + link_pipe(stdio, true, io, false) end - end) + closeafter = true + else + io = stdio.handle + end + return (io, closeafter) +end + +function setup_stdio(stdio::Pipe, readable::Bool) + if stdio.in.handle == C_NULL && stdio.out.handle == C_NULL + link_pipe(stdio) + end + io = readable ? stdio.out : stdio.in + return (io, false) +end + +function setup_stdio(stdio::IOStream, readable::Bool) + io = FS.File(RawFD(fd(stdio))) + return (io, false) end -macro cleanup_stdio() - esc( - quote - close_in && close(in) - close_out && close(out) - close_err && close(err) - end) +function setup_stdio(stdio::FileRedirect, readable::Bool) + if readable + attr = JL_O_RDONLY + perm = zero(S_IRUSR) + else + attr = JL_O_WRONLY | JL_O_CREAT + attr |= stdio.append ? JL_O_APPEND : JL_O_TRUNC + perm = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH + end + io = FS.open(stdio.filename, attr, perm) + return (io, true) +end + +function setup_stdio(io, readable::Bool) + # if there is no specialization, + # assume that uvhandle and uvtype are defined for it + return io, false +end + +function setup_stdio(stdio::Ptr{Void}, readable::Bool) + return (stdio, false) +end + +function close_stdio(stdio::Ptr{Void}) + close_pipe_sync(stdio) + Libc.free(stdio) +end + +function close_stdio(stdio) + close(stdio) end -function spawn(pc::ProcessChainOrNot, cmd::Cmd, stdios::StdIOSet, exitcb::Callback, closecb::Callback) +function setup_stdio(anon::Function, stdio::StdIOSet) + in, close_in = setup_stdio(stdio[1], true) + out, close_out = setup_stdio(stdio[2], false) + err, close_err = setup_stdio(stdio[3], false) + anon(in, out, err) + close_in && close_stdio(in) + close_out && close_stdio(out) + close_err && close_stdio(err) +end + +function spawn(cmd::Cmd, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) loop = eventloop() pp = Process(cmd, C_NULL, stdios[1], stdios[2], stdios[3]); - @setup_stdio pp.exitcb = exitcb pp.closecb = closecb - pp.handle = _jl_spawn(cmd.exec[1], cmd.exec, loop, pp, - in, out, err) - @cleanup_stdio - if isa(pc, ProcessChain) - push!(pc.processes, pp) + setup_stdio(stdios) do in, out, err + pp.handle = _jl_spawn(cmd.exec[1], cmd.exec, loop, pp, + in, out, err) + end + if !isnull(chain) + push!(get(chain).processes, pp) end pp end -function spawn(pc::ProcessChainOrNot, cmds::AndCmds, stdios::StdIOSet, exitcb::Callback, closecb::Callback) - if pc == false - pc = ProcessChain(stdios) +function spawn(cmds::AndCmds, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) + if isnull(chain) + chain = Nullable(ProcessChain(stdios)) end - @setup_stdio - spawn(pc, cmds.a, (in,out,err), exitcb, closecb) - spawn(pc, cmds.b, (in,out,err), exitcb, closecb) - @cleanup_stdio - pc + setup_stdio(stdios) do in, out, err + spawn(cmds.a, (in,out,err), exitcb, closecb, chain=chain) + spawn(cmds.b, (in,out,err), exitcb, closecb, chain=chain) + end + get(chain) end # INTERNAL @@ -400,55 +448,37 @@ spawn_opts_inherit(stdios::StdIOSet, exitcb::Callback=false, closecb::Callback=f spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2), args...) = (tuple(in,out,err,args...),false,false) -spawn(pc::ProcessChainOrNot, cmds::AbstractCmd, args...) = spawn(pc, cmds, spawn_opts_swallow(args...)...) -spawn(cmds::AbstractCmd, args...) = spawn(false, cmds, spawn_opts_swallow(args...)...) - -macro tmp_rpipe(pipe, tmppipe, code, args...) - esc(quote - $pipe = Pipe(C_NULL) - $tmppipe = Pipe(C_NULL) - link_pipe($pipe, true, $tmppipe, false) - r = begin - $code - end - close_pipe_sync($tmppipe) - r - end) -end - -macro tmp_wpipe(tmppipe, pipe, code) - esc(quote - $pipe = Pipe(C_NULL) - $tmppipe = Pipe(C_NULL) - link_pipe($tmppipe, false, $pipe, true) - r = begin - $code - end - close_pipe_sync($tmppipe) - r - end) -end +spawn(cmds::AbstractCmd, args...; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) = + spawn(cmds, spawn_opts_swallow(args...)...; chain=chain) +spawn(cmds::AbstractCmd, args...; chain::Nullable{ProcessChain}=Nullable{ProcessChain}()) = + spawn(cmds, spawn_opts_swallow(args...)...; chain=chain) function eachline(cmd::AbstractCmd, stdin) - @tmp_rpipe out tmp begin - processes = spawn(false, cmd, (stdin,tmp,STDERR)) - # implicitly close after reading lines, since we opened - EachLine(out, ()->close(out)) - end + stdout = Pipe() + processes = spawn(cmd, (stdin,stdout,STDERR)) + close(stdout.in) + out = stdout.out + # implicitly close after reading lines, since we opened + return EachLine(out, ()->close(out)) end eachline(cmd::AbstractCmd) = eachline(cmd, DevNull) -# return a (Pipe,Process) pair to write/read to/from the pipeline -function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull) +# return a Process object to read-to/write-from the pipeline +function open(cmds::AbstractCmd, mode::AbstractString="r", other::AsyncStream=DevNull) if mode == "r" - processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR)) - (out, processes) + in = other + out = io = Pipe() + processes = spawn(cmds, (in,out,STDERR)) + close(out.in) elseif mode == "w" - processes = @tmp_wpipe tmp inpipe spawn(false, cmds, (tmp,stdio,STDERR)) - (inpipe, processes) + in = io = Pipe() + out = other + processes = spawn(cmds, (in,out,STDERR)) + close(in.out) else throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\"")) end + return (io, processes) end function open(f::Function, cmds::AbstractCmd, args...) @@ -465,16 +495,17 @@ function open(f::Function, cmds::AbstractCmd, args...) return ret end -# TODO: convert this to use open(cmd, "r+"), with a single read/write pipe +# TODO: deprecate this function readandwrite(cmds::AbstractCmd) - (out, processes) = @tmp_wpipe tmp inpipe open(cmds, "r", tmp) - (out, inpipe, processes) + in = Pipe() + out, processes = open(cmds, "r", in) + (out, in, processes) end function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull) - (out,pc) = open(cmd, "r", stdin) + out, procs = open(cmd, "r", stdin) bytes = readbytes(out) - !success(pc) && pipeline_error(pc) + !success(procs) && pipeline_error(procs) return bytes end @@ -553,7 +584,7 @@ function _contains_newline(bufptr::Ptr{Void}, len::Int32) end ## process status ## -process_running(s::Process) = s.exitcode == typemin(Int32) +process_running(s::Process) = s.exitcode == typemin(fieldtype(Process, :exitcode)) process_running(s::Vector{Process}) = any(process_running, s) process_running(s::ProcessChain) = process_running(s.processes) diff --git a/base/random.jl b/base/random.jl index a174522fd9509..1ae9bcb9f0f49 100644 --- a/base/random.jl +++ b/base/random.jl @@ -142,7 +142,7 @@ function make_seed() seed = reinterpret(UInt64, time()) seed = hash(seed, UInt64(getpid())) try - seed = hash(seed, parse(UInt64, readall(pipe(`ifconfig`, `sha1sum`))[1:40], 16)) + seed = hash(seed, parse(UInt64, readall(pipeline(`ifconfig`, `sha1sum`))[1:40], 16)) end return make_seed(seed) end diff --git a/base/socket.jl b/base/socket.jl index 99ae4eb4b78cb..5b8d1fb866781 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -343,7 +343,12 @@ _jl_sockaddr_set_port(ptr::Ptr{Void},port::UInt16) = ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},UInt16),ptr,port) accept(server::TCPServer) = accept(server, TCPSocket()) -accept(server::PipeServer) = accept(server, Pipe()) + +# Libuv will internally reset the readable and writable flags on +# this pipe after it has successfully accepted the connection, to +# remember that before that this is an invalid pipe +accept(server::PipeServer) = accept(server, init_pipe!(PipeEndpoint(); + readable=false, writable=false, julia_only=true)) ## @@ -388,7 +393,7 @@ function UDPSocket() this end -function uvfinalize(uv::Union{TTY,Pipe,PipeServer,TCPServer,TCPSocket,UDPSocket}) +function uvfinalize(uv::Union{TTY,PipeEndpoint,PipeServer,TCPServer,TCPSocket,UDPSocket}) if (uv.status != StatusUninit && uv.status != StatusInit) close(uv) end diff --git a/base/stream.jl b/base/stream.jl index 105b9356a2c2c..18b2085369560 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -55,7 +55,7 @@ nb_available(s::AsyncStream) = nb_available(s.buffer) function eof(s::AsyncStream) wait_readnb(s,1) - !isopen(s) && nb_available(s.buffer)<=0 + !isopen(s) && nb_available(s)<=0 end const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB @@ -102,7 +102,7 @@ uv_req_data(handle) = ccall(:jl_uv_req_data,Ptr{Void},(Ptr{Void},),handle) uv_req_set_data(req,data) = ccall(:jl_uv_req_set_data,Void,(Ptr{Void},Any),req,data) uv_req_set_data(req,data::Ptr{Void}) = ccall(:jl_uv_req_set_data,Void,(Ptr{Void},Ptr{Void}),req,data) -type Pipe <: AsyncStream +type PipeEndpoint <: AsyncStream handle::Ptr{Void} status::Int buffer::IOBuffer @@ -117,7 +117,7 @@ type Pipe <: AsyncStream lock::ReentrantLock throttle::Int - Pipe(handle) = new( + PipeEndpoint(handle::Ptr{Void} = C_NULL) = new( handle, StatusUninit, PipeBuffer(), @@ -128,18 +128,6 @@ type Pipe <: AsyncStream nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) end -function Pipe() - handle = Libc.malloc(_sizeof_uv_named_pipe) - try - ret = Pipe(handle) - associate_julia_struct(ret.handle,ret) - finalizer(ret,uvfinalize) - return init_pipe!(ret;readable=true) - catch - Libc.free(handle) - rethrow() - end -end type PipeServer <: UVServer handle::Ptr{Void} @@ -155,12 +143,16 @@ type PipeServer <: UVServer false,Condition()) end -function init_pipe!(pipe::Union{Pipe,PipeServer};readable::Bool=false,writable=false,julia_only=true) - if pipe.handle == C_NULL - error("failed to initialize pipe") - elseif pipe.status != StatusUninit +function init_pipe!(pipe::Union{PipeEndpoint,PipeServer}; + readable::Bool = false, + writable::Bool = false, + julia_only::Bool = true) + if pipe.status != StatusUninit error("pipe is already initialized") end + if pipe.handle == C_NULL + malloc_julia_pipe!(pipe) + end uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), pipe.handle, writable,readable,julia_only)) pipe.status = StatusInit pipe @@ -179,7 +171,7 @@ function PipeServer() end end -show(io::IO,stream::Pipe) = print(io,"Pipe(",uv_status_string(stream),", ", +show(io::IO,stream::PipeEndpoint) = print(io,"PipeEndpoint(",uv_status_string(stream),", ", nb_available(stream.buffer)," bytes waiting)") show(io::IO,stream::PipeServer) = print(io,"PipeServer(",uv_status_string(stream),")") @@ -226,10 +218,14 @@ end # note that uv_is_readable/writable work for any subtype of # uv_stream_t, including uv_tty_t and uv_pipe_t -isreadable(io::Union{Pipe,TTY}) = - ccall(:uv_is_readable, Cint, (Ptr{Void},), io.handle)!=0 -iswritable(io::Union{Pipe,TTY}) = - ccall(:uv_is_writable, Cint, (Ptr{Void},), io.handle)!=0 +function isreadable(io::Union{PipeEndpoint,TTY}) + isopen(io) || return false + return ccall(:uv_is_readable, Cint, (Ptr{Void},), io.handle) != 0 +end +function iswritable(io::Union{PipeEndpoint,TTY}) + isopen(io) || return false + return ccall(:uv_is_writable, Cint, (Ptr{Void},), io.handle) != 0 +end nb_available(stream::AsyncStream) = nb_available(stream.buffer) @@ -271,7 +267,7 @@ function init_stdio(handle) elseif t == UV_TCP ret = TCPSocket(handle) elseif t == UV_NAMED_PIPE - ret = Pipe(handle) + ret = PipeEndpoint(handle) else throw(ArgumentError("invalid stdio type: $t")) end @@ -314,14 +310,14 @@ function reinit_stdio() global STDERR = init_stdio(ccall(:jl_stderr_stream,Ptr{Void},())) end -function isopen{T<:Union{AsyncStream,UVServer}}(x::T) - if !(x.status != StatusUninit && x.status != StatusInit) +function isopen(x::Union{AsyncStream,UVServer}) + if x.status == StatusUninit || x.status == StatusInit throw(ArgumentError("$T object not initialized")) end x.status != StatusClosed && x.status != StatusEOF end -function check_open(x) +function check_open(x::Union{AsyncStream,UVServer}) if !isopen(x) || x.status == StatusClosing throw(ArgumentError("stream is closed or unusable")) end @@ -519,6 +515,53 @@ function _uv_hook_close(uv::Union{AsyncStream,UVServer}) nothing end + +########################################## +# Pipe Abstraction +# (composed of two half-pipes) +########################################## + +abstract AbstractPipe <: AsyncStream +# allows sharing implementation with Process and ProcessChain + +type Pipe <: AbstractPipe + in::PipeEndpoint # writable + out::PipeEndpoint # readable +end +Pipe() = Pipe(PipeEndpoint(), PipeEndpoint()) + +function link_pipe(pipe::Pipe; + julia_only_read = false, + julia_only_write = false) + link_pipe(pipe.out, julia_only_read, pipe.in, julia_only_write); +end + +show(io::IO,stream::Pipe) = print(io, + "Pipe(", + uv_status_string(stream.in), ", ", + uv_status_string(stream.out), ", ", + nb_available(stream), " bytes waiting)") +isreadable(io::AbstractPipe) = isreadable(io.out) +iswritable(io::AbstractPipe) = iswritable(io.in) +read{T<:AbstractPipe}(io::T, args...) = read(io.out, args...) +write{T<:AbstractPipe}(io::T, args...) = write(io.in, args...) +write{S<:AbstractPipe,T}(io::S, a::Array{T}) = write(io.in, a) +buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(io.in, p, n) +readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(io.out, args...) +read!{T<:AbstractPipe}(io::T, args...) = read!(io.out, args...) +readbytes(io::AbstractPipe) = readbytes(io.out) +readavailable(io::AbstractPipe) = readavailable(io.out) +println{T<:AbstractPipe}(io::T, args...) = println(io.out, args...) +flush(io::AbstractPipe) = flush(io.in) +buffer_writes(io::AbstractPipe, args...) = buffer_writes(io.in, args...) +isopen(io::AbstractPipe) = isopen(io.in) || isopen(io.out) +close(io::AbstractPipe) = (close(io.in); close(io.out)) +wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(io.out, nb) +wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(io.out, byte) +wait_close(io::AbstractPipe) = (wait_close(io.in); wait_close(io.out)) +nb_available(io::AbstractPipe) = nb_available(io.out) +eof(io::AbstractPipe) = eof(io.out) + ########################################## # Async Worker ########################################## @@ -656,66 +699,95 @@ function process_events(block::Bool) end ## pipe functions ## -function malloc_julia_pipe(x) +function malloc_julia_pipe!(x) + assert(x.handle == C_NULL) x.handle = Libc.malloc(_sizeof_uv_named_pipe) - associate_julia_struct(x.handle,x) - finalizer(x,uvfinalize) + associate_julia_struct(x.handle, x) + finalizer(x, uvfinalize) end -_link_pipe(read_end::Ptr{Void},write_end::Ptr{Void}) = uv_error("pipe_link",ccall(:uv_pipe_link, Int32, (Ptr{Void}, Ptr{Void}), read_end, write_end)) +function _link_pipe(read_end::Ptr{Void}, write_end::Ptr{Void}) + uv_error("pipe_link", + ccall(:uv_pipe_link, Int32, (Ptr{Void}, Ptr{Void}), read_end, write_end)) +end -function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool,readpipe::AsyncStream,writepipe::AsyncStream) +function link_pipe(read_end::Ptr{Void}, readable_julia_only::Bool, + write_end::Ptr{Void}, writable_julia_only::Bool, + readpipe::AsyncStream, writepipe::AsyncStream) #make the pipe an unbuffered stream for now #TODO: this is probably not freeing memory properly after errors - uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) - uv_error("init_pipe(2)",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) + uv_error("init_pipe(read)", + ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) + uv_error("init_pipe(write)", + ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) + _link_pipe(read_end, write_end) +end + +function link_pipe(read_end::Ptr{Void}, readable_julia_only::Bool, + write_end::Ptr{Void}, writable_julia_only::Bool) + uv_error("init_pipe(read)", + ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) + uv_error("init_pipe(write)", + ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) _link_pipe(read_end,write_end) end -function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool) - uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) - uv_error("init_pipe(2)",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) - _link_pipe(read_end,write_end) -end - -function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool) +function link_pipe(read_end::PipeEndpoint, readable_julia_only::Bool, + write_end::Ptr{Void}, writable_julia_only::Bool) if read_end.handle == C_NULL - malloc_julia_pipe(read_end) + malloc_julia_pipe!(read_end) end - init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) - uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) - _link_pipe(read_end.handle,write_end) + init_pipe!(read_end; + readable = true, writable = false, julia_only = readable_julia_only) + uv_error("init_pipe", + ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) + _link_pipe(read_end.handle, write_end) read_end.status = StatusOpen end -function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool) + +function link_pipe(read_end::Ptr{Void}, readable_julia_only::Bool, + write_end::PipeEndpoint, writable_julia_only::Bool) if write_end.handle == C_NULL - malloc_julia_pipe(write_end) + malloc_julia_pipe!(write_end) end - uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) - init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only) - _link_pipe(read_end,write_end.handle) + uv_error("init_pipe", + ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) + init_pipe!(write_end; + readable = false, writable = true, julia_only = writable_julia_only) + _link_pipe(read_end, write_end.handle) write_end.status = StatusOpen end -function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool) + +function link_pipe(read_end::PipeEndpoint, readable_julia_only::Bool, + write_end::PipeEndpoint, writable_julia_only::Bool) if write_end.handle == C_NULL - malloc_julia_pipe(write_end) + malloc_julia_pipe!(write_end) end if read_end.handle == C_NULL - malloc_julia_pipe(read_end) + malloc_julia_pipe!(read_end) end - init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) - init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only) - _link_pipe(read_end.handle,write_end.handle) + init_pipe!(read_end; + readable = true, writable = false, julia_only = readable_julia_only) + init_pipe!(write_end; + readable = false, writable = true, julia_only = writable_julia_only) + _link_pipe(read_end.handle, write_end.handle) write_end.status = StatusOpen read_end.status = StatusOpen nothing end -close_pipe_sync(p::Pipe) = (ccall(:uv_pipe_close_sync,Void,(Ptr{Void},),p.handle); p.status = StatusClosed) -close_pipe_sync(handle::UVHandle) = ccall(:uv_pipe_close_sync,Void,(UVHandle,),handle) -function close(stream::Union{AsyncStream,UVServer}) +function close_pipe_sync(p::PipeEndpoint) + ccall(:uv_pipe_close_sync, Void, (Ptr{Void},), p.handle) + p.status = StatusClosed + nothing +end +function close_pipe_sync(handle::UVHandle) + ccall(:uv_pipe_close_sync, Void, (UVHandle,), handle) +end + +function close(stream::Union{AsyncStream, UVServer}) if isopen(stream) && stream.status != StatusClosing - ccall(:jl_close_uv,Void,(Ptr{Void},),stream.handle) + ccall(:jl_close_uv,Void, (Ptr{Void},), stream.handle) stream.status = StatusClosing end nothing @@ -746,7 +818,11 @@ function start_reading(stream::AsyncStream, cb::Function) end return failure_code end -start_reading(stream::AsyncStream, cb::Bool) = (failure_code = start_reading(stream); stream.readcb = cb; return failure_code) +function start_reading(stream::AsyncStream, cb::Bool) + failure_code = start_reading(stream) + stream.readcb = cb + return failure_code +end function stop_reading(stream::AsyncStream) if stream.status == StatusActive @@ -825,22 +901,22 @@ readline() = readline(STDIN) function readavailable(this::AsyncStream) buf = this.buffer @assert buf.seekable == false - wait_readnb(this,1) + wait_readnb(this, 1) takebuf_array(buf) end -function readuntil(this::AsyncStream,c::UInt8) +function readuntil(this::AsyncStream, c::UInt8) buf = this.buffer @assert buf.seekable == false - wait_readbyte(this,c) - readuntil(buf,c) + wait_readbyte(this, c) + readuntil(buf, c) end -#function finish_read(pipe::Pipe) +#function finish_read(pipe::PipeEndpoint) # close(pipe) #handles to UV and ios will be invalid after this point #end # -#function finish_read(state::(Pipe,ByteString)) +#function finish_read(state::(PipeEndpoint,ByteString)) # finish_read(state...) #end @@ -953,7 +1029,7 @@ show(io::IO, e::UVError) = print(io, e.prefix*": "*struverror(e)*" ("*uverrornam ## server functions ## -function accept_nonblock(server::PipeServer,client::Pipe) +function accept_nonblock(server::PipeServer,client::PipeEndpoint) if client.status != StatusInit error(client.status == StatusUninit ? "client is not initialized" : "client is already in use or has been closed") @@ -965,7 +1041,7 @@ function accept_nonblock(server::PipeServer,client::Pipe) err end function accept_nonblock(server::PipeServer) - client = Pipe() + client = init_pipe!(PipeEndpoint(); readable=true, writable=true, julia_only=true) uv_error("accept", accept_nonblock(server,client) != 0) client end @@ -1020,7 +1096,7 @@ function listen(path::AbstractString) sock end -function connect!(sock::Pipe, path::AbstractString) +function connect!(sock::PipeEndpoint, path::AbstractString) @assert sock.status == StatusInit req = Libc.malloc(_sizeof_uv_connect) uv_req_set_data(req,C_NULL) @@ -1035,7 +1111,9 @@ function connect(sock::AsyncStream, args...) sock end -connect(path::AbstractString) = connect(Pipe(),path) +# Libuv will internally reset read/writability, which is uses to +# mark that this is an invalid pipe. +connect(path::AbstractString) = connect(init_pipe!(PipeEndpoint(); readable=false, writable=false, julia_only=true),path) _fd(x::IOStream) = RawFD(fd(x)) @unix_only _fd(x::AsyncStream) = RawFD(ccall(:jl_uv_handle,Int32,(Ptr{Void},),x.handle)) @@ -1061,7 +1139,7 @@ for (x,writable,unix_fd,c_symbol) in ((:STDIN,false,0,:jl_uv_stdin),(:STDOUT,tru handle end function ($f)() - read,write = (Pipe(C_NULL), Pipe(C_NULL)) + read,write = (PipeEndpoint(), PipeEndpoint()) link_pipe(read,$(writable),write,$(!writable)) ($f)($(writable? :write : :read)) (read,write) diff --git a/test/repl.jl b/test/repl.jl index 307ad3445310a..51968f0e07d27 100644 --- a/test/repl.jl +++ b/test/repl.jl @@ -9,9 +9,9 @@ function fake_repl() # Use pipes so we can easily do blocking reads # In the future if we want we can add a test that the right object # gets displayed by intercepting the display - stdin_read,stdin_write = (Base.Pipe(C_NULL), Base.Pipe(C_NULL)) - stdout_read,stdout_write = (Base.Pipe(C_NULL), Base.Pipe(C_NULL)) - stderr_read,stderr_write = (Base.Pipe(C_NULL), Base.Pipe(C_NULL)) + stdin_read,stdin_write = (Base.PipeEndpoint(), Base.PipeEndpoint()) + stdout_read,stdout_write = (Base.PipeEndpoint(), Base.PipeEndpoint()) + stderr_read,stderr_write = (Base.PipeEndpoint(), Base.PipeEndpoint()) Base.link_pipe(stdin_read,true,stdin_write,true) Base.link_pipe(stdout_read,true,stdout_write,true) Base.link_pipe(stderr_read,true,stderr_write,true) diff --git a/test/spawn.jl b/test/spawn.jl index 313d03ed10012..cc33cb0811010 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -19,23 +19,23 @@ yes = `perl -le 'while (1) {print STDOUT "y"}'` #### Examples used in the manual #### @test readall(`echo hello | sort`) == "hello | sort\n" -@test readall(pipe(`echo hello`, `sort`)) == "hello\n" -@test length(spawn(pipe(`echo hello`, `sort`)).processes) == 2 +@test readall(pipeline(`echo hello`, `sort`)) == "hello\n" +@test length(spawn(pipeline(`echo hello`, `sort`)).processes) == 2 out = readall(`echo hello` & `echo world`) @test search(out,"world") != (0,0) @test search(out,"hello") != (0,0) -@test readall(pipe(`echo hello` & `echo world`, `sort`)) == "hello\nworld\n" +@test readall(pipeline(`echo hello` & `echo world`, `sort`)) == "hello\nworld\n" @test (run(`printf " \033[34m[stdio passthrough ok]\033[0m\n"`); true) # Test for SIGPIPE being treated as normal termination (throws an error if broken) -@unix_only @test (run(pipe(yes,`head`,DevNull)); true) +@unix_only @test (run(pipeline(yes,`head`,DevNull)); true) begin a = Base.Condition() @schedule begin - p = spawn(pipe(yes,DevNull)) + p = spawn(pipeline(yes,DevNull)) Base.notify(a,p) @test !success(p) end @@ -51,30 +51,30 @@ end if false prefixer(prefix, sleep) = `perl -nle '$|=1; print "'$prefix' ", $_; sleep '$sleep';'` - @test success(pipe(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, + @test success(pipeline(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, prefixer("A",2) & prefixer("B",2))) - @test success(pipe(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, + @test success(pipeline(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, prefixer("X",3) & prefixer("Y",3) & prefixer("Z",3), prefixer("A",2) & prefixer("B",2))) end @test success(`true`) @test !success(`false`) -@test success(pipe(`true`, `true`)) +@test success(pipeline(`true`, `true`)) if false @test success(ignorestatus(`false`)) - @test success(pipe(ignorestatus(`false`), `true`)) - @test !success(pipe(ignorestatus(`false`), `false`)) + @test success(pipeline(ignorestatus(`false`), `true`)) + @test !success(pipeline(ignorestatus(`false`), `false`)) @test !success(ignorestatus(`false`) & `false`) - @test success(ignorestatus(pipe(`false`, `false`))) + @test success(ignorestatus(pipeline(`false`, `false`))) @test success(ignorestatus(`false` & `false`)) end # STDIN Redirection file = tempname() -run(pipe(`echo hello world`, file)) -@test readall(pipe(file, `cat`)) == "hello world\n" -@test open(readall, pipe(file, `cat`), "r") == "hello world\n" +run(pipeline(`echo hello world`, file)) +@test readall(pipeline(file, `cat`)) == "hello world\n" +@test open(readall, pipeline(file, `cat`), "r") == "hello world\n" rm(file) # Stream Redirection @@ -84,12 +84,12 @@ rm(file) port, server = listenany(2326) put!(r,port) client = accept(server) - @test readall(pipe(client, `cat`)) == "hello world\n" + @test readall(pipeline(client, `cat`)) == "hello world\n" close(server) end @async begin sock = connect(fetch(r)) - run(pipe(`echo hello world`, sock)) + run(pipeline(`echo hello world`, sock)) close(sock) end end @@ -118,7 +118,7 @@ str2 = readall(stdout) # This test hangs if the end of run walk across uv streams calls shutdown on a stream that is shutting down. file = tempname() -open(pipe(`cat -`, file), "w") do io +open(pipeline(`cat -`, file), "w") do io write(io, str) end rm(file) @@ -169,18 +169,23 @@ unmark(sock) close(sock) # issue #4535 -exename = joinpath(JULIA_HOME, Base.julia_exename()) +exename = Base.julia_cmd() if valgrind_off # If --trace-children=yes is passed to valgrind, we will get a # valgrind banner here, not "Hello World\n". - @test readall(pipe(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr=`cat`)) == "Hello World\n" + @test readall(pipeline(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr=`cat`)) == "Hello World\n" + out = Pipe() + proc = spawn(pipeline(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr = out)) + close(out.in) + @test readall(out) == "Hello World\n" + @test success(proc) end # issue #6310 -@test readall(pipe(`echo "2+2"`, `$exename -f`)) == "4\n" +@test readall(pipeline(`echo "2+2"`, `$exename -f`)) == "4\n" # issue #5904 -@test run(pipe(ignorestatus(`false`), `true`)) === nothing +@test run(pipeline(ignorestatus(`false`), `true`)) === nothing # issue #6010 @@ -244,16 +249,15 @@ let bad = "bad\0name" end # issue #8529 -@test_throws ErrorException run(pipe(Base.Pipe(C_NULL), `cat`)) let fname = tempname() open(fname, "w") do f println(f, "test") end code = """ for line in eachline(STDIN) - run(pipe(`echo asdf`,`cat`)) + run(pipeline(`echo asdf`,`cat`)) end """ - @test success(pipe(`cat $fname`, `$exename -e $code`)) + @test success(pipeline(`cat $fname`, `$exename -e $code`)) rm(fname) end diff --git a/test/strings/io.jl b/test/strings/io.jl index 1677094d93f9e..dc315fb3bb62e 100644 --- a/test/strings/io.jl +++ b/test/strings/io.jl @@ -155,7 +155,7 @@ else for encoding in ["UTF-32LE", "UTF-16BE", "UTF-16LE", "UTF-8"] output_path = joinpath(unicodedir, encoding*".unicode") f = Base.FS.open(output_path,Base.JL_O_WRONLY|Base.JL_O_CREAT,Base.S_IRUSR | Base.S_IWUSR | Base.S_IRGRP | Base.S_IROTH) - run(pipe(`iconv -f $primary_encoding -t $encoding $primary_path`, f)) + run(pipeline(`iconv -f $primary_encoding -t $encoding $primary_path`, f)) Base.FS.close(f) end