Skip to content

Commit 0a74110

Browse files
authored
Merge pull request #350 from JuliaParallel/kr/mempool-fix
Remove `poolset` kwarg that doesn't exist anymore
2 parents 52c90af + 7515391 commit 0a74110

File tree

5 files changed

+7
-56
lines changed

5 files changed

+7
-56
lines changed

Diff for: Project.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Dagger"
22
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
3-
version = "0.15.0"
3+
version = "0.15.1"
44

55
[deps]
66
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
@@ -21,7 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2121
[compat]
2222
Colors = "0.10, 0.11, 0.12"
2323
ContextVariablesX = "0.1"
24-
MemPool = "0.3.5"
24+
MemPool = "0.3.5, 0.4"
2525
Requires = "1"
2626
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33"
2727
julia = "1.6"

Diff for: src/array/darray.jl

+1-19
Original file line numberDiff line numberDiff line change
@@ -115,29 +115,11 @@ mutable struct DArray{T,N,F} <: ArrayOp{T, N}
115115
subdomains::AbstractArray{ArrayDomain{N}, N}
116116
chunks::AbstractArray{Union{Chunk,Thunk}, N}
117117
concat::F
118-
freed::Threads.Atomic{UInt8}
119118
function DArray{T,N,F}(domain, subdomains, chunks, concat::Function) where {T, N,F}
120-
new(domain, subdomains, chunks, concat, Threads.Atomic{UInt8}(0))
119+
new(domain, subdomains, chunks, concat)
121120
end
122121
end
123122

124-
function free_chunks(chunks)
125-
@sync for c in chunks
126-
if c isa Chunk{<:Any, DRef}
127-
# increment refcount on the master node
128-
@async free!(c.handle)
129-
elseif c isa Thunk
130-
free_chunks(c.inputs)
131-
end
132-
end
133-
end
134-
135-
function free!(x::DArray)
136-
freed = Bool(Threads.atomic_cas!(x.freed, UInt8(0), UInt8(1)))
137-
!freed && @async Dagger.free_chunks(x.chunks)
138-
nothing
139-
end
140-
141123
# mainly for backwards-compatibility
142124
DArray{T, N}(domain, subdomains, chunks) where {T,N} = DArray(T, domain, subdomains, chunks)
143125

Diff for: src/chunks.jl

+1-31
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,6 @@ shouldpersist(p::Chunk) = t.persist
6262
processor(c::Chunk) = c.processor
6363
affinity(c::Chunk) = affinity(c.handle)
6464

65-
function unrelease(c::Chunk{<:Any,DRef})
66-
# set spilltodisk = true if data is still around
67-
try
68-
destroyonevict(c.handle, false)
69-
return c
70-
catch err
71-
if isa(err, KeyError)
72-
return nothing
73-
else
74-
rethrow(err)
75-
end
76-
end
77-
end
78-
unrelease(c::Chunk) = c
79-
8065
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
8166
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)
8267

@@ -248,26 +233,11 @@ Base.map(f, s::Shard) = [Dagger.spawn(f, c) for c in values(s.chunks)]
248233
Create a chunk from sequential object `x` which resides on `proc`.
249234
"""
250235
function tochunk(x::X, proc::P=OSProc(), scope::S=AnyScope(); persist=false, cache=false) where {X,P,S}
251-
ref = poolset(x, destroyonevict=persist ? false : cache)
236+
ref = poolset(x)
252237
Chunk{X,typeof(ref),P,S}(X, domain(x), ref, proc, scope, persist)
253238
end
254239
tochunk(x::Union{Chunk, Thunk}, proc=nothing, scope=nothing) = x
255240

256-
# Check to see if the node is set to persist
257-
# if it is foce can override it
258-
function free!(s::Chunk{X,DRef,P,S}; force=true, cache=false) where {X,P,S}
259-
if force || !s.persist
260-
if cache
261-
try
262-
destroyonevict(s.handle, true) # keep around, but remove when evicted
263-
catch err
264-
isa(err, KeyError) || rethrow(err)
265-
end
266-
end
267-
end
268-
end
269-
free!(x; force=true,cache=false) = x # catch-all for non-chunks
270-
271241
function savechunk(data, dir, f)
272242
sz = open(joinpath(dir, f), "w") do io
273243
serialize(io, MemPool.MMWrap(data))

Diff for: src/compute.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export stage, cached_stage, compute, debug_compute, free!, cleanup
1+
export stage, cached_stage, compute, debug_compute, cleanup
22

33
###### Scheduler #######
44

Diff for: src/sch/Sch.jl

+2-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import Random: randperm
77

88
import ..Dagger
99
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, AnyScope
10-
import ..Dagger: order, free!, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, unrelease, procs, move, capacity, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
10+
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, capacity, chunktype, processor, default_enabled, get_processors, get_parent, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime
1111

1212
const OneToMany = Dict{Thunk, Set{Thunk}}
1313

@@ -850,8 +850,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
850850
state.running_on[thunk] = gproc
851851
if thunk.cache && thunk.cache_ref !== nothing
852852
# the result might be already cached
853-
data = unrelease(thunk.cache_ref) # ask worker to keep the data around
854-
# till this compute cycle frees it
853+
data = thunk.cache_ref
855854
if data !== nothing
856855
# cache hit
857856
state.cache[thunk] = data

0 commit comments

Comments
 (0)