Skip to content

Commit 2719224

Browse files
authored
Revert "Revert "add stream shutdown and support half-duplex operation (#40783)" (#41808)"
This reverts commit d15a330.
1 parent d15a330 commit 2719224

File tree

14 files changed

+240
-111
lines changed

14 files changed

+240
-111
lines changed

NEWS.md

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Standard library changes
4545
overflow in most cases. The new function `checked_length` is now available, which will try to use checked
4646
arithmetic to error if the result may be wrapping. Or use a package such as SaferIntegers.jl when
4747
constructing the range. ([#40382])
48+
* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#40783]).
4849

4950
#### InteractiveUtils
5051
* A new macro `@time_imports` for reporting any time spent importing packages and their dependencies ([#41612])

base/coreio.jl

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ write(::DevNull, ::UInt8) = 1
1313
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
1414
close(::DevNull) = nothing
1515
wait_close(::DevNull) = wait()
16+
bytesavailable(io::DevNull) = 0
1617

1718
let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR}
1819
global write(io::CoreIO, x::UInt8) = Core.write(io, x)

base/exports.jl

+1
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,7 @@ export
803803

804804
# I/O and events
805805
close,
806+
shutdown,
806807
countlines,
807808
eachline,
808809
readeach,

base/io.jl

+72-52
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,49 @@ function isopen end
6060
Close an I/O stream. Performs a [`flush`](@ref) first.
6161
"""
6262
function close end
63+
64+
"""
65+
shutdown(stream)
66+
67+
Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
68+
first. Notify the other end that no more data will be written to the underlying
69+
file. This is not supported by all IO types.
70+
71+
# Examples
72+
```jldoctest
73+
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task
74+
75+
julia> write(io, "request");
76+
77+
julia> # calling `read(io)` here would block forever
78+
79+
julia> shutdown(io);
80+
81+
julia> read(io, String)
82+
"request"
83+
"""
84+
function shutdown end
85+
86+
"""
87+
flush(stream)
88+
89+
Commit all currently buffered writes to the given stream.
90+
"""
6391
function flush end
64-
function wait_readnb end
65-
function wait_close end
92+
93+
"""
94+
bytesavailable(io)
95+
96+
Return the number of bytes available for reading before a read from this stream or buffer will block.
97+
98+
# Examples
99+
```jldoctest
100+
julia> io = IOBuffer("JuliaLang is a GitHub organization");
101+
102+
julia> bytesavailable(io)
103+
34
104+
```
105+
"""
66106
function bytesavailable end
67107

68108
"""
@@ -81,7 +121,7 @@ function readavailable end
81121
"""
82122
isreadable(io) -> Bool
83123
84-
Return `true` if the specified IO object is readable (if that can be determined).
124+
Return `false` if the specified IO object is not readable.
85125
86126
# Examples
87127
```jldoctest
@@ -99,12 +139,12 @@ true
99139
julia> rm("myfile.txt")
100140
```
101141
"""
102-
function isreadable end
142+
isreadable(io::IO) = isopen(io)
103143

104144
"""
105145
iswritable(io) -> Bool
106146
107-
Return `true` if the specified IO object is writable (if that can be determined).
147+
Return `false` if the specified IO object is not writable.
108148
109149
# Examples
110150
```jldoctest
@@ -122,10 +162,23 @@ false
122162
julia> rm("myfile.txt")
123163
```
124164
"""
125-
function iswritable end
126-
function copy end
165+
iswritable(io::IO) = isopen(io)
166+
167+
"""
168+
eof(stream) -> Bool
169+
170+
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
171+
function will block to wait for more data if necessary, and then return `false`. Therefore
172+
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
173+
`false` as long as buffered data is still available, even if the remote end of a connection
174+
is closed.
175+
"""
127176
function eof end
128177

178+
function copy end
179+
function wait_readnb end
180+
function wait_close end
181+
129182
"""
130183
read(io::IO, T)
131184
@@ -357,65 +410,37 @@ end
357410
function pipe_reader end
358411
function pipe_writer end
359412

413+
for f in (:flush, :shutdown, :iswritable)
414+
@eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO)
415+
end
360416
write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte)
361417
write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from))
362418
unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt}
363419
buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...)
364-
flush(io::AbstractPipe) = flush(pipe_writer(io)::IO)
365420

421+
for f in (
422+
# peek/mark interface
423+
:mark, :unmark, :reset, :ismarked,
424+
# Simple reader functions
425+
:read, :readavailable, :bytesavailable, :reseteof, :isreadable)
426+
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
427+
end
366428
read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8
367429
unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb)
368-
read(io::AbstractPipe) = read(pipe_reader(io)::IO)
369430
readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
370431
readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
371432
readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
372433
readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
373434
readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out)
374435
readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n)
375-
376-
for f in (
377-
# peek/mark interface
378-
:mark, :unmark, :reset, :ismarked,
379-
# Simple reader functions
380-
:readavailable, :isreadable)
381-
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
382-
end
383436
peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T
437+
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
438+
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool
384439

385-
iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO)
386440
isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO)
387441
close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO))
388-
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
389442
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO))
390443

391-
"""
392-
bytesavailable(io)
393-
394-
Return the number of bytes available for reading before a read from this stream or buffer will block.
395-
396-
# Examples
397-
```jldoctest
398-
julia> io = IOBuffer("JuliaLang is a GitHub organization");
399-
400-
julia> bytesavailable(io)
401-
34
402-
```
403-
"""
404-
bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO)
405-
bytesavailable(io::DevNull) = 0
406-
407-
"""
408-
eof(stream) -> Bool
409-
410-
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
411-
function will block to wait for more data if necessary, and then return `false`. Therefore
412-
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
413-
`false` as long as buffered data is still available, even if the remote end of a connection
414-
is closed.
415-
"""
416-
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool
417-
reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO)
418-
419444

420445
# Exception-safe wrappers (io = open(); try f(io) finally close(io))
421446

@@ -1119,11 +1144,6 @@ ismarked(io::IO) = io.mark >= 0
11191144
# Make sure all IO streams support flush, even if only as a no-op,
11201145
# to make it easier to write generic I/O code.
11211146

1122-
"""
1123-
flush(stream)
1124-
1125-
Commit all currently buffered writes to the given stream.
1126-
"""
11271147
flush(io::IO) = nothing
11281148

11291149
"""

base/iobuffer.jl

+6
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,12 @@ end
334334

335335
eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)
336336

337+
function shutdown(io::GenericIOBuffer)
338+
io.writable = false
339+
# OR throw(_UVError("shutdown", UV_ENOTSOCK))
340+
nothing
341+
end
342+
337343
@noinline function close(io::GenericIOBuffer{T}) where T
338344
io.readable = false
339345
io.writable = false

base/libuv.jl

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ end
107107
function uv_alloc_buf end
108108
function uv_readcb end
109109
function uv_writecb_task end
110+
function uv_shutdowncb_task end
110111
function uv_return_spawn end
111112
function uv_asynccb end
112113
function uv_timercb end

base/process.jl

+1
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool)
275275
@warn "Process error" exception=(ex, catch_backtrace())
276276
finally
277277
close(parent)
278+
child_readable || shutdown(stdio)
278279
end
279280
end
280281
catch ex

0 commit comments

Comments
 (0)