Skip to content

Commit 3a3b4a3

Browse files
authored
Merge pull request #221 from JuliaParallel/jps/net-min
Optimize thunk placement for input affinity
2 parents b3ec61e + 9391081 commit 3a3b4a3

14 files changed

+342
-174
lines changed

Diff for: Project.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Dagger"
22
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
3-
version = "0.13.4"
3+
version = "0.13.5"
44

55
[deps]
66
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
@@ -21,7 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2121

2222
[compat]
2323
Colors = "0.10, 0.11, 0.12"
24-
MemPool = "0.3.4"
24+
MemPool = "0.3.5"
2525
Requires = "1"
2626
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33"
2727
TableOperations = "1"

Diff for: benchmarks/benchmark.jl

+10-10
Original file line numberDiff line numberDiff line change
@@ -204,17 +204,17 @@ function nmf_suite(ctx; dagger, accel)
204204
@info "Starting $_nw worker Dagger NNMF (scale by $_scale)"
205205
if $accel == "cuda"
206206
# FIXME: Allocate with CUDA.rand if possible
207-
$X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts))
208-
$W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts))
209-
$H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts))
207+
$X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts))
208+
$W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts))
209+
$H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts))
210210
elseif $accel == "amdgpu"
211-
$X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts))
212-
$W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts))
213-
$H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts))
211+
$X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts))
212+
$W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts))
213+
$H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts))
214214
elseif $accel == "cpu"
215-
$X[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts)
216-
$W[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts)
217-
$H[] = compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts)
215+
$X[] = compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts)
216+
$W[] = compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts)
217+
$H[] = compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts)
218218
end
219219
end teardown=begin
220220
if render != "" && !live
@@ -280,7 +280,7 @@ function main()
280280
push!(d3r, LinePlot(:core, :loadavg, "CPU Load Average", "Average Running Threads"))
281281
push!(d3r, LinePlot(:core, :bytes, "Allocated Bytes", "Bytes"))
282282
push!(d3r, LinePlot(:core, :mem, "Available Memory", "% Free"))
283-
push!(d3r, GraphPlot(:core, :id, :timeline, :profile, "DAG"))
283+
#push!(d3r, GraphPlot(:core, :id, :timeline, :profile, "DAG"))
284284
push!(lw.creation_handlers, d3r)
285285
push!(lw.deletion_handlers, d3r)
286286
ml.aggregators[:logwindow] = lw

Diff for: src/Dagger.jl

+5-3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ include("array/sort.jl")
5151

5252
# Other
5353
include("ui/graph.jl")
54+
include("ui/gantt-common.jl")
55+
include("ui/gantt-text.jl")
5456

5557
include("table/dtable.jl")
5658
include("table/operations.jl")
@@ -63,15 +65,15 @@ function __init__()
6365

6466
@require Luxor="ae8d54c2-7ccd-5906-9d76-62fc9837b5bc" begin
6567
# Gantt chart renderer
66-
include("ui/gantt.jl")
68+
include("ui/gantt-luxor.jl")
6769
end
6870
@require Mux="a975b10e-0019-58db-a62f-e48ff68538c9" begin
6971
# Gantt chart HTTP server
70-
include("ui/gantt-server.jl")
72+
include("ui/gantt-mux.jl")
7173
end
7274
@require ProfileSVG="132c30aa-f267-4189-9183-c8a63c7e05e6" begin
7375
# Profile renderer
74-
include("ui/profile.jl")
76+
include("ui/profile-profilesvg.jl")
7577
end
7678
@require FFMPEG="c87230d0-a227-11e9-1b43-d7ebe4e7570a" begin
7779
@require FileIO="5789e2e9-d7fb-5bc7-8068-2c6fae9b9549" begin

Diff for: src/chunks.jl

+2-6
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,13 @@ mutable struct Chunk{T, H, P<:Processor, S<:AbstractScope}
5353
processor::P
5454
scope::S
5555
persist::Bool
56-
function (::Type{Chunk{T,H,P,S}})(::Type{T}, domain, handle, processor, scope, persist) where {T,H,P,S}
57-
c = new{T,H,P,S}(T, domain, handle, processor, scope, persist)
58-
finalizer(x -> @async(myid() == 1 && free!(x)), c)
59-
c
60-
end
6156
end
6257

