Skip to content

Commit 5863b48

Browse files
committedJun 6, 2015
make start_reading/stop_reading automatic exactly when needed. fix #1925, fix #10655 (closes #11530)
this also makes the read throttle more intelligent so that it doesn't get tripped up by wait_nb requests for more than READ_BUFFER_SZ bytes
1 parent b67b0d1 commit 5863b48

File tree

8 files changed

+99
-78
lines changed

8 files changed

+99
-78
lines changed
 

‎base/LineEdit.jl

-4
Original file line numberDiff line numberDiff line change
@@ -1569,7 +1569,6 @@ function prompt!(term, prompt, s = init_state(term, prompt))
15691569
raw!(term, true)
15701570
enable_bracketed_paste(term)
15711571
try
1572-
Base.start_reading(term)
15731572
activate(prompt, s, term)
15741573
while true
15751574
map = keymap(s, prompt)
@@ -1585,14 +1584,11 @@ function prompt!(term, prompt, s = init_state(term, prompt))
15851584
state = :done
15861585
end
15871586
if state == :abort
1588-
Base.stop_reading(term)
15891587
return buffer(s), false, false
15901588
elseif state == :done
1591-
Base.stop_reading(term)
15921589
return buffer(s), true, false
15931590
elseif state == :suspend
15941591
@unix_only begin
1595-
Base.stop_reading(term)
15961592
return buffer(s), true, true
15971593
end
15981594
else

‎base/REPL.jl

-2
Original file line numberDiff line numberDiff line change
@@ -800,11 +800,9 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
800800
LineEdit.commit_line(s)
801801
# This is slightly ugly but ok for now
802802
terminal = LineEdit.terminal(s)
803-
Base.stop_reading(terminal)
804803
raw!(terminal, false) && disable_bracketed_paste(terminal)
805804
LineEdit.mode(s).on_done(s, LineEdit.buffer(s), true)
806805
raw!(terminal, true) && enable_bracketed_paste(terminal)
807-
Base.start_reading(terminal)
808806
else
809807
break
810808
end

‎base/iobuffer.jl

-3
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@ show(io::IO, b::AbstractIOBuffer) = print(io, "IOBuffer(data=UInt8[...], ",
5252
"ptr=", b.ptr, ", ",
5353
"mark=", b.mark, ")")
5454

55-
is_maxsize_unlimited(io::AbstractIOBuffer) = (io.maxsize == typemax(Int))
56-
maxsize(io::AbstractIOBuffer) = io.maxsize
57-
5855
read!(from::AbstractIOBuffer, a::Array) = read_sub(from, a, 1, length(a))
5956

6057
function read_sub{T}(from::AbstractIOBuffer, a::AbstractArray{T}, offs, nel)

‎base/multi.jl

-1
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,6 @@ end
811811
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...)
812812
@schedule begin
813813
disable_nagle(r_stream)
814-
Base.start_reading(r_stream)
815814
wait_connected(r_stream)
816815
if r_stream != w_stream
817816
disable_nagle(w_stream)

‎base/process.jl

+1-3
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,6 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull)
434434
function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull)
435435
if mode == "r"
436436
processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR))
437-
start_reading(out)
438437
(out, processes)
439438
elseif mode == "w"
440439
processes = @tmp_wpipe tmp inpipe spawn(false, cmds, (tmp,stdio,STDERR))
@@ -467,8 +466,7 @@ end
467466
function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
468467
(out,pc) = open(cmd, "r", stdin)
469468
!success(pc) && pipeline_error(pc)
470-
wait_close(out)
471-
return takebuf_array(out.buffer)
469+
return readbytes(out)
472470
end
473471

474472
function readall(cmd::AbstractCmd, stdin::AsyncStream=DevNull)

‎base/socket.jl

+3-1
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ type TCPSocket <: Socket
263263
closenotify::Condition
264264
sendbuf::Nullable{IOBuffer}
265265
lock::ReentrantLock
266+
throttle::Int
266267

267268
TCPSocket(handle) = new(
268269
handle,
@@ -273,7 +274,8 @@ type TCPSocket <: Socket
273274
false, Condition(),
274275
false, Condition(),
275276
nothing,
276-
ReentrantLock()
277+
ReentrantLock(),
278+
DEFAULT_READ_BUFFER_SZ
277279
)
278280
end
279281
function TCPSocket()

‎base/stream.jl

+83-54
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ function eof(s::AsyncStream)
4949
!isopen(s) && nb_available(s.buffer)<=0
5050
end
5151

