Skip to content

Commit b7db82f

Browse files
committed
Task names and GraphVizExt improvements
- at-spawn: Add `name` option for custom task names
- logging: Allow `DTask` in `logs_annotate!`
- logging: Allow `DArray` in `logs_annotate!` (labels array chunks)
- logging: Add task result, DTask-to-Thunk, UID-to-TID logging events
- logging: Add `all_task_deps` option to `enable_logging!` for convenience
- GraphVizExt: Properly label tasks and objects
- GraphVizExt: Show box for task and oval for object
- GraphVizExt: Improve connectivity checks
1 parent a628800 commit b7db82f

File tree

6 files changed

+349
-90
lines changed

6 files changed

+349
-90
lines changed

Diff for: ext/GraphVizExt.jl

+226-76
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ else
77
end
88

99
import Dagger
10-
import Dagger: DTask, Chunk, Processor
10+
import Dagger: DTask, Chunk, Processor, LoggedMutableObject
1111
import Dagger.TimespanLogging: Timespan
1212
import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst
1313

@@ -24,6 +24,8 @@ function pretty_time(t; digits=3)
2424
end
2525
end
2626

27+
"""
    sanitize_label(label::AbstractString) -> String

Escape every double quote in `label` so the text can be embedded inside a
double-quoted Graphviz attribute value (e.g. `label="..."`).

Backslashes are intentionally left untouched: callers build labels that
already contain Graphviz escape sequences such as `\\n`, and those must reach
the DOT output unchanged.

Accepts any `AbstractString` (e.g. a `SubString`), matching the other string
helpers in this file.
"""
sanitize_label(label::AbstractString) = replace(label, "\"" => "\\\"")
28+
2729
# Pick a color for `name` deterministically: hash the name and wrap the hash
# into a valid 1-based index of `colors` with `mod1`.
# NOTE(review): an empty `colors` collection would make `mod1` divide by
# zero — confirm callers never pass one.
_name_to_color(name::AbstractString, colors) =
2830
colors[mod1(hash(name), length(colors))]
2931
# Fallback used when no color collection is supplied (`nothing`):
# every name is drawn in black.
function _name_to_color(name::AbstractString, ::Nothing)
    return "black"