6358
"""
    domain(c::Chunk)

Return the value stored in the `domain` field of chunk `c`.
"""
function domain(c::Chunk)
    return c.domain
end
6459
"""
    chunktype(c::Chunk)

Return the value stored in the `chunktype` field of chunk `c`.
"""
function chunktype(c::Chunk)
    return c.chunktype
end
6560
"""
    persist!(t::Chunk)

Set the `persist` flag of chunk `t` to `true` and return the same chunk,
so the call can be chained.
"""
function persist!(t::Chunk)
    t.persist = true
    return t
end
6661
"""
    shouldpersist(p::Chunk)

Return the `persist` flag of chunk `p` (the flag that `persist!` sets to `true`).
"""
function shouldpersist(p::Chunk)
    # The flag must be read from the argument `p`; no `t` is bound in this method.
    return p.persist
end
62+
"""
    processor(c::Chunk)

Return the value stored in the `processor` field of chunk `c`.
"""
function processor(c::Chunk)
    return c.processor
end
6763
"""
    affinity(c::Chunk)

Return the affinity of chunk `c`, obtained by forwarding to the `affinity`
method for the chunk's `handle`.
"""
function affinity(c::Chunk)
    handle = c.handle
    return affinity(handle)
end
6864

6965
function unrelease(c::Chunk{<:Any,DRef})
@@ -117,7 +113,7 @@ move(to_proc::Processor, x) =
117113
move(OSProc(), to_proc, x)
118114

119115
### ChunkIO
120-
affinity(r::DRef) = Pair{OSProc, UInt64}[OSProc(r.owner) => r.size]
116+
"""
    affinity(r::DRef)

Return a single pair `OSProc(r.owner) => r.size` built from the `owner` and
`size` fields of the reference `r`.
"""
function affinity(r::DRef)
    owner_proc = OSProc(r.owner)
    return owner_proc => r.size
end
121117
function affinity(r::FileRef)
122118
if haskey(MemPool.who_has_read, r.file)
123119
Pair{OSProc, UInt64}[OSProc(dref.owner) => r.size for dref in MemPool.who_has_read[r.file]]

Diff for: src/processor.jl

+9-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ struct OSProc <: Processor
101101
end
102102
end
103103
# Cache of processor lists keyed by an integer; `children(::OSProc)` looks
# entries up by `proc.pid`.
# NOTE(review): where this cache is populated is not visible here — confirm.
const OSPROC_CACHE = Dict{Int,Vector{Processor}}()
104-
children(proc::OSProc) = OSPROC_CACHE[proc.pid]
104+
"""
    get_parent(proc::OSProc)

Return `proc` itself: for an `OSProc`, this method yields the argument unchanged.
"""
function get_parent(proc::OSProc)
    return proc
end
105+
"""
    children(proc::OSProc)

Return the processors stored in `OSPROC_CACHE` under `proc.pid`, or an empty
`Processor[]` when the cache has no entry for that pid.
"""
function children(proc::OSProc)
    pid = proc.pid
    return get(OSPROC_CACHE, pid, Processor[])
end
105106
function get_proc_hierarchy()
106107
children = Processor[]
107108
for name in keys(PROCESSOR_CALLBACKS)
@@ -158,11 +159,17 @@ iscompatible_arg(proc::ThreadProc, opts, x) = true
158159
@static if VERSION >= v"1.3.0-DEV.573"
159160
function execute!(proc::ThreadProc, f, args...)
160161
tls = get_tls()
161-
task = Threads.@spawn begin
162+
task = Task() do
162163
set_tls!(tls)
163164
prof_task_put!(tls.sch_handle.thunk_id.id)
164165
f(args...)
165166
end
167+
ret = ccall(:jl_set_task_tid, Cint, (Any, Cint), task, proc.tid-1)
168+
if ret == 0
169+
error("jl_set_task_tid == 0")
170+
end
171+
@assert Threads.threadid(task) == proc.tid
172+
schedule(task)
166173
try
167174
fetch(task)
168175
catch err

0 commit comments

Comments
 (0)