Skip to content

Commit 1c1edab

Browse files
committed
Ensure that stream_fetch_values!() yields in its loop
Otherwise it may spin (see comments for details). Also refactored it into a while-loop instead of using a @goto.
1 parent 79f2066 commit 1c1edab

File tree

1 file changed

+26
-20
lines changed

1 file changed

+26
-20
lines changed

Diff for: src/stream-fetchers.jl

+26-20
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,32 @@ struct RemoteFetcher end
22
function stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref::Chunk{Store_remote}, buffer::Blocal, id::UInt) where {Store_remote, Blocal}
33
thunk_id = STREAM_THUNK_ID[]
44
@dagdebug thunk_id :stream "fetching values"
5-
@label fetch_values
6-
# FIXME: Pass buffer free space
7-
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
8-
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
9-
if !isopen(store)
10-
throw(InvalidStateException("Stream is closed", :closed))
11-
end
12-
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
13-
store::Store_remote
14-
in_store = store
15-
STREAM_THUNK_ID[] = thunk_id
16-
values = T[]
17-
while !isempty(store, id)
18-
value = take!(store, id)::T
19-
push!(values, value)
20-
end
21-
return values
22-
end::Vector{T}
23-
if isempty(values)
24-
@goto fetch_values
5+
6+
values = T[]
7+
while isempty(values)
8+
# FIXME: Pass buffer free space
9+
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
10+
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
11+
if !isopen(store)
12+
throw(InvalidStateException("Stream is closed", :closed))
13+
end
14+
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
15+
store::Store_remote
16+
in_store = store
17+
STREAM_THUNK_ID[] = thunk_id
18+
values = T[]
19+
while !isempty(store, id)
20+
value = take!(store, id)::T
21+
push!(values, value)
22+
end
23+
return values
24+
end::Vector{T}
25+
26+
# We explicitly yield in the loop to allow other tasks to run. This
27+
# matters on single-threaded instances because MemPool.access_ref()
28+
# might not yield when accessing data locally, which can cause this loop
29+
# to spin forever.
30+
yield()
2531
end
2632

2733
@dagdebug thunk_id :stream "fetched $(length(values)) values"

0 commit comments

Comments
 (0)