end
@@ -51,12 +53,22 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
5153
colors=_default_colors, name_to_color=_name_to_color)
5254
# Lookup all relevant task/argument dependencies and values in logs
5355
g = SimpleDiGraph()
56+
5457
tid_to_vertex = Dict{Int,Int}()
55-
task_names = String[]
58+
tid_to_auto_name = Dict{Int,String}()
59+
tid_to_name = Dict{Int,String}()
5660
tid_to_proc = Dict{Int,Processor}()
57-
arg_names = Dict{UInt,String}()
61+
62+
objid_to_vertex = Dict{UInt,Int}()
63+
objid_to_name = Dict{UInt,String}()
64+
5865
task_args = Dict{Int,Vector{Pair{Union{Int,Symbol},UInt}}}()
59-
arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}()
66+
task_arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}()
67+
task_result = Dict{Int,UInt}()
68+
69+
uid_to_tid = Dict{UInt,Int}()
70+
dtasks_to_patch = Set{UInt}()
71+
6072
for w in keys(logs)
6173
for idx in 1:length(logs[w][:core])
6274
category = logs[w][:core][idx].category
@@ -67,40 +79,143 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
6779
taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}}
6880
taskname = logs[w][:tasknames][idx]::String
6981
tid, deps = taskdeps
70-
add_vertex!(g)
71-
tid_to_vertex[tid] = nv(g)
72-
push!(task_names, taskname)
82+
v = get!(tid_to_vertex, tid) do
83+
add_vertex!(g)
84+
tid_to_vertex[tid] = nv(g)
85+
nv(g)
86+
end
87+
tid_to_auto_name[tid] = taskname
7388
for dep in deps
74-
haskey(tid_to_vertex, dep) || continue
75-
add_edge!(g, tid_to_vertex[dep], nv(g))
89+
dep_v = get!(tid_to_vertex, dep) do
90+
add_vertex!(g)
91+
tid_to_vertex[dep] = nv(g)
92+
nv(g)
93+
end
94+
add_edge!(g, dep_v, v)
95+
end
96+
if haskey(logs[w], :taskuidtotid)
97+
uid_tid = logs[w][:taskuidtotid][idx]
98+
if uid_tid !== nothing
99+
uid, tid = uid_tid::Pair{UInt,Int}
100+
uid_to_tid[uid] = tid
101+
end
76102
end
77103
elseif category == :compute && kind == :start
78104
id::NamedTuple
79105
tid = id.thunk_id
80106
proc = id.processor
81107
tid_to_proc[tid] = proc
108+
elseif category == :compute && kind == :finish
109+
if haskey(logs[w], :taskresult)
110+
result_info = logs[w][:taskresult][idx]
111+
result_info === nothing && continue
112+
tid, obj = result_info::Pair{Int,LoggedMutableObject}
113+
objid = obj.objid
114+
task_result[tid] = objid
115+
v = get!(objid_to_vertex, objid) do
116+
add_vertex!(g)
117+
objid_to_vertex[objid] = nv(g)
118+
nv(g)
119+
end
120+
add_edge!(g, tid_to_vertex[tid], v)
121+
end
82122
elseif category == :move && kind == :finish
83123
if haskey(logs[w], :taskargs)
84-
id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector}
85-
append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args)
124+
tid, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector}
125+
args = map(arg->arg[1]=>arg[2].objid, args)
126+
append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, tid), args)
127+
for arg in args
128+
objid = arg[2]
129+
arg_id = get!(objid_to_vertex, objid) do
130+
add_vertex!(g)
131+
objid_to_vertex[objid] = nv(g)
132+
nv(g)
133+
end
134+
add_edge!(g, arg_id, tid_to_vertex[tid])
135+
end
86136
end
87137
if haskey(logs[w], :taskargmoves)
88138
move_info = logs[w][:taskargmoves][idx]
89139
move_info === nothing && continue
90-
tid, pos, pre_objid, post_objid = move_info
91-
v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, arg_moves, tid)
140+
tid, pos, pre_obj, post_obj = move_info
141+
v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, task_arg_moves, tid)
142+
pre_objid = pre_obj.objid
143+
post_objid = post_obj.objid
92144
push!(v, pos => (pre_objid, post_objid))
145+
pre_arg_id = get!(objid_to_vertex, pre_objid) do
146+
add_vertex!(g)
147+
objid_to_vertex[pre_objid] = nv(g)
148+
nv(g)
149+
end
150+
post_arg_id = get!(objid_to_vertex, post_objid) do
151+
add_vertex!(g)
152+
objid_to_vertex[post_objid] = nv(g)
153+
nv(g)
154+
end
155+
add_edge!(g, pre_arg_id, post_arg_id)
93156
end
94157
elseif category == :data_annotation && kind == :start
95158
id::NamedTuple
96-
objid = id.objectid
97-
name = id.name
98-
arg_names[objid] = String(name)
159+
name = String(id.name)
160+
obj = id.objectid::LoggedMutableObject
161+
objid = obj.objid
162+
objid_to_name[objid] = name
163+
if obj.kind == :task
164+
# N.B. We don't need the object vertex,
165+
# since we'll just render it as a task
166+
push!(dtasks_to_patch, objid)
167+
else
168+
get!(objid_to_vertex, objid) do
169+
add_vertex!(g)
170+
objid_to_vertex[objid] = nv(g)
171+
nv(g)
172+
end
173+
end
174+
elseif category == :finish && kind == :finish
175+
if haskey(logs[w], :tasktochunk)
176+
tid_chunk = logs[w][:tasktochunk][idx]
177+
if tid_chunk !== nothing
178+
tid, chunk_obj = tid_chunk::Pair{Int,LoggedMutableObject}
179+
chunk_id = chunk_obj.objid
180+
v = get!(objid_to_vertex, chunk_id) do
181+
add_vertex!(g)
182+
objid_to_vertex[chunk_id] = nv(g)
183+
nv(g)
184+
end
185+
add_edge!(g, tid_to_vertex[tid], v)
186+
end
187+
end
99188
end
100189
end
101190
end
102-
tids_sorted = map(first, sort(collect(tid_to_vertex); by=last))
103-
task_procs = Processor[tid_to_proc[tid] for tid in tids_sorted]
191+
192+
# Process DTasks-to-Thunk mappings
193+
for uid in dtasks_to_patch
194+
if haskey(uid_to_tid, uid)
195+
tid = uid_to_tid[uid]
196+
v = get!(tid_to_vertex, tid) do
197+
add_vertex!(g)
198+
tid_to_vertex[tid] = nv(g)
199+
nv(g)
200+
end
201+
202+
# Fixup any missing tid data
203+
if haskey(objid_to_name, uid)
204+
tid_to_name[tid] = objid_to_name[uid]
205+
end
206+
end
207+
end
208+
209+
# Auto-assign names
210+
for (tid, name) in tid_to_auto_name
211+
if !haskey(tid_to_name, tid)
212+
tid_to_name[tid] = name
213+
end
214+
end
215+
216+
# Create reverse mappings
217+
vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex)
218+
vertex_to_objid = Dict{Int,UInt}(v=>k for (k,v) in objid_to_vertex)
104219

