Skip to content

Commit 7a1b408

Browse files
committed
streaming: Fix concurrency issues
1 parent 32c301a commit 7a1b408

File tree

2 files changed

+62
-50
lines changed

2 files changed

+62
-50
lines changed

Diff for: src/stream-fetchers.jl

+25-19
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
struct RemoteFetcher end
22
function stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref::Chunk{Store_remote}, buffer::Blocal, id::UInt) where {Store_remote, Blocal}
3-
if store_ref.handle.owner == myid()
4-
store = fetch(store_ref)::Store_remote
5-
while !isfull(buffer)
6-
value = take!(store, id)::T
7-
put!(buffer, value)
3+
thunk_id = STREAM_THUNK_ID[]
4+
@dagdebug thunk_id :stream "fetching values"
5+
@label fetch_values
6+
# FIXME: Pass buffer free space
7+
# TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
8+
values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
9+
if !isopen(store)
10+
throw(InvalidStateException("Stream is closed", :closed))
811
end
9-
else
10-
thunk_id = STREAM_THUNK_ID[]
11-
values = remotecall_fetch(store_ref.handle.owner, store_ref.handle, id, T, Store_remote) do store_ref, id, T, Store_remote
12-
STREAM_THUNK_ID[] = thunk_id
13-
store = MemPool.poolget(store_ref)::Store_remote
14-
values = T[]
15-
while !isempty(store, id)
16-
value = take!(store, id)::T
17-
push!(values, value)
18-
end
19-
return values
20-
end::Vector{T}
21-
for value in values
22-
put!(buffer, value)
12+
@dagdebug thunk_id :stream "trying to fetch values at $(myid())"
13+
store::Store_remote
14+
in_store = store
15+
STREAM_THUNK_ID[] = thunk_id
16+
values = T[]
17+
while !isempty(store, id)
18+
value = take!(store, id)::T
19+
push!(values, value)
2320
end
21+
return values
22+
end::Vector{T}
23+
if isempty(values)
24+
@goto fetch_values
25+
end
26+
27+
@dagdebug thunk_id :stream "fetched $(length(values)) values"
28+
for value in values
29+
put!(buffer, value)
2430
end
2531
end

Diff for: src/stream.jl