52+
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
53+
5254
const StatusUninit = 0 # handle is allocated, but not initialized
5355
const StatusInit = 1 # handle is valid, but not connected/active
5456
const StatusConnecting = 2 # handle is in process of connecting
@@ -104,6 +106,7 @@ type Pipe <: AsyncStream
104106
closenotify::Condition
105107
sendbuf::Nullable{IOBuffer}
106108
lock::ReentrantLock
109+
throttle::Int
107110

108111
Pipe(handle) = new(
109112
handle,
@@ -113,7 +116,8 @@ type Pipe <: AsyncStream
113116
false,Condition(),
114117
false,Condition(),
115118
false,Condition(),
116-
nothing, ReentrantLock())
119+
nothing, ReentrantLock(),
120+
DEFAULT_READ_BUFFER_SZ)
117121
end
118122
function Pipe()
119123
handle = Libc.malloc(_sizeof_uv_named_pipe)
@@ -182,6 +186,7 @@ type TTY <: AsyncStream
182186
closenotify::Condition
183187
sendbuf::Nullable{IOBuffer}
184188
lock::ReentrantLock
189+
throttle::Int
185190
@windows_only ispty::Bool
186191
function TTY(handle)
187192
tty = new(
@@ -191,7 +196,8 @@ type TTY <: AsyncStream
191196
PipeBuffer(),
192197
false,Condition(),
193198
false,Condition(),
194-
nothing, ReentrantLock())
199+
nothing, ReentrantLock(),
200+
DEFAULT_READ_BUFFER_SZ)
195201
@windows_only tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Void},), handle)!=0
196202
tty
197203
end
@@ -269,7 +275,7 @@ function init_stdio(handle)
269275
end
270276
end
271277

272-
function stream_wait(x,c...)
278+
function stream_wait(x, c...) # for x::LibuvObject
273279
preserve_handle(x)
274280
try
275281
return wait(c...)
@@ -307,26 +313,51 @@ end
307313
function wait_connected(x)
308314
check_open(x)
309315
while x.status == StatusConnecting
310-
stream_wait(x,x.connectnotify)
316+
stream_wait(x, x.connectnotify)
311317
check_open(x)
312318
end
313319
end
314320

315321
function wait_readbyte(x::AsyncStream, c::UInt8)
316-
while isopen(x) && search(x.buffer,c) <= 0
317-
start_reading(x)
318-
stream_wait(x,x.readnotify)
322+
preserve_handle(x)
323+
try
324+
while isopen(x) && search(x.buffer,c) <= 0
325+
start_reading(x) # ensure we are reading
326+
wait(x.readnotify)
327+
end
328+
finally
329+
if isempty(x.readnotify.waitq)
330+
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
331+
end
332+
unpreserve_handle(x)
319333
end
320334
end
321335

322336
function wait_readnb(x::AsyncStream, nb::Int)
323-
while isopen(x) && nb_available(x.buffer) < nb
324-
start_reading(x)
325-
stream_wait(x,x.readnotify)
337+
oldthrottle = x.throttle
338+
preserve_handle(x)
339+
try
340+
while isopen(x) && nb_available(x.buffer) < nb
341+
x.throttle = max(nb, x.throttle)
342+
start_reading(x) # ensure we are reading
343+
wait(x.readnotify)
344+
end
345+
finally
346+
if oldthrottle <= x.throttle <= nb
347+
x.throttle = oldthrottle
348+
end
349+
if isempty(x.readnotify.waitq)
350+
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
351+
end
352+
unpreserve_handle(x)
326353
end
327354
end
328355

329-
wait_close(x) = if isopen(x) stream_wait(x,x.closenotify); end
356+
function wait_close(x::AsyncStream)
357+
if isopen(x)
358+
stream_wait(x, x.closenotify)
359+
end
360+
end
330361

331362
#from `connect`
332363
function _uv_hook_connectcb(sock::AsyncStream, status::Int32)
@@ -367,8 +398,7 @@ function alloc_request(buffer::IOBuffer, recommended_size::UInt)
367398
end
368399
function _uv_hook_alloc_buf(stream::AsyncStream, recommended_size::UInt)
369400
(buf,size) = alloc_request(stream.buffer, recommended_size)
370-
@assert size>0 # because libuv requires this (TODO: possibly stop reading too if it fails)
371-
(buf,UInt(size))
401+
return (buf,UInt(size))
372402
end
373403

374404
function notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Void}, len::UInt)
@@ -394,36 +424,39 @@ function notify_filled(stream::AsyncStream, nread::Int)
394424
end
395425
end
396426