105220
# Find all connected and disconnected vertices
106221
if !disconnected
@@ -110,31 +225,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
110225
con_vs = vertices(g)
111226
end
112227

113-
# Assign colors
114-
labels = task_names
115-
all_fns = unique(map(label->first(split(label, " ")), labels[con_vs]))
116-
all_procs = unique(task_procs)
117-
118-
if color_by == :fn
119-
_colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)]
120-
colors = Dict(v=>_colors[findfirst(fn->occursin(fn, labels[v]), all_fns)] for v in con_vs)
121-
elseif color_by == :proc
122-
_colors = [name_to_color(string(all_procs[i]), colors) for i in 1:length(all_procs)]
123-
colors = Dict(v=>_colors[findfirst(proc->proc==task_procs[v], all_procs)] for v in con_vs)
124-
else
125-
throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc"))
126-
end
127-
128-
str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n"
129-
130-
# Add raw arguments
131-
for (id, name) in arg_names
132-
str *= "a$id [label=\"$name\", shape=box]\n"
133-
end
134-
135228
if times
136-
vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex)
137-
138229
# Determine per-worker start times
139230
worker_start_times = Dict{Int,UInt64}()
140231
for w in keys(logs)
@@ -169,68 +260,127 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
169260
end
170261
end
171262

172-
# Add argument moves
173-
seen_moves = Set{Tuple{UInt,UInt}}()
174-
for (tid, moves) in arg_moves
175-
for (pos, (pre_objid, post_objid)) in moves
176-
pre_objid == post_objid && continue
177-
(pre_objid, post_objid) in seen_moves && continue
178-
push!(seen_moves, (pre_objid, post_objid))
179-
move_str = "a$pre_objid -> a$post_objid [label=\"move\"]\n"
180-
str *= move_str
181-
end
263+
# Get the set of all unique task and object IDs
264+
all_tids = collect(keys(tid_to_vertex))
265+
all_objids = collect(keys(objid_to_vertex))
266+
267+
# Assign colors
268+
if color_by == :fn
269+
all_fns = unique(map(label->first(split(label, " ")), values(tid_to_name)))
270+
_colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)]
271+
colors = Dict(tid=>_colors[findfirst(fn->occursin(fn, tid_to_name[tid]), all_fns)] for tid in all_tids)
272+
elseif color_by == :proc
273+
all_procs = unique(values(tid_to_proc))
274+
_colors = [name_to_color(string(all_procs[i]), colors) for i in 1:length(all_procs)]
275+
colors = Dict(tid=>_colors[findfirst(proc->proc==tid_to_proc[tid], all_procs)] for tid in all_tids)
276+
else
277+
throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc"))
182278
end
183279

184-
# Add tasks
185-
for v in con_vs
280+
str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n"
281+
282+
# Add task vertices
283+
for tid in all_tids
284+
v = tid_to_vertex[tid]
186285
if !disconnected && (v in discon_vs)
187286
continue
188287
end
189-
label = labels[v]
190-
color = colors[v]
191-
proc = task_procs[v]
192-
proc_str = '(' * Dagger.short_name(task_procs[v]) * ')'
193-
label_str = "$label\\n$proc_str"
288+
label_str = tid_to_name[tid]
289+
if haskey(tid_to_auto_name, tid) && tid_to_name[tid] != tid_to_auto_name[tid]
290+
label_str *= "\\nTask: $(tid_to_auto_name[tid])"
291+
end
292+
color = colors[tid]
293+
proc = tid_to_proc[tid]
294+
label_str *= "\\n($(Dagger.short_name(tid_to_proc[tid])))"
194295
if times
195-
tid = vertex_to_tid[v]
196296
start_time = pretty_time(start_times[tid]; digits=times_digits)
197297
finish_time = pretty_time(finish_times[tid]; digits=times_digits)
198-
label_str *= "\\n[+$start_time -> +$finish_time]"
298+
diff_time = pretty_time(finish_times[tid] - start_times[tid]; digits=times_digits)
299+
label_str *= "\\n[+$start_time -> +$finish_time (diff: $diff_time)]"
199300
end
200-
str *= "v$v [label=\"$label_str\", color=\"$color\", penwidth=2.0]\n"
301+
label_str = sanitize_label(label_str)
302+
str *= "v$v [label=\"$label_str\", shape=box, color=\"$color\", penwidth=2.0]\n"
201303
end
202304