+37-31
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@ function tid_to_uid(thunk_id)
1919
end
2020
function Base.put!(store::StreamStore{T,B}, value) where {T,B}
2121
thunk_id = STREAM_THUNK_ID[]
22-
uid = tid_to_uid(thunk_id)
2322
@lock store.lock begin
2423
if !isopen(store)
25-
@dagdebug thunk_id :stream "[$uid] closed!"
24+
@dagdebug thunk_id :stream "closed!"
2625
throw(InvalidStateException("Stream is closed", :closed))
2726
end
28-
@dagdebug thunk_id :stream "[$uid] adding $value"
27+
@dagdebug thunk_id :stream "adding $value"
2928
for buffer in values(store.buffers)
3029
while isfull(buffer)
31-
@dagdebug thunk_id :stream "[$uid] buffer full, waiting"
30+
@dagdebug thunk_id :stream "buffer full, waiting"
3231
wait(store.lock)
3332
end
3433
put!(buffer, value)
@@ -38,16 +37,15 @@ function Base.put!(store::StreamStore{T,B}, value) where {T,B}
3837
end
3938
function Base.take!(store::StreamStore, id::UInt)
4039
thunk_id = STREAM_THUNK_ID[]
41-
uid = tid_to_uid(thunk_id)
4240
@lock store.lock begin
4341
buffer = store.buffers[id]
4442
while isempty(buffer) && isopen(store, id)
45-
@dagdebug thunk_id :stream "[$uid] no elements, not taking"
43+
@dagdebug thunk_id :stream "no elements, not taking"
4644
wait(store.lock)
4745
end
48-
@dagdebug thunk_id :stream "[$uid] wait finished"
46+
@dagdebug thunk_id :stream "wait finished"
4947
if !isopen(store, id)
50-
@dagdebug thunk_id :stream "[$uid] closed!"
48+
@dagdebug thunk_id :stream "closed!"
5149
throw(InvalidStateException("Stream is closed", :closed))
5250
end
5351
unlock(store.lock)
@@ -56,7 +54,7 @@ function Base.take!(store::StreamStore, id::UInt)
5654
finally
5755
lock(store.lock)
5856
end
59-
@dagdebug thunk_id :stream "[$uid] value accepted"
57+
@dagdebug thunk_id :stream "value accepted"
6058
notify(store.lock)
6159
return value
6260
end
@@ -129,46 +127,53 @@ function Base.take!(stream::Stream{T,B}, id::UInt) where {T,B}
129127
return take!(stream.input_buffer)
130128
end
131129
function Base.isopen(stream::Stream, id::UInt)::Bool
132-
return remotecall_fetch(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
133-
return isopen(MemPool.poolget(ref)::StreamStore, id)
130+
return MemPool.access_ref(stream.store_ref.handle, id) do store, id
131+
return isopen(store::StreamStore, id)
134132
end
135133
end
136134
function Base.close(stream::Stream)
137-
remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
138-
close(MemPool.poolget(ref)::StreamStore)
135+
MemPool.access_ref(stream.store_ref.handle) do store
136+
close(store::StreamStore)
137+
return
139138
end
139+
return
140140
end
141141
function add_waiters!(stream::Stream, waiters::Vector{Int})
142-
remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
143-
add_waiters!(MemPool.poolget(ref)::StreamStore, waiters)
142+
MemPool.access_ref(stream.store_ref.handle, waiters) do store, waiters
143+
add_waiters!(store::StreamStore, waiters)
144+
return
144145
end
146+
return
145147
end
146148
add_waiters!(stream::Stream, waiter::Integer) =
147149
add_waiters!(stream::Stream, Int[waiter])
148150
function remove_waiters!(stream::Stream, waiters::Vector{Int})
149-
remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
150-
remove_waiters!(MemPool.poolget(ref)::StreamStore, waiters)
151+
MemPool.access_ref(stream.store_ref.handle, waiters) do store, waiters
152+
remove_waiters!(store::StreamStore, waiters)
153+
return
151154
end
155+
return
152156
end
153157
remove_waiters!(stream::Stream, waiter::Integer) =
154158
remove_waiters!(stream::Stream, Int[waiter])
155159

156160
function migrate_stream!(stream::Stream, w::Integer=myid())
157-
if !isdefined(MemPool, :migrate!)
158-
@warn "MemPool migration support not enabled!\nPerformance may be degraded" maxlog=1
159-
return
160-
end
161-
162161
# Perform migration of the StreamStore
163162
# MemPool will block access to the new ref until the migration completes
163+
# FIXME: Do this with MemPool.access_ref, in case stream was already migrated
164164
if stream.store_ref.handle.owner != w
165-
# Take lock to prevent any further modifications
166-
# N.B. Serialization automatically unlocks
167-
remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
168-
lock((MemPool.poolget(ref)::StreamStore).lock)
165+
new_store_ref = MemPool.migrate!(stream.store_ref.handle, w; pre_migration=store->begin
166+
# Lock store to prevent any further modifications
167+
# N.B. Serialization automatically unlocks the migrated copy
168+
lock((store::StreamStore).lock)
169+
end, post_migration=store->begin
170+
# Unlock the store
171+
# FIXME: Indicate to all waiters that this store is dead
172+
unlock((store::StreamStore).lock)
173+
end)
174+
if w == myid()
175+
stream.store = MemPool.access_ref(identity, new_store_ref; local_only=true)
169176
end
170-
171-
MemPool.migrate!(stream.store_ref.handle, w)
172177
end
173178
end
174179

@@ -272,6 +277,7 @@ function (sf::StreamingFunction)(args...; kwargs...)
272277
# Migrate our output stream to this worker
273278
if sf.stream isa Stream
274279
migrate_stream!(sf.stream)
280+
@dagdebug thunk_id :stream "Migration complete"
275281
end
276282

277283
try
@@ -299,13 +305,13 @@ function (sf::StreamingFunction)(args...; kwargs...)
299305
end
300306
end
301307
for stream in streams
302-
@dagdebug thunk_id :stream "[$uid] dropping waiter"
308+
@dagdebug thunk_id :stream "dropping waiter"
303309
remove_waiters!(stream, uid)
304-
@dagdebug thunk_id :stream "[$uid] dropped waiter"
310+
@dagdebug thunk_id :stream "dropped waiter"
305311
end
306312

307313
# Ensure downstream tasks also terminate
308-
@dagdebug thunk_id :stream "[$uid] closed stream"
314+
@dagdebug thunk_id :stream "closed stream"
309315
close(sf.stream)
310316
end
311317
end

0 commit comments

Comments
 (0)