Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process and pipe cleanup #12739

Merged
merged 15 commits into from
Aug 26, 2015
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Make it possible again to read STDERR without calling cat
Merge the Pipe() and Pipe(C_NULL) constructors having the former just
call the latter and delaying initialization to the place where it is
actually needed. Now passing an empty Pipe to spawn will once again
initialize it with the appropriate pipe end.
Keno authored and vtjnash committed Aug 22, 2015
commit 771ec0bd8a16be00cae8858f161e363340688b3b
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
@@ -1162,6 +1162,7 @@ export
ntoh,
open,
pipe,
Pipe,
PipeBuffer,
poll_fd,
poll_file,
18 changes: 12 additions & 6 deletions base/process.jl
Original file line number Diff line number Diff line change
@@ -311,7 +311,9 @@ macro setup_stdio()
in,out,err = stdios
if isa(stdios[1], Pipe)
if stdios[1].handle == C_NULL
error("pipes passed to spawn must be initialized")
in = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe))
link_pipe(in,false,stdios[1],true)
close_in = true
end
elseif isa(stdios[1], FileRedirect)
in = FS.open(stdios[1].filename, JL_O_RDONLY)
@@ -321,7 +323,9 @@ macro setup_stdio()
end
if isa(stdios[2], Pipe)
if stdios[2].handle == C_NULL
error("pipes passed to spawn must be initialized")
out = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe))
link_pipe(stdios[2],true,out,false)
close_out = true
end
elseif isa(stdios[2], FileRedirect)
out = FS.open(stdios[2].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[2].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
@@ -331,7 +335,9 @@ macro setup_stdio()
end
if isa(stdios[3], Pipe)
if stdios[3].handle == C_NULL
error("pipes passed to spawn must be initialized")
err = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe))
link_pipe(stdios[3],true,err,false)
close_err = true
end
elseif isa(stdios[3], FileRedirect)
err = FS.open(stdios[3].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[3].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
@@ -345,9 +351,9 @@ end
macro cleanup_stdio()
esc(
quote
close_in && close(in)
close_out && close(out)
close_err && close(err)
close_in && (isa(in,Ptr) ? close_pipe_sync(in) : close(in))
close_out && (isa(out,Ptr) ? close_pipe_sync(out) : close(out))
close_err && (isa(err,Ptr) ? close_pipe_sync(err) : close(err))
end)
end

7 changes: 6 additions & 1 deletion base/socket.jl
Original file line number Diff line number Diff line change
@@ -343,7 +343,12 @@ _jl_sockaddr_set_port(ptr::Ptr{Void},port::UInt16) =
ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},UInt16),ptr,port)

accept(server::TCPServer) = accept(server, TCPSocket())
accept(server::PipeServer) = accept(server, Pipe())

# Libuv will internally reset the readable and writable flags on
# this pipe after it has successfully accepted the connection, to
# remember that before that this is an invalid pipe
accept(server::PipeServer) = accept(server, init_pipe!(Pipe();
readable=false, writable=false, julia_only=true))

##

34 changes: 13 additions & 21 deletions base/stream.jl
Original file line number Diff line number Diff line change
@@ -128,18 +128,7 @@ type Pipe <: AsyncStream
nothing, ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
end
function Pipe()
handle = Libc.malloc(_sizeof_uv_named_pipe)
try
ret = Pipe(handle)
associate_julia_struct(ret.handle,ret)
finalizer(ret,uvfinalize)
return init_pipe!(ret;readable=true)
catch
Libc.free(handle)
rethrow()
end
end
Pipe() = Pipe(C_NULL)

type PipeServer <: UVServer
handle::Ptr{Void}
@@ -157,8 +146,9 @@ end

function init_pipe!(pipe::Union{Pipe,PipeServer};readable::Bool=false,writable=false,julia_only=true)
if pipe.handle == C_NULL
error("failed to initialize pipe")
elseif pipe.status != StatusUninit
malloc_julia_pipe!(pipe)
end
if pipe.status != StatusUninit
error("pipe is already initialized")
end
uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), pipe.handle, writable,readable,julia_only))
@@ -656,7 +646,7 @@ function process_events(block::Bool)
end

## pipe functions ##
function malloc_julia_pipe(x)
function malloc_julia_pipe!(x)
x.handle = Libc.malloc(_sizeof_uv_named_pipe)
associate_julia_struct(x.handle,x)
finalizer(x,uvfinalize)
@@ -680,7 +670,7 @@ end

function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool)
if read_end.handle == C_NULL
malloc_julia_pipe(read_end)
malloc_julia_pipe!(read_end)
end
init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only)
uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only))
@@ -689,7 +679,7 @@ function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Ptr{Void}
end
function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool)
if write_end.handle == C_NULL
malloc_julia_pipe(write_end)
malloc_julia_pipe!(write_end)
end
uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only))
init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only)
@@ -698,10 +688,10 @@ function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Pipe
end
function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool)
if write_end.handle == C_NULL
malloc_julia_pipe(write_end)
malloc_julia_pipe!(write_end)
end
if read_end.handle == C_NULL
malloc_julia_pipe(read_end)
malloc_julia_pipe!(read_end)
end
init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only)
init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only)
@@ -965,7 +955,7 @@ function accept_nonblock(server::PipeServer,client::Pipe)
err
end
function accept_nonblock(server::PipeServer)
client = Pipe()
client = init_pipe!(Pipe(); readable=true, writable=true, julia_only=true)
uv_error("accept", accept_nonblock(server,client) != 0)
client
end
@@ -1035,7 +1025,9 @@ function connect(sock::AsyncStream, args...)
sock
end

connect(path::AbstractString) = connect(Pipe(),path)
# Libuv will internally reset read/writability, which is uses to
# mark that this is an invalid pipe.
connect(path::AbstractString) = connect(init_pipe!(Pipe(); readable=false, writable=false, julia_only=true),path)

_fd(x::IOStream) = RawFD(fd(x))
@unix_only _fd(x::AsyncStream) = RawFD(ccall(:jl_uv_handle,Int32,(Ptr{Void},),x.handle))
3 changes: 3 additions & 0 deletions test/spawn.jl
Original file line number Diff line number Diff line change
@@ -174,6 +174,9 @@ if valgrind_off
# If --trace-children=yes is passed to valgrind, we will get a
# valgrind banner here, not "Hello World\n".
@test readall(pipe(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr=`cat`)) == "Hello World\n"
p = Pipe()
run(pipe(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr = p))
@test readall(p) == "Hello World\n"
end

# issue #6310