Skip to content

Commit 8423fb8

Browse files
authored
Merge pull request #496 from JamesWrigley/streaming-fixes
Streaming branch fixes
2 parents 5381435 + 3b0a355 commit 8423fb8

File tree

3 files changed

+21
-4
lines changed

3 files changed

+21
-4
lines changed

Diff for: src/sch/Sch.jl

+9-1
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,15 @@ function cleanup_proc(state, p, log_sink)
409409

410410
# If the worker process is still alive, clean it up
411411
if wid in workers()
412-
remotecall_wait(_cleanup_proc, wid, state.uid, log_sink)
412+
try
413+
remotecall_wait(_cleanup_proc, wid, state.uid, log_sink)
414+
catch ex
415+
# We allow ProcessExitedException's, which means that the worker
416+
# shutdown halfway through cleanup.
417+
if !(ex isa ProcessExitedException)
418+
rethrow()
419+
end
420+
end
413421
end
414422

415423
timespan_finish(ctx, :cleanup_proc, (;worker=wid), nothing)

Diff for: src/sch/dynamic.jl

+11-2
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,18 @@ function safepoint(state)
3232
if state.halt.set
3333
# Force dynamic thunks and listeners to terminate
3434
for (inp_chan,out_chan) in values(state.worker_chans)
35-
close(inp_chan)
36-
close(out_chan)
35+
# Closing these channels will fail if the worker died, which we
36+
# allow.
37+
try
38+
close(inp_chan)
39+
close(out_chan)
40+
catch ex
41+
if !(ex isa ProcessExitedException)
42+
rethrow()
43+
end
44+
end
3745
end
46+
3847
# Throw out of scheduler
3948
throw(SchedulerHaltedException())
4049
end

Diff for: src/sch/eager.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const EAGER_STATE = Ref{Union{ComputeState,Nothing}}(nothing)
66

77
function eager_context()
88
if EAGER_CONTEXT[] === nothing
9-
EAGER_CONTEXT[] = Context([myid(),workers()...])
9+
EAGER_CONTEXT[] = Context(procs())
1010
end
1111
return EAGER_CONTEXT[]
1212
end

0 commit comments

Comments
 (0)