Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 9e19627

Browse files
committedJan 11, 2016
Revert "make start_reading/stop_reading automatic exactly when needed. fix #1925, fix #10655 (closes #11530)"
This reverts commit 5863b48.
1 parent b9dc4c9 commit 9e19627

File tree

8 files changed

+69
-78
lines changed

8 files changed

+69
-78
lines changed
 

‎base/LineEdit.jl

+4
Original file line numberDiff line numberDiff line change
@@ -1625,6 +1625,7 @@ function prompt!(term, prompt, s = init_state(term, prompt))
16251625
raw!(term, true)
16261626
enable_bracketed_paste(term)
16271627
try
1628+
Base.start_reading(term)
16281629
activate(prompt, s, term, term)
16291630
while true
16301631
map = keymap(s, prompt)
@@ -1640,11 +1641,14 @@ function prompt!(term, prompt, s = init_state(term, prompt))
16401641
state = :done
16411642
end
16421643
if state == :abort
1644+
Base.stop_reading(term)
16431645
return buffer(s), false, false
16441646
elseif state == :done
1647+
Base.stop_reading(term)
16451648
return buffer(s), true, false
16461649
elseif state == :suspend
16471650
@unix_only begin
1651+
Base.stop_reading(term)
16481652
return buffer(s), true, true
16491653
end
16501654
else

‎base/REPL.jl

+2
Original file line numberDiff line numberDiff line change
@@ -824,9 +824,11 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
824824
LineEdit.commit_line(s)
825825
# This is slightly ugly but ok for now
826826
terminal = LineEdit.terminal(s)
827+
Base.stop_reading(terminal)
827828
raw!(terminal, false) && disable_bracketed_paste(terminal)
828829
LineEdit.mode(s).on_done(s, LineEdit.buffer(s), true)
829830
raw!(terminal, true) && enable_bracketed_paste(terminal)
831+
Base.start_reading(terminal)
830832
else
831833
break
832834
end

‎base/iobuffer.jl

+3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ show(io::IO, b::AbstractIOBuffer) = print(io, "IOBuffer(data=UInt8[...], ",
5353
"mark=", b.mark, ")")
5454

5555
read!(from::AbstractIOBuffer, a::Vector{UInt8}) = read_sub(from, a, 1, length(a))
56+
is_maxsize_unlimited(io::AbstractIOBuffer) = (io.maxsize == typemax(Int))
57+
maxsize(io::AbstractIOBuffer) = io.maxsize
58+
5659
read!(from::AbstractIOBuffer, a::Array) = read_sub(from, a, 1, length(a))
5760

5861
function read_sub{T}(from::AbstractIOBuffer, a::AbstractArray{T}, offs, nel)

‎base/multi.jl

+1
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,7 @@ process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = @schedule process_t
946946

947947
function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket)
948948
disable_nagle(r_stream)
949+
Base.start_reading(r_stream)
949950
wait_connected(r_stream)
950951
if r_stream != w_stream
951952
disable_nagle(w_stream)

‎base/process.jl

+1
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=D
546546
in = other
547547
out = io = Pipe()
548548
processes = spawn(cmds, (in,out,STDERR))
549+
start_reading(io)
549550
close(out.in)
550551
elseif mode == "w"
551552
in = io = Pipe()

‎base/socket.jl

+1-3
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ type TCPSocket <: LibuvStream
255255
closenotify::Condition
256256
sendbuf::Nullable{IOBuffer}
257257
lock::ReentrantLock
258-
throttle::Int
259258

260259
TCPSocket(handle) = new(
261260
handle,
@@ -266,8 +265,7 @@ type TCPSocket <: LibuvStream
266265
false, Condition(),
267266
false, Condition(),
268267
nothing,
269-
ReentrantLock(),
270-
DEFAULT_READ_BUFFER_SZ
268+
ReentrantLock()
271269
)
272270
end
273271
function TCPSocket()

‎base/stream.jl

+47-63
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ function eof(s::LibuvStream)
9191
!isopen(s) && nb_available(s)<=0
9292
end
9393

94-
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
95-
9694
const StatusUninit = 0 # handle is allocated, but not initialized
9795
const StatusInit = 1 # handle is valid, but not connected/active
9896
const StatusConnecting = 2 # handle is in process of connecting
@@ -148,7 +146,6 @@ type PipeEndpoint <: LibuvStream
148146
closenotify::Condition
149147
sendbuf::Nullable{IOBuffer}
150148
lock::ReentrantLock
151-
throttle::Int
152149

153150
PipeEndpoint(handle::Ptr{Void} = C_NULL) = new(
154151
handle,
@@ -158,8 +155,7 @@ type PipeEndpoint <: LibuvStream
158155
false,Condition(),
159156
false,Condition(),
160157
false,Condition(),
161-
nothing, ReentrantLock(),
162-
DEFAULT_READ_BUFFER_SZ)
158+
nothing, ReentrantLock())
163159
end
164160

