Skip to content

Commit 6e85cf1

Browse files
authored
Merge pull request #329 from JuliaParallel/jps/init-dynamic
Sch: Start dynamic listener in init_proc
2 parents 2abbf13 + e77e8d3 commit 6e85cf1

File tree

2 files changed

+47
-51
lines changed

2 files changed

+47
-51
lines changed

Diff for: src/sch/Sch.jl

+4-3
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,10 @@ function init_proc(state, p, log_sink)
286286
lock(state.lock) do
287287
state.worker_chans[p.pid] = (inp_chan, out_chan)
288288
end
289+
290+
# Setup dynamic listener
291+
dynamic_listener!(ctx, state, p.pid)
292+
289293
timespan_finish(ctx, :init_proc, p.pid, 0)
290294
end
291295
function _cleanup_proc(uid, log_sink)
@@ -405,9 +409,6 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)
405409
@error "Error assigning workers" exception=(err,catch_backtrace())
406410
end
407411
end
408-
409-
# setup dynamic listeners
410-
dynamic_listener!(ctx, state)
411412
end
412413

413414
function scheduler_run(ctx, state::ComputeState, d::Thunk, options)

Diff for: src/sch/dynamic.jl

+43-48
Original file line numberDiff line numberDiff line change
@@ -41,61 +41,56 @@ function safepoint(state)
4141
end
4242

4343
"Processes dynamic messages from worker-executing thunks."
44-
function dynamic_listener!(ctx, state)
44+
function dynamic_listener!(ctx, state, wid)
4545
task = current_task() # The scheduler's main task
46-
listener_tasks = Task[]
47-
for tid in keys(state.worker_chans)
48-
inp_chan, out_chan = state.worker_chans[tid]
49-
push!(listener_tasks, @async begin
50-
while isopen(inp_chan) && !state.halt.set
51-
tid, f, data = try
52-
take!(inp_chan)
53-
catch err
54-
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
55-
ProcessExitedException,
56-
InvalidStateException})
57-
iob = IOContext(IOBuffer(), :color=>true)
58-
println(iob, "Error in sending dynamic request:")
59-
Base.showerror(iob, err)
60-
Base.show_backtrace(iob, catch_backtrace())
61-
println(iob)
62-
seek(iob.io, 0)
63-
write(stderr, iob)
64-
end
65-
break
46+
inp_chan, out_chan = state.worker_chans[wid]
47+
listener_task = @async begin
48+
while isopen(inp_chan) && !state.halt.set
49+
tid, f, data = try
50+
take!(inp_chan)
51+
catch err
52+
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
53+
ProcessExitedException,
54+
InvalidStateException})
55+
iob = IOContext(IOBuffer(), :color=>true)
56+
println(iob, "Error in sending dynamic request:")
57+
Base.showerror(iob, err)
58+
Base.show_backtrace(iob, catch_backtrace())
59+
println(iob)
60+
seek(iob.io, 0)
61+
write(stderr, iob)
6662
end
67-
res = try
68-
(false, lock(state.lock) do
69-
Base.invokelatest(f, ctx, state, task, tid, data)
70-
end)
71-
catch err
72-
(true, RemoteException(CapturedException(err,catch_backtrace())))
73-
end
74-
try
75-
put!(out_chan, res)
76-
catch err
77-
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
78-
ProcessExitedException,
79-
InvalidStateException})
80-
iob = IOContext(IOBuffer(), :color=>true)
81-
println(iob, "Error in sending dynamic result from $f:")
82-
Base.showerror(iob, err)
83-
Base.show_backtrace(iob, catch_backtrace())
84-
println(iob)
85-
seek(iob.io, 0)
86-
write(stderr, iob)
87-
end
63+
break
64+
end
65+
res = try
66+
(false, lock(state.lock) do
67+
Base.invokelatest(f, ctx, state, task, tid, data)
68+
end)
69+
catch err
70+
(true, RemoteException(CapturedException(err,catch_backtrace())))
71+
end
72+
try
73+
put!(out_chan, res)
74+
catch err
75+
if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException,
76+
ProcessExitedException,
77+
InvalidStateException})
78+
iob = IOContext(IOBuffer(), :color=>true)
79+
println(iob, "Error in sending dynamic result from $f:")
80+
Base.showerror(iob, err)
81+
Base.show_backtrace(iob, catch_backtrace())
82+
println(iob)
83+
seek(iob.io, 0)
84+
write(stderr, iob)
8885
end
8986
end
90-
end)
87+
end
9188
end
9289
@async begin
9390
wait(state.halt)
94-
for ltask in listener_tasks
95-
# TODO: Not sure why we need the @async here, but otherwise we
96-
# don't stop all the listener tasks
97-
@async Base.throwto(ltask, SchedulerHaltedException())
98-
end
91+
# TODO: Not sure why we need the @async here, but otherwise we
92+
# don't stop all the listener tasks
93+
@async Base.throwto(listener_task, SchedulerHaltedException())
9994
end
10095
end
10196

0 commit comments

Comments
 (0)