Skip to content

Commit bcc744b

Browse files
authored
Merge pull request #516 from JuliaParallel/test-fixes
Test fixes
2 parents d53b5d4 + 1ad248e commit bcc744b

File tree

2 files changed

+37
-21
lines changed

2 files changed

+37
-21
lines changed

Diff for: src/cancellation.jl

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
1+
"""
2+
cancel!(tid=nothing; sch_uid=nothing, force=false, halt_sch=false)
3+
4+
Cancel a running thunk. Note that if the scheduler has already shut down this
5+
will not do anything.
6+
"""
17
function cancel!(tid::Union{Int,Nothing}=nothing;
28
sch_uid::Union{UInt64,Nothing}=nothing,
39
force::Bool=false, halt_sch::Bool=false)
410
remotecall_fetch(1, tid, sch_uid, force, halt_sch) do tid, sch_uid, force, halt_sch
511
state = Sch.EAGER_STATE[]
6-
@lock state.lock _cancel!(state, tid, sch_uid, force, halt_sch)
12+
13+
# Check that the scheduler isn't stopping or has already stopped
14+
if !isnothing(state) && !state.halt.set
15+
@lock state.lock _cancel!(state, tid, sch_uid, force, halt_sch)
16+
end
717
end
818
end
919
function _cancel!(state, tid, sch_uid, force, halt_sch)

Diff for: src/stream-fetchers.jl

+26-20
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,32 @@ struct RemoteFetcher end
22
function stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref::Chunk{Store_remote}, buffer::Blocal, id::UInt) where {Store_remote, Blocal}
33
thunk_id = STREAM_THUNK_ID[]
44
@dagdebug thunk_id :stream "fetching values"
5-
@label fetch_values
6-
# FIXME: Pass buffer free space
7-
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
8-
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
9-
if !isopen(store)
10-
throw(InvalidStateException("Stream is closed", :closed))
11-
end
12-
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
13-
store::Store_remote
14-
in_store = store
15-
STREAM_THUNK_ID[] = thunk_id
16-
values = T[]
17-
while !isempty(store, id)
18-
value = take!(store, id)::T
19-
push!(values, value)
20-
end
21-
return values
22-
end::Vector{T}
23-
if isempty(values)
24-
@goto fetch_values
5+
6+
values = T[]
7+
while isempty(values)
8+
# FIXME: Pass buffer free space
9+
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
10+
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
11+
if !isopen(store)
12+
throw(InvalidStateException("Stream is closed", :closed))
13+
end
14+
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
15+
store::Store_remote
16+
in_store = store
17+
STREAM_THUNK_ID[] = thunk_id
18+
values = T[]
19+
while !isempty(store, id)
20+
value = take!(store, id)::T
21+
push!(values, value)
22+
end
23+
return values
24+
end::Vector{T}
25+
26+
# We explicitly yield in the loop to allow other tasks to run. This
27+
# matters on single-threaded instances because MemPool.access_ref()
28+
# might not yield when accessing data locally, which can cause this loop
29+
# to spin forever.
30+
yield()
2531
end
2632

2733
@dagdebug thunk_id :stream "fetched $(length(values)) values"

0 commit comments

Comments
 (0)