Skip to content

Test fixes #516

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/cancellation.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
"""
cancel!(tid=nothing; sch_uid=nothing, force=false, halt_sch=false)

Cancel a running thunk. Note that if the scheduler has already shut down this
will not do anything.
"""
function cancel!(tid::Union{Int,Nothing}=nothing;
sch_uid::Union{UInt64,Nothing}=nothing,
force::Bool=false, halt_sch::Bool=false)
remotecall_fetch(1, tid, sch_uid, force, halt_sch) do tid, sch_uid, force, halt_sch
state = Sch.EAGER_STATE[]
@lock state.lock _cancel!(state, tid, sch_uid, force, halt_sch)

# Check that the scheduler isn't stopping or has already stopped
if !isnothing(state) && !state.halt.set
@lock state.lock _cancel!(state, tid, sch_uid, force, halt_sch)
end
end
end
function _cancel!(state, tid, sch_uid, force, halt_sch)
Expand Down
46 changes: 26 additions & 20 deletions src/stream-fetchers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,32 @@ struct RemoteFetcher end
function stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref::Chunk{Store_remote}, buffer::Blocal, id::UInt) where {Store_remote, Blocal}
thunk_id = STREAM_THUNK_ID[]
@dagdebug thunk_id :stream "fetching values"
@label fetch_values
# FIXME: Pass buffer free space
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
if !isopen(store)
throw(InvalidStateException("Stream is closed", :closed))
end
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
store::Store_remote
in_store = store
STREAM_THUNK_ID[] = thunk_id
values = T[]
while !isempty(store, id)
value = take!(store, id)::T
push!(values, value)
end
return values
end::Vector{T}
if isempty(values)
@goto fetch_values

values = T[]
while isempty(values)
# FIXME: Pass buffer free space
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
if !isopen(store)
throw(InvalidStateException("Stream is closed", :closed))
end
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
store::Store_remote
in_store = store
STREAM_THUNK_ID[] = thunk_id
values = T[]
while !isempty(store, id)
value = take!(store, id)::T
push!(values, value)
end
return values
end::Vector{T}

# We explicitly yield in the loop to allow other tasks to run. This
# matters on single-threaded instances because MemPool.access_ref()
# might not yield when accessing data locally, which can cause this loop
# to spin forever.
yield()
end

@dagdebug thunk_id :stream "fetched $(length(values)) values"
Expand Down