165161
type PipeServer <: LibuvServer
@@ -202,7 +198,6 @@ type TTY <: LibuvStream
202198
closenotify::Condition
203199
sendbuf::Nullable{IOBuffer}
204200
lock::ReentrantLock
205-
throttle::Int
206201
@windows_only ispty::Bool
207202
function TTY(handle)
208203
tty = new(
@@ -212,8 +207,7 @@ type TTY <: LibuvStream
212207
PipeBuffer(),
213208
false,Condition(),
214209
false,Condition(),
215-
nothing, ReentrantLock(),
216-
DEFAULT_READ_BUFFER_SZ)
210+
nothing, ReentrantLock())
217211
@windows_only tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Void},), handle)!=0
218212
tty
219213
end
@@ -289,6 +283,15 @@ function init_stdio(handle::Ptr{Void})
289283
end
290284
end
291285

286+
function stream_wait(x,c...)
287+
preserve_handle(x)
288+
try
289+
return wait(c...)
290+
finally
291+
unpreserve_handle(x)
292+
end
293+
end
294+
292295
function reinit_stdio()
293296
global uv_jl_asynccb = cfunction(uv_asynccb, Void, (Ptr{Void},))
294297
global uv_jl_timercb = cfunction(uv_timercb, Void, (Ptr{Void},))
@@ -324,51 +327,27 @@ end
324327
function wait_connected(x::Union{LibuvStream, LibuvServer})
325328
check_open(x)
326329
while x.status == StatusConnecting
327-
stream_wait(x, x.connectnotify)
330+
stream_wait(x,x.connectnotify)
328331
check_open(x)
329332
end
330333
end
331334

335+
332336
function wait_readbyte(x::LibuvStream, c::UInt8)
333-
preserve_handle(x)
334-
try
335-
while isopen(x) && search(x.buffer, c) <= 0
336-
start_reading(x) # ensure we are reading
337-
wait(x.readnotify)
338-
end
339-
finally
340-
if isempty(x.readnotify.waitq)
341-
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
342-
end
343-
unpreserve_handle(x)
337+
while isopen(x) && search(x.buffer,c) <= 0
338+
start_reading(x)
339+
stream_wait(x,x.readnotify)
344340
end
345341
end
346342

347343
function wait_readnb(x::LibuvStream, nb::Int)
348-
oldthrottle = x.throttle
349-
preserve_handle(x)
350-
try
351-
while isopen(x) && nb_available(x.buffer) < nb
352-
x.throttle = max(nb, x.throttle)
353-
start_reading(x) # ensure we are reading
354-
wait(x.readnotify)
355-
end
356-
finally
357-
if oldthrottle <= x.throttle <= nb
358-
x.throttle = oldthrottle
359-
end
360-
if isempty(x.readnotify.waitq)
361-
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
362-
end
363-
unpreserve_handle(x)
344+
while isopen(x) && nb_available(x.buffer) < nb
345+
start_reading(x)
346+
stream_wait(x,x.readnotify)
364347
end
365348
end
366349

367-
function wait_close(x::Union{LibuvStream, LibuvServer})
368-
if isopen(x)
369-
stream_wait(x, x.closenotify)
370-
end
371-
end
350+
wait_close(x) = if isopen(x) stream_wait(x,x.closenotify); end
372351

373352
function close(stream::Union{LibuvStream, LibuvServer})
374353
if isopen(stream) && stream.status != StatusClosing
@@ -503,6 +482,7 @@ function notify_filled(stream::LibuvStream, nread::Int)
503482
end
504483
end
505484

485+
const READ_BUFFER_SZ=10485760 # 10 MB
506486
function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void})
507487
stream = @handle_as handle LibuvStream
508488
nread = Int(nread)
@@ -533,11 +513,11 @@ function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void})
533513
notify(stream.readnotify)
534514
end
535515

536-
# Stop background reading when
537-
# 1) we have accumulated a lot of unread data OR
516+
# Stop reading when
517+
# 1) when we have an infinite buffer, and we have accumulated a lot of unread data OR
538518
# 2) we have an alternate buffer that has reached its limit.
539-
if (nb_available(stream.buffer) >= stream.throttle) ||
540-
(nb_available(stream.buffer) >= stream.buffer.maxsize)
519+
if (is_maxsize_unlimited(stream.buffer) && (nb_available(stream.buffer) > READ_BUFFER_SZ )) ||
520+
(nb_available(stream.buffer) == stream.buffer.maxsize)
541521
stop_reading(stream)
542522
end
543523
nothing
@@ -855,7 +835,7 @@ function start_reading(stream::LibuvStream, cb::Function)
855835
if nread > 0
856836
notify_filled(stream, nread)
857837
end
858-
return failure_code
838+
nothing
859839
end
860840

861841
function start_reading(stream::LibuvStream, cb::Bool)
@@ -892,22 +872,16 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
892872
end
893873

