|
| 1 | +module GraphVizExt |
| 2 | + |
| 3 | +if isdefined(Base, :get_extension) |
| 4 | + using GraphViz |
| 5 | +else |
| 6 | + using ..GraphViz |
| 7 | +end |
| 8 | + |
| 9 | +import Dagger |
| 10 | +import Dagger: DTask, Chunk, Processor |
| 11 | +import Dagger.TimespanLogging: Timespan |
| 12 | +import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst |
| 13 | + |
| 14 | +function pretty_time(t; digits=3) |
| 15 | + r(t) = round(t; digits) |
| 16 | + if t > 1000^3 |
| 17 | + "$(r(t/(1000^3))) s" |
| 18 | + elseif t > 1000^2 |
| 19 | + "$(r(t/(1000^2))) ms" |
| 20 | + elseif t > 1000 |
| 21 | + "$(r(t/1000)) us" |
| 22 | + else |
| 23 | + "$(r(t)) ns" |
| 24 | + end |
| 25 | +end |
| 26 | + |
| 27 | +""" |
| 28 | + Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, |
| 29 | + color_by=:fn, layout_engine="dot", |
| 30 | + times::Bool=true, times_digits::Integer=3) |
| 31 | +
|
| 32 | +Render a graph of the task dependencies and data dependencies in `logs` using GraphViz. |
| 33 | +
|
| 34 | +Requires the following events enabled in `enable_logging!`: `taskdeps`, `tasknames`, `taskargs`, `taskargmoves` |
| 35 | +
|
| 36 | +Options: |
| 37 | +- `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies) |
| 38 | +- `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor |
| 39 | +- `layout_engine`: The layout engine to use for GraphViz |
| 40 | +- `times`: If `true`, annotate each task with its start and finish times |
| 41 | +- `times_digits`: Number of digits to display in the time annotations |
| 42 | +""" |
| 43 | +function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, |
| 44 | + color_by=:fn, layout_engine="dot", |
| 45 | + times::Bool=true, times_digits::Integer=3) |
| 46 | + # Lookup all relevant task/argument dependencies and values in logs |
| 47 | + g = SimpleDiGraph() |
| 48 | + tid_to_vertex = Dict{Int,Int}() |
| 49 | + task_names = String[] |
| 50 | + tid_to_proc = Dict{Int,Processor}() |
| 51 | + arg_names = Dict{UInt,String}() |
| 52 | + task_args = Dict{Int,Vector{Pair{Union{Int,Symbol},UInt}}}() |
| 53 | + arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}() |
| 54 | + for w in keys(logs) |
| 55 | + for idx in 1:length(logs[w][:core]) |
| 56 | + category = logs[w][:core][idx].category |
| 57 | + kind = logs[w][:core][idx].kind |
| 58 | + id = logs[w][:id][idx] |
| 59 | + if category == :add_thunk && kind == :start |
| 60 | + id::NamedTuple |
| 61 | + taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}} |
| 62 | + taskname = logs[w][:tasknames][idx]::String |
| 63 | + tid, deps = taskdeps |
| 64 | + add_vertex!(g) |
| 65 | + tid_to_vertex[tid] = nv(g) |
| 66 | + push!(task_names, taskname) |
| 67 | + for dep in deps |
| 68 | + add_edge!(g, tid_to_vertex[dep], nv(g)) |
| 69 | + end |
| 70 | + if haskey(logs[w], :taskargs) |
| 71 | + id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} |
| 72 | + append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args) |
| 73 | + end |
| 74 | + elseif category == :compute && kind == :start |
| 75 | + id::NamedTuple |
| 76 | + tid = id.thunk_id |
| 77 | + proc = id.processor |
| 78 | + tid_to_proc[tid] = proc |
| 79 | + elseif category == :move && kind == :finish |
| 80 | + if haskey(logs[w], :taskargmoves) |
| 81 | + move_info = logs[w][:taskargmoves][idx] |
| 82 | + move_info === nothing && continue |
| 83 | + tid, pos, pre_objid, post_objid = move_info |
| 84 | + v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, arg_moves, tid) |
| 85 | + push!(v, pos => (pre_objid, post_objid)) |
| 86 | + end |
| 87 | + elseif category == :data_annotation && kind == :start |
| 88 | + id::NamedTuple |
| 89 | + objid = id.objectid |
| 90 | + name = id.name |
| 91 | + arg_names[objid] = name |
| 92 | + end |
| 93 | + end |
| 94 | + end |
| 95 | + tids_sorted = map(first, sort(collect(tid_to_vertex); by=last)) |
| 96 | + task_procs = Processor[tid_to_proc[tid] for tid in tids_sorted] |
| 97 | + |
| 98 | + # Find all connected and disconnected vertices |
| 99 | + if !disconnected |
| 100 | + discon_vs = filter(v->isempty(inneighbors(g, v)) && isempty(outneighbors(g, v)), vertices(g)) |
| 101 | + con_vs = filter(v->!in(v, discon_vs), vertices(g)) |
| 102 | + else |
| 103 | + con_vs = vertices(g) |
| 104 | + end |
| 105 | + |
| 106 | + # Assign colors |
| 107 | + labels = task_names |
| 108 | + all_fns = unique(map(label->first(split(label, " ")), labels[con_vs])) |
| 109 | + all_procs = unique(task_procs) |
| 110 | + all_colors = ("red", "orange", "green", "blue", "purple", "pink", "silver") |
| 111 | + if color_by == :fn |
| 112 | + _colors = [all_colors[mod1(i, length(all_colors))] for i in 1:length(all_fns)] |
| 113 | + colors = Dict(v=>_colors[findfirst(fn->occursin(fn, labels[v]), all_fns)] for v in con_vs) |
| 114 | + elseif color_by == :proc |
| 115 | + _colors = [all_colors[mod1(i, length(all_colors))] for i in 1:length(all_procs)] |
| 116 | + colors = Dict(v=>_colors[findfirst(proc->proc==task_procs[v], all_procs)] for v in con_vs) |
| 117 | + else |
| 118 | + throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc")) |
| 119 | + end |
| 120 | + |
| 121 | + str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n" |
| 122 | + |
| 123 | + # Add raw arguments |
| 124 | + for (id, name) in arg_names |
| 125 | + str *= "a$id [label=\"$name\", shape=box]\n" |
| 126 | + end |
| 127 | + |
| 128 | + if times |
| 129 | + vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex) |
| 130 | + |
| 131 | + # Determine per-worker start times |
| 132 | + worker_start_times = Dict{Int,UInt64}() |
| 133 | + for w in keys(logs) |
| 134 | + start = typemax(UInt64) |
| 135 | + for idx in 1:length(logs[w][:core]) |
| 136 | + if logs[w][:core][idx].category == :compute && logs[w][:core][idx].kind == :start |
| 137 | + tid = logs[w][:id][idx].thunk_id |
| 138 | + haskey(tid_to_vertex, tid) || continue |
| 139 | + id = tid_to_vertex[tid] |
| 140 | + id in con_vs || continue |
| 141 | + start = min(start, logs[w][:core][idx].timestamp) |
| 142 | + end |
| 143 | + end |
| 144 | + worker_start_times[w] = start |
| 145 | + end |
| 146 | + |
| 147 | + # Determine per-task start and finish times |
| 148 | + start_times = Dict{Int,UInt64}() |
| 149 | + finish_times = Dict{Int,UInt64}() |
| 150 | + for w in keys(logs) |
| 151 | + start = typemax(UInt64) |
| 152 | + for idx in 1:length(logs[w][:core]) |
| 153 | + if logs[w][:core][idx].category == :compute |
| 154 | + tid = logs[w][:id][idx].thunk_id |
| 155 | + if logs[w][:core][idx].kind == :start |
| 156 | + start_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w] |
| 157 | + else |
| 158 | + finish_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w] |
| 159 | + end |
| 160 | + end |
| 161 | + end |
| 162 | + end |
| 163 | + end |
| 164 | + |
| 165 | + # Add argument moves |
| 166 | + for (tid, moves) in arg_moves |
| 167 | + for (pos, (pre_objid, post_objid)) in moves |
| 168 | + move_str = "a$pre_objid -> a$post_objid [label=\"move\"]\n" |
| 169 | + str *= move_str |
| 170 | + end |
| 171 | + end |
| 172 | + |
| 173 | + # Add tasks |
| 174 | + for v in con_vs |
| 175 | + if !disconnected && (v in discon_vs) |
| 176 | + continue |
| 177 | + end |
| 178 | + label = labels[v] |
| 179 | + color = colors[v] |
| 180 | + proc = task_procs[v] |
| 181 | + proc_str = '(' * Dagger.short_name(task_procs[v]) * ')' |
| 182 | + label_str = "$label\\n$proc_str" |
| 183 | + if times |
| 184 | + tid = vertex_to_tid[v] |
| 185 | + start_time = pretty_time(start_times[tid]; digits=times_digits) |
| 186 | + finish_time = pretty_time(finish_times[tid]; digits=times_digits) |
| 187 | + label_str *= "\\n[+$start_time -> +$finish_time]" |
| 188 | + end |
| 189 | + str *= "v$v [label=\"$label_str\", color=\"$color\", penwidth=2.0]\n" |
| 190 | + end |
| 191 | + |
| 192 | + # Add task dependencies |
| 193 | + edge_sep = is_directed(g) ? "->" : "--" |
| 194 | + for edge in edges(g) |
| 195 | + # FIXME: Label syncdeps with associated arguments and datadeps directions |
| 196 | + str *= "v$(src(edge)) $edge_sep v$(dst(edge)) [label=\"syncdep\"]\n" |
| 197 | + end |
| 198 | + |
| 199 | + # Add task arguments |
| 200 | + con_args = Vector{UInt}(collect(keys(arg_names))) |
| 201 | + for moves in values(arg_moves) |
| 202 | + for (_, (pre_objid, post_objid)) in moves |
| 203 | + push!(con_args, pre_objid) |
| 204 | + push!(con_args, post_objid) |
| 205 | + end |
| 206 | + end |
| 207 | + for (tid, args) in task_args |
| 208 | + id = tid_to_vertex[tid] |
| 209 | + id in con_vs || continue |
| 210 | + for (pos, arg) in args |
| 211 | + if !disconnected && !(arg in con_args) |
| 212 | + continue |
| 213 | + end |
| 214 | + arg_str = pos isa Int ? "arg $pos" : "kwarg $pos" |
| 215 | + str *= "a$arg $edge_sep v$id [label=\"$arg_str\"]\n" |
| 216 | + end |
| 217 | + end |
| 218 | + |
| 219 | + str *= "}\n" |
| 220 | + gv = GraphViz.Graph(str) |
| 221 | + GraphViz.layout!(gv; engine=layout_engine) |
| 222 | + return gv |
| 223 | +end |
| 224 | + |
| 225 | +end |
0 commit comments