397-
const READ_BUFFER_SZ=10485760 # 10 MB
398427
function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len::UInt)
399428
if nread < 0
400-
if nread != UV_EOF
401-
# This is a fatal connectin error. Shutdown requests as per the usual
402-
# close function won't work and libuv will fail with an assertion failure
403-
ccall(:jl_forceclose_uv,Void,(Ptr{Void},),stream.handle)
404-
notify_error(stream.readnotify, UVError("readcb",nread))
405-
else
429+
if nread == UV_ENOBUFS && len == 0
430+
# remind the client that stream.buffer is full
431+
notify(stream.readnotify)
432+
elseif nread == UV_EOF
406433
if isa(stream,TTY)
407-
stream.status = StatusEOF
434+
stream.status = StatusEOF # libuv called stop_reading already
408435
notify(stream.readnotify)
409436
notify(stream.closenotify)
410437
else
411438
close(stream)
412439
end
440+
else
441+
# This is a fatal connection error. Shutdown requests as per the usual
442+
# close function won't work and libuv will fail with an assertion failure
443+
ccall(:jl_forceclose_uv,Void,(Ptr{Void},),stream.handle)
444+
notify_error(stream.readnotify, UVError("readcb",nread))
413445
end
414446
else
415447
notify_filled(stream.buffer, nread, base, len)
416448
notify_filled(stream, nread)
417449
notify(stream.readnotify)
418450
end
419451

420-
# Stop reading when
421-
# 1) when we have an infinite buffer, and we have accumulated a lot of unread data OR
452+
# Stop background reading when
453+
# 1) we have accumulated a lot of unread data OR
422454
# 2) we have an alternate buffer that has reached its limit.
423-
if (is_maxsize_unlimited(stream.buffer) && (nb_available(stream.buffer) > READ_BUFFER_SZ )) ||
424-
(nb_available(stream.buffer) == stream.buffer.maxsize)
455+
if (nb_available(stream.buffer) >= stream.throttle) ||
456+
(nb_available(stream.buffer) >= stream.buffer.maxsize)
425457
stop_reading(stream)
426458
end
459+
nothing
427460
end
428461

429462
reseteof(x::IO) = nothing
@@ -532,7 +565,7 @@ function sleep(sec::Real)
532565
end)
533566
start_timer(timer, float(sec), 0)
534567
try
535-
stream_wait(timer,w)
568+
stream_wait(timer, w)
536569
finally
537570
stop_timer(timer)
538571
end
@@ -640,15 +673,15 @@ function start_reading(stream::AsyncStream)
640673
end
641674
end
642675
function start_reading(stream::AsyncStream, cb::Function)
643-
start_reading(stream)
676+
failure = start_reading(stream)
644677
stream.readcb = cb
645678
nread = nb_available(stream.buffer)
646679
if nread > 0
647680
notify_filled(stream, nread)
648681
end
649-
nothing
682+
return failure_code
650683
end
651-
start_reading(stream::AsyncStream, cb::Bool) = (start_reading(stream); stream.readcb = cb; nothing)
684+
start_reading(stream::AsyncStream, cb::Bool) = (failure_code = start_reading(stream); stream.readcb = cb; return failure_code)
652685

653686
function stop_reading(stream::AsyncStream)
654687
if stream.status == StatusActive
@@ -662,10 +695,9 @@ function stop_reading(stream::AsyncStream)
662695
end
663696
end
664697

665-
function readall(stream::AsyncStream)
666-
start_reading(stream)
667-
wait_close(stream)
668-
return takebuf_string(stream.buffer)
698+
function readbytes(stream::AsyncStream)
699+
wait_readnb(stream, typemax(Int))
700+
return takebuf_array(stream.buffer)
669701
end
670702

671703
function read!{T}(s::AsyncStream, a::Array{T})
@@ -687,16 +719,22 @@ function read!(s::AsyncStream, a::Vector{UInt8})
687719
end
688720

