Skip to content

Commit 9a6da0b

Browse files
committed
Ensure dynamic listeners terminate (fixes memory leak)
1 parent 2ed91b7 commit 9a6da0b

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

src/sch/Sch.jl

+7 -5
Original file line number | Diff line number | Diff line change
@@ -60,8 +60,8 @@ Fields:
6060
- `worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}` - Communication channels between the scheduler and each worker
6161
- `procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}` - Cached linked list of processors ready to be used
6262
- `function_cost_cache::Dict{Type{<:Tuple},UInt}` - Cache of estimated CPU time required to compute the given signature
63-
- `halt::Base.RefValue{Bool}` - Flag indicating, when set, that the scheduler should halt immediately
64-
- `lock::ReentrantLock()` - Lock around operations which modify the state
63+
- `halt::Base.Event` - Event indicating that the scheduler is halting
64+
- `lock::ReentrantLock` - Lock around operations which modify the state
6565
- `futures::Dict{Thunk, Vector{ThunkFuture}}` - Futures registered for waiting on the result of a thunk.
6666
- `errored::Set{Thunk}` - Thunks that threw an error
6767
- `chan::RemoteChannel{Channel{Any}}` - Channel for receiving completed thunks
@@ -84,7 +84,7 @@ struct ComputeState
8484
worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
8585
procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
8686
function_cost_cache::Dict{Type{<:Tuple},UInt}
87-
halt::Base.RefValue{Bool}
87+
halt::Base.Event
8888
lock::ReentrantLock
8989
futures::Dict{Thunk, Vector{ThunkFuture}}
9090
errored::Set{Thunk}
@@ -109,7 +109,7 @@ function start_state(deps::Dict, node_order, chan)
109109
Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
110110
Ref{Union{ProcessorCacheEntry,Nothing}}(nothing),
111111
Dict{Type{<:Tuple},UInt}(),
112-
Ref{Bool}(false),
112+
Base.Event(),
113113
ReentrantLock(),
114114
Dict{Thunk, Vector{ThunkFuture}}(),
115115
Set{Thunk}(),
@@ -441,7 +441,9 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
441441

442442
safepoint(state)
443443
end
444-
state.halt[] = true
444+
@assert !isready(state.chan)
445+
close(state.chan)
446+
notify(state.halt)
445447
@sync for p in procs_to_use(ctx)
446448
@async cleanup_proc(state, p)
447449
end

src/sch/dynamic.jl

+13 -4
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ struct DynamicThunkException <: Exception
2525
end
2626

2727
function safepoint(state)
28-
if state.halt[]
28+
if state.halt.set
2929
# Force dynamic thunks and listeners to terminate
3030
for (inp_chan,out_chan) in values(state.worker_chans)
3131
close(inp_chan)
@@ -39,10 +39,11 @@ end
3939
"Processes dynamic messages from worker-executing thunks."
4040
function dynamic_listener!(ctx, state)
4141
task = current_task() # The scheduler's main task
42+
listener_tasks = Task[]
4243
for tid in keys(state.worker_chans)
4344
inp_chan, out_chan = state.worker_chans[tid]
44-
@async begin
45-
while isopen(inp_chan)
45+
push!(listener_tasks, @async begin
46+
while isopen(inp_chan) && !state.halt.set
4647
tid, f, data = try
4748
take!(inp_chan)
4849
catch err
@@ -70,6 +71,14 @@ function dynamic_listener!(ctx, state)
7071
end
7172
end
7273
end
74+
end)
75+
end
76+
@async begin
77+
wait(state.halt)
78+
for ltask in listener_tasks
79+
# TODO: Not sure why we need the @async here, but otherwise we
80+
# don't stop all the listener tasks
81+
@async Base.throwto(ltask, SchedulerHaltedException())
7382
end
7483
end
7584
end
@@ -91,7 +100,7 @@ end
91100
"Commands the scheduler to halt execution immediately."
92101
halt!(h::SchedulerHandle) = exec!(_halt, h, nothing)
93102
function _halt(ctx, state, task, tid, _)
94-
state.halt[] = true
103+
notify(state.halt)
95104
put!(state.chan, (1, nothing, SchedulerHaltedException(), nothing))
96105
Base.throwto(task, SchedulerHaltedException())
97106
end

0 commit comments

Comments
 (0)