203-
# Add task dependencies
305+
# Add object vertices
306+
for objid in all_objids
307+
objid_v = objid_to_vertex[objid]
308+
if !disconnected && !(objid_v in con_vs)
309+
continue
310+
end
311+
if objid in dtasks_to_patch || haskey(uid_to_tid, objid)
312+
# DTask, skip it
313+
continue
314+
end
315+
# Object
316+
if haskey(objid_to_name, objid)
317+
label = sanitize_label(objid_to_name[objid])
318+
label *= "\\nData: $(repr(objid))"
319+
else
320+
label = "Data: $(repr(objid))"
321+
end
322+
str *= "a$objid_v [label=\"$label\", shape=oval]\n"
323+
end
324+
325+
# Add task argument move edges
326+
seen_moves = Set{Tuple{UInt,UInt}}()
327+
for (tid, moves) in task_arg_moves
328+
for (pos, (pre_objid, post_objid)) in moves
329+
pre_objid == post_objid && continue
330+
(pre_objid, post_objid) in seen_moves && continue
331+
push!(seen_moves, (pre_objid, post_objid))
332+
pre_objid_v = objid_to_vertex[pre_objid]
333+
post_objid_v = objid_to_vertex[post_objid]
334+
move_str = "a$pre_objid_v -> a$post_objid_v [label=\"move\"]\n"
335+
str *= move_str
336+
end
337+
end
338+
339+
# Add task-to-task (syncdep) dependency edges
204340
edge_sep = is_directed(g) ? "->" : "--"
205341
for edge in edges(g)
342+
if !haskey(vertex_to_tid, src(edge)) || !haskey(vertex_to_tid, dst(edge))
343+
continue
344+
end
345+
if !disconnected && !(src(edge) in con_vs) || !(dst(edge) in con_vs)
346+
continue
347+
end
206348
# FIXME: Label syncdeps with associated arguments and datadeps directions
207349
str *= "v$(src(edge)) $edge_sep v$(dst(edge)) [label=\"syncdep\"]\n"
208350
end
209351

210-
# Add task arguments
211-
con_args = Vector{UInt}(collect(keys(arg_names)))
212-
for moves in values(arg_moves)
213-
for (_, (pre_objid, post_objid)) in moves
214-
push!(con_args, pre_objid)
215-
push!(con_args, post_objid)
216-
end
217-
end
352+
# Add task argument edges
218353
for (tid, args) in task_args
219354
haskey(tid_to_vertex, tid) || continue
220-
id = tid_to_vertex[tid]
221-
id in con_vs || continue
355+
tid_v = tid_to_vertex[tid]
356+
tid_v in con_vs || continue
222357
for (pos, arg) in args
223-
if !disconnected && !(arg in con_args)
358+
arg_v = objid_to_vertex[arg]
359+
if !disconnected && !(arg_v in con_vs)
224360
continue
225361
end
226-
arg_str = pos isa Int ? "arg $pos" : "kwarg $pos"
227-
str *= "a$arg $edge_sep v$id [label=\"$arg_str\"]\n"
362+
arg_str = sanitize_label(pos isa Int ? "arg $pos" : "kwarg $pos")
363+
str *= "a$arg_v $edge_sep v$tid_v [label=\"$arg_str\"]\n"
228364
end
229365
end
230366

367+
# Add task result edges
368+
for (tid, result) in task_result
369+
haskey(tid_to_vertex, tid) || continue
370+
tid_v = tid_to_vertex[tid]
371+
tid_v in con_vs || continue
372+
result_v = objid_to_vertex[result]
373+
if !disconnected && !(result_v in con_vs)
374+
continue
375+
end
376+
str *= "v$tid_v $edge_sep a$result_v [label=\"result\"]\n"
377+
end
378+
379+
# Generate the final graph
231380
str *= "}\n"
232381
gv = GraphViz.Graph(str)
233382
GraphViz.layout!(gv; engine=layout_engine)
383+
234384
return gv
235385
end
236386

0 commit comments

Comments
 (0)