689721
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
690-
wait_readnb(s,nb)
722+
wait_readnb(s, nb)
691723
read!(sbuf, a)
692724
else
693-
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
694-
newbuf = PipeBuffer(a, nb)
695-
newbuf.size = 0
696-
s.buffer = newbuf
697-
write(newbuf, sbuf)
698-
wait_readnb(s,nb)
699-
s.buffer = sbuf
725+
try
726+
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
727+
newbuf = PipeBuffer(a, #=maxsize=# nb)
728+
newbuf.size = 0 # reset the write pointer to the beginning
729+
s.buffer = newbuf
730+
write(newbuf, sbuf)
731+
wait_readnb(s, nb)
732+
finally
733+
s.buffer = sbuf
734+
if !isempty(s.readnotify.waitq)
735+
start_reading(x) # resume reading iff there are currently other read clients of the stream
736+
end
737+
end
700738
end
701739
return a
702740
end
@@ -711,8 +749,8 @@ end
711749
function read(this::AsyncStream,::Type{UInt8})
712750
buf = this.buffer
713751
@assert buf.seekable == false
714-
wait_readnb(this,1)
715-
read(buf,UInt8)
752+
wait_readnb(this, 1)
753+
read(buf, UInt8)
716754
end
717755

718756
readline(this::AsyncStream) = readuntil(this, '\n')
@@ -776,11 +814,11 @@ function buffer_or_write(s::AsyncStream, p::Ptr, n::Integer)
776814
end
777815

778816
totb = nb_available(buf) + n
779-
if totb < maxsize(buf)
817+
if totb < buf.maxsize
780818
nb = write(buf, p, n)
781819
else
782820
flush(s)
783-
if n > maxsize(buf)
821+
if n > buf.maxsize
784822
nb = uv_write(s, p, n)
785823
else
786824
nb = write(buf, p, n)
@@ -988,19 +1026,10 @@ function wait_readnb(s::BufferStream, nb::Int)
9881026
while isopen(s) && nb_available(s.buffer) < nb
9891027
wait(s.r_c)
9901028
end
991-
992-
(nb_available(s.buffer) < nb) && error("closed BufferStream")
993-
end
994-
995-
function eof(s::BufferStream)
996-
wait_readnb(s,1)
997-
!isopen(s) && nb_available(s.buffer)<=0
9981029
end
9991030

10001031
show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)
10011032

1002-
nb_available(s::BufferStream) = nb_available(s.buffer)
1003-
10041033
function wait_readbyte(s::BufferStream, c::UInt8)
10051034
while isopen(s) && search(s.buffer,c) <= 0
10061035
wait(s.r_c)

‎test/repl.jl

+12-10
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ function fake_repl()
1616
Base.link_pipe(stderr_read,true,stderr_write,true)
1717

1818
repl = Base.REPL.LineEditREPL(TestHelpers.FakeTerminal(stdin_read, stdout_write, stderr_write))
19-
stdin_write, stdout_read, stdout_read, repl
19+
stdin_write, stdout_read, stderr_read, repl
2020
end
2121

2222
# Writing ^C to the repl will cause sigint, so let's not die on that
@@ -28,7 +28,7 @@ ccall(:jl_exit_on_sigint, Void, (Cint,), 0)
2828
# this should make sure nothing crashes without depending on how exactly the control
2929
# characters are being used.
3030
if @unix? true : (Base.windows_version() >= Base.WINDOWS_VISTA_VER)
31-
stdin_write, stdout_read, stdout_read, repl = fake_repl()
31+
stdin_write, stdout_read, stderr_read, repl = fake_repl()
3232

3333
repl.specialdisplay = Base.REPL.REPLDisplay(repl)
3434
repl.history_file = false
@@ -261,17 +261,19 @@ master = Base.TTY(RawFD(fdm); readable = true)
261261
nENV = copy(ENV)
262262
nENV["TERM"] = "dumb"
263263
p = spawn(setenv(`$exename --startup-file=no --quiet`,nENV),slave,slave,slave)
264-
Base.start_reading(master)
265-
Base.wait_readnb(master,1)
264+
output = readuntil(master,"julia> ")
265+
if ccall(:jl_running_on_valgrind,Cint,()) == 0
266+
# If --trace-children=yes is passed to valgrind, we will get a
267+
# valgrind banner here, not just the prompt.
268+
@test output == "julia> "
269+
end
266270
write(master,"1\nquit()\n")
267271

268272
wait(p)
269-
270-
ccall(:close,Cint,(Cint,),fds)
271-
output = readall(master)
272-
if ccall(:jl_running_on_valgrind,Cint,()) == 0
273-
@test output == "julia> 1\r\nquit()\r\n1\r\n\r\njulia> "
274-
end
273+
output = readuntil(master,' ')
274+
@test output == "1\r\nquit()\r\n1\r\n\r\njulia> "
275+
@test nb_available(master) == 0
276+
ccall(:close,Cint,(Cint,),fds) # XXX: this causes the kernel to throw away all unread data on the pty
275277
close(master)
276278

277279
end

0 commit comments

Comments
 (0)
Please sign in to comment.