Skip to content

Commit 2b71950

Browse files
authored
Merge pull request #364 from JuliaParallel/jps/weak-chunk
chunks: Allow weak Chunk references in Thunk args
2 parents d270587 + 66bf970 commit 2b71950

File tree

4 files changed

+16
-3
lines changed

4 files changed

+16
-3
lines changed

Diff for: src/chunks.jl

+10
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,16 @@ function savechunk(data, dir, f)
266266
Chunk{typeof(data),typeof(fr),typeof(proc),typeof(scope)}(typeof(data), domain(data), fr, proc, scope, true)
267267
end
268268

269+
struct WeakChunk
270+
x::WeakRef
271+
end
272+
WeakChunk(c::Chunk) = WeakChunk(WeakRef(c))
273+
unwrap_weak(c::WeakChunk) = c.x.value
274+
function unwrap_weak_checked(c::WeakChunk)
275+
c = unwrap_weak(c)
276+
@assert c !== nothing
277+
return c
278+
end
269279

270280
Base.@deprecate_binding AbstractPart Union{Chunk, Thunk}
271281
Base.@deprecate_binding Part Chunk

Diff for: src/sch/Sch.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import Statistics: mean
88
import Random: randperm
99

1010
import ..Dagger
11-
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, AnyScope
11+
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope
1212
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
1313

1414
const OneToMany = Dict{Thunk, Set{Thunk}}

Diff for: src/sch/eager.jl

+4-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ function eager_thunk()
9090
added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN)
9191
# preserve inputs until they enter the scheduler
9292
tid = GC.@preserve args begin
93-
_args = map(x->x isa Dagger.EagerThunk ? ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref) : x, args)
93+
_args = map(x->x isa Dagger.EagerThunk ? ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref) :
94+
x isa Dagger.Chunk ? WeakChunk(x) :
95+
x,
96+
args)
9497
add_thunk!(f, h, _args...; future=future, ref=ref, opts...)
9598
end
9699
EAGER_ID_MAP[uid] = tid.id

Diff for: src/sch/util.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ function reschedule_inputs!(state, thunk, seen=Set{Thunk}())
113113
input in seen && continue
114114

115115
# Unseen
116-
if istask(input) || (input isa Chunk)
116+
if istask(input) || isa(input, Chunk)
117117
push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk)
118118
end
119119
istask(input) || continue

0 commit comments

Comments
 (0)