894874
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
895-
wait_readnb(s, nb)
875+
wait_readnb(s,nb)
896876
read!(sbuf, a)
897877
else
898-
try
899-
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
900-
newbuf = PipeBuffer(a, #=maxsize=# nb)
901-
newbuf.size = 0 # reset the write pointer to the beginning
902-
s.buffer = newbuf
903-
write(newbuf, sbuf)
904-
wait_readnb(s, nb)
905-
finally
906-
s.buffer = sbuf
907-
if !isempty(s.readnotify.waitq)
908-
start_reading(x) # resume reading iff there are currently other read clients of the stream
909-
end
910-
end
878+
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
879+
newbuf = PipeBuffer(a, nb)
880+
newbuf.size = 0
881+
s.buffer = newbuf
882+
write(newbuf, sbuf)
883+
wait_readnb(s,nb)
884+
s.buffer = sbuf
911885
end
912886
return a
913887
end
@@ -916,7 +890,8 @@ function read(this::LibuvStream, ::Type{UInt8})
916890
wait_readnb(this, 1)
917891
buf = this.buffer
918892
@assert buf.seekable == false
919-
read(buf, UInt8)
893+
wait_readnb(this,1)
894+
read(buf,UInt8)
920895
end
921896

922897
function readavailable(this::LibuvStream)
@@ -964,11 +939,11 @@ function buffer_or_write(s::LibuvStream, p::Ptr, n::Integer)
964939

965940
buf = get(s.sendbuf)
966941
totb = nb_available(buf) + n
967-
if totb < buf.maxsize
942+
if totb < maxsize(buf)
968943
nb = write(buf, p, n)
969944
else
970945
flush(s)
971-
if n > buf.maxsize
946+
if n > maxsize(buf)
972947
nb = uv_write(s, p, n)
973948
else
974949
nb = write(buf, p, n)
@@ -1187,10 +1162,19 @@ function wait_readnb(s::BufferStream, nb::Int)
11871162
while isopen(s) && nb_available(s.buffer) < nb
11881163
wait(s.r_c)
11891164
end
1165+
1166+
(nb_available(s.buffer) < nb) && error("closed BufferStream")
1167+
end
1168+
1169+
function eof(s::BufferStream)
1170+
wait_readnb(s,1)
1171+
!isopen(s) && nb_available(s.buffer)<=0
11901172
end
11911173

11921174
show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)
11931175

1176+
nb_available(s::BufferStream) = nb_available(s.buffer)
1177+
11941178
function wait_readbyte(s::BufferStream, c::UInt8)
11951179
while isopen(s) && search(s.buffer,c) <= 0
11961180
wait(s.r_c)

‎test/repl.jl

+10-12
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ function fake_repl()
1717
Base.link_pipe(stderr_read,true,stderr_write,true)
1818

1919
repl = Base.REPL.LineEditREPL(TestHelpers.FakeTerminal(stdin_read, stdout_write, stderr_write))
20-
stdin_write, stdout_read, stderr_read, repl
20+
stdin_write, stdout_read, stdout_read, repl
2121
end
2222

2323
# Writing ^C to the repl will cause sigint, so let's not die on that
@@ -29,7 +29,7 @@ ccall(:jl_exit_on_sigint, Void, (Cint,), 0)
2929
# this should make sure nothing crashes without depending on how exactly the control
3030
# characters are being used.
3131
if @unix? true : (Base.windows_version() >= Base.WINDOWS_VISTA_VER)
32-
stdin_write, stdout_read, stderr_read, repl = fake_repl()
32+
stdin_write, stdout_read, stdout_read, repl = fake_repl()
3333

3434
repl.specialdisplay = Base.REPL.REPLDisplay(repl)
3535
repl.history_file = false
@@ -367,19 +367,17 @@ master = Base.TTY(RawFD(fdm); readable = true)
367367
nENV = copy(ENV)
368368
nENV["TERM"] = "dumb"
369369
p = spawn(setenv(`$exename --startup-file=no --quiet`,nENV),slave,slave,slave)
370-
output = readuntil(master,"julia> ")
371-
if ccall(:jl_running_on_valgrind,Cint,()) == 0
372-
# If --trace-children=yes is passed to valgrind, we will get a
373-
# valgrind banner here, not just the prompt.
374-
@test output == "julia> "
375-
end
370+
Base.start_reading(master)
371+
Base.wait_readnb(master,1)
376372
write(master,"1\nquit()\n")
377373

378374
wait(p)
379-
output = readuntil(master,' ')
380-
@test output == "1\r\nquit()\r\n1\r\n\r\njulia> "
381-
@test nb_available(master) == 0
382-
ccall(:close,Cint,(Cint,),fds) # XXX: this causes the kernel to throw away all unread data on the pty
375+
376+
ccall(:close,Cint,(Cint,),fds)
377+
output = readall(master)
378+
if ccall(:jl_running_on_valgrind,Cint,()) == 0
379+
@test output == "julia> 1\r\nquit()\r\n1\r\n\r\njulia> "
380+
end
383381
close(master)
384382

385383
end

0 commit comments

Comments
 (0)
Please sign in to comment.