@@ -2,26 +2,34 @@ struct RemoteFetcher end
2
2
function stream_fetch_values! (:: Type{RemoteFetcher} , T, store_ref:: Chunk{Store_remote} , buffer:: Blocal , id:: UInt ) where {Store_remote, Blocal}
3
3
thunk_id = STREAM_THUNK_ID[]
4
4
@dagdebug thunk_id :stream " fetching values"
5
- @label fetch_values
6
- # FIXME : Pass buffer free space
7
- # TODO : It would be ideal if we could wait on store.lock, but get unlocked during migration
8
- values = MemPool. access_ref (store_ref. handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
9
- if ! isopen (store)
10
- throw (InvalidStateException (" Stream is closed" , :closed ))
11
- end
12
- @dagdebug thunk_id :stream " trying to fetch values at $(myid ()) "
13
- store:: Store_remote
14
- in_store = store
15
- STREAM_THUNK_ID[] = thunk_id
16
- values = T[]
17
- while ! isempty (store, id)
18
- value = take! (store, id):: T
19
- push! (values, value)
20
- end
21
- return values
22
- end :: Vector{T}
23
- if isempty (values)
24
- @goto fetch_values
5
+
6
+ values = T[]
7
+ while isempty (values)
8
+ # FIXME : Pass buffer free space
9
+ # TODO : It would be ideal if we could wait on store.lock, but get unlocked during migration
10
+ values = MemPool. access_ref (store_ref. handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
11
+ # @info "in function"
12
+
13
+ if ! isopen (store)
14
+ throw (InvalidStateException (" Stream is closed" , :closed ))
15
+ end
16
+ @dagdebug thunk_id :stream " trying to fetch values at $(myid ()) "
17
+ store:: Store_remote
18
+ in_store = store
19
+ STREAM_THUNK_ID[] = thunk_id
20
+ values = T[]
21
+ while ! isempty (store, id)
22
+ value = take! (store, id):: T
23
+ push! (values, value)
24
+ end
25
+ return values
26
+ end :: Vector{T}
27
+
28
+ # We explicitly yield in the loop to allow other tasks to run. This
29
+ # matters on single-threaded instances because MemPool.access_ref()
30
+ # might not yield when accessing data locally, which can cause this loop
31
+ # to spin forever.
32
+ yield ()
25
33
end
26
34
27
35
@dagdebug thunk_id :stream " fetched $(length (values)) values"
0 commit comments