Skip to content

Task names, and various GraphVizExt and PlotsExt improvements #560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 21, 2024
1 change: 1 addition & 0 deletions docs/src/logging-visualization.md
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ converted to a `Val` for dispatch purposes
(i.e. `render_logs(logs::Dict, :myrenderer)` -> `render_logs(logs, Val{:myrenderer}())`).

Built-in `IO` support exists for:
- `show_logs(io, logs, :graphviz)` to write a Graphviz dot graph of executed tasks and their dependencies
- `show_logs(io, logs, :chrome_trace)` to write a task execution timeline in the chrome-trace format (view in [perfetto web UI](https://ui.perfetto.dev/) or `about:tracing` in a chrome-based browser)

Built-in rendering support exists for:
212 changes: 15 additions & 197 deletions ext/GraphVizExt.jl
Original file line number Diff line number Diff line change
@@ -7,217 +7,35 @@ else
end

import Dagger
import Dagger: DTask, Chunk, Processor
import Dagger.TimespanLogging: Timespan
import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst

function pretty_time(t; digits=3)
r(t) = round(t; digits)
if t > 1000^3
"$(r(t/(1000^3))) s"
elseif t > 1000^2
"$(r(t/(1000^2))) ms"
elseif t > 1000
"$(r(t/1000)) us"
else
"$(r(t)) ns"
end
end

"""
Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
color_by=:fn, layout_engine="dot",
times::Bool=true, times_digits::Integer=3)
render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
color_by=:fn, layout_engine="dot",
times::Bool=true, times_digits::Integer=3,
colors=Dagger.Viz.default_colors,
name_to_color=Dagger.Viz.name_to_color)
Render a graph of the task dependencies and data dependencies in `logs` using GraphViz.
Requires the following events enabled in `enable_logging!`: `taskdeps`, `tasknames`, `taskargs`, `taskargmoves`
Requires the `all_task_deps` event enabled in `enable_logging!`
Options:
- `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies)
- `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor
- `layout_engine`: The layout engine to use for GraphViz
- `layout_engine`: The layout engine to use for GraphViz rendering
- `times`: If `true`, annotate each task with its start and finish times
- `times_digits`: Number of digits to display in the time annotations
- `colors`: A list of colors to use for coloring tasks
- `name_to_color`: A function that maps task names to colors
"""
function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false,
color_by=:fn, layout_engine="dot",
times::Bool=true, times_digits::Integer=3)
# Lookup all relevant task/argument dependencies and values in logs
g = SimpleDiGraph()
tid_to_vertex = Dict{Int,Int}()
task_names = String[]
tid_to_proc = Dict{Int,Processor}()
arg_names = Dict{UInt,String}()
task_args = Dict{Int,Vector{Pair{Union{Int,Symbol},UInt}}}()
arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}()
for w in keys(logs)
for idx in 1:length(logs[w][:core])
category = logs[w][:core][idx].category
kind = logs[w][:core][idx].kind
id = logs[w][:id][idx]
if category == :add_thunk && kind == :start
id::NamedTuple
taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}}
taskname = logs[w][:tasknames][idx]::String
tid, deps = taskdeps
add_vertex!(g)
tid_to_vertex[tid] = nv(g)
push!(task_names, taskname)
for dep in deps
add_edge!(g, tid_to_vertex[dep], nv(g))
end
if haskey(logs[w], :taskargs)
id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector}
append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args)
end
elseif category == :compute && kind == :start
id::NamedTuple
tid = id.thunk_id
proc = id.processor
tid_to_proc[tid] = proc
elseif category == :move && kind == :finish
if haskey(logs[w], :taskargmoves)
move_info = logs[w][:taskargmoves][idx]
move_info === nothing && continue
tid, pos, pre_objid, post_objid = move_info
v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, arg_moves, tid)
push!(v, pos => (pre_objid, post_objid))
end
elseif category == :data_annotation && kind == :start
id::NamedTuple
objid = id.objectid
name = id.name
arg_names[objid] = name
end
end
end
tids_sorted = map(first, sort(collect(tid_to_vertex); by=last))
task_procs = Processor[tid_to_proc[tid] for tid in tids_sorted]

# Find all connected and disconnected vertices
if !disconnected
discon_vs = filter(v->isempty(inneighbors(g, v)) && isempty(outneighbors(g, v)), vertices(g))
con_vs = filter(v->!in(v, discon_vs), vertices(g))
else
con_vs = vertices(g)
end

# Assign colors
labels = task_names
all_fns = unique(map(label->first(split(label, " ")), labels[con_vs]))
all_procs = unique(task_procs)
all_colors = ("red", "orange", "green", "blue", "purple", "pink", "silver")
if color_by == :fn
_colors = [all_colors[mod1(i, length(all_colors))] for i in 1:length(all_fns)]
colors = Dict(v=>_colors[findfirst(fn->occursin(fn, labels[v]), all_fns)] for v in con_vs)
elseif color_by == :proc
_colors = [all_colors[mod1(i, length(all_colors))] for i in 1:length(all_procs)]
colors = Dict(v=>_colors[findfirst(proc->proc==task_procs[v], all_procs)] for v in con_vs)
else
throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc"))
end

str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n"

# Add raw arguments
for (id, name) in arg_names
str *= "a$id [label=\"$name\", shape=box]\n"
end

if times
vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex)

# Determine per-worker start times
worker_start_times = Dict{Int,UInt64}()
for w in keys(logs)
start = typemax(UInt64)
for idx in 1:length(logs[w][:core])
if logs[w][:core][idx].category == :compute && logs[w][:core][idx].kind == :start
tid = logs[w][:id][idx].thunk_id
haskey(tid_to_vertex, tid) || continue
id = tid_to_vertex[tid]
id in con_vs || continue
start = min(start, logs[w][:core][idx].timestamp)
end
end
worker_start_times[w] = start
end

# Determine per-task start and finish times
start_times = Dict{Int,UInt64}()
finish_times = Dict{Int,UInt64}()
for w in keys(logs)
start = typemax(UInt64)
for idx in 1:length(logs[w][:core])
if logs[w][:core][idx].category == :compute
tid = logs[w][:id][idx].thunk_id
if logs[w][:core][idx].kind == :start
start_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w]
else
finish_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w]
end
end
end
end
end

# Add argument moves
for (tid, moves) in arg_moves
for (pos, (pre_objid, post_objid)) in moves
move_str = "a$pre_objid -> a$post_objid [label=\"move\"]\n"
str *= move_str
end
end

# Add tasks
for v in con_vs
if !disconnected && (v in discon_vs)
continue
end
label = labels[v]
color = colors[v]
proc = task_procs[v]
proc_str = '(' * Dagger.short_name(task_procs[v]) * ')'
label_str = "$label\\n$proc_str"
if times
tid = vertex_to_tid[v]
start_time = pretty_time(start_times[tid]; digits=times_digits)
finish_time = pretty_time(finish_times[tid]; digits=times_digits)
label_str *= "\\n[+$start_time -> +$finish_time]"
end
str *= "v$v [label=\"$label_str\", color=\"$color\", penwidth=2.0]\n"
end

# Add task dependencies
edge_sep = is_directed(g) ? "->" : "--"
for edge in edges(g)
# FIXME: Label syncdeps with associated arguments and datadeps directions
str *= "v$(src(edge)) $edge_sep v$(dst(edge)) [label=\"syncdep\"]\n"
end

# Add task arguments
con_args = Vector{UInt}(collect(keys(arg_names)))
for moves in values(arg_moves)
for (_, (pre_objid, post_objid)) in moves
push!(con_args, pre_objid)
push!(con_args, post_objid)
end
end
for (tid, args) in task_args
id = tid_to_vertex[tid]
id in con_vs || continue
for (pos, arg) in args
if !disconnected && !(arg in con_args)
continue
end
arg_str = pos isa Int ? "arg $pos" : "kwarg $pos"
str *= "a$arg $edge_sep v$id [label=\"$arg_str\"]\n"
end
end

str *= "}\n"
gv = GraphViz.Graph(str)
times::Bool=true, times_digits::Integer=3,
colors=Dagger.Viz.default_colors,
name_to_color=Dagger.Viz.name_to_color)
dot = Dagger.Viz.logs_to_dot(logs; disconnected, times, times_digits,
color_by, colors, name_to_color)
gv = GraphViz.Graph(dot)
GraphViz.layout!(gv; engine=layout_engine)
return gv
end
5 changes: 2 additions & 3 deletions ext/JSON3Ext.jl
Original file line number Diff line number Diff line change
@@ -37,8 +37,7 @@ function logs_to_chrome_trace(logs::Dict)
if !haskey(execution_logs, tid)
execution_logs[tid] = Dict{Symbol,Any}()
end
taskname = logs[w][:tasknames][start_idx]
fname = first(split(taskname, ' '))
fname = logs[w][:taskfuncnames][start_idx]
execution_logs[tid][:name] = fname
end
end
@@ -63,4 +62,4 @@ function Dagger.show_logs(io::IO, logs::Dict, ::Val{:chrome_trace})
JSON3.write(io, logs_to_chrome_trace(logs))
end

end # module JSON3Ext
end # module JSON3Ext
202 changes: 151 additions & 51 deletions ext/PlotsExt.jl
Original file line number Diff line number Diff line change
@@ -12,81 +12,181 @@ import Dagger
import Dagger: DTask, Chunk, Processor
import Dagger.TimespanLogging: Timespan

function logs_to_df(logs::Dict)
df = DataFrame(proc=Processor[], proc_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[])
function logs_to_df(logs::Dict, ::Val{:execution};
colors, name_to_color, color_by)
# Generate function names
fn_names = Dict{Int, String}()
dtask_names = Dict{UInt, String}()
uid_to_tid = Dict{UInt, Int}()
for w in keys(logs)
for idx in 1:length(logs[w][:core])
category = logs[w][:core][idx].category::Symbol
kind = logs[w][:core][idx].kind::Symbol
if category == :add_thunk && kind == :start
tid = logs[w][:id][idx].thunk_id::Int
if haskey(logs[w], :taskfuncnames)
fn_names[tid] = logs[w][:taskfuncnames][idx]::String
else
@warn "Task names missing from logs"
fn_names[tid] = "unknown"
end
if haskey(logs[w], :taskuidtotid)
uid_tid = logs[w][:taskuidtotid][idx]
if uid_tid !== nothing
uid, tid = uid_tid::Pair{UInt,Int}
uid_to_tid[uid] = tid
end
end
elseif category == :data_annotation && kind == :start
id = logs[w][:id][idx]::NamedTuple
name = String(id.name)
obj = id.objectid::Dagger.LoggedMutableObject
objid = obj.objid
if obj.kind == :task
dtask_names[objid] = name
end
end
end
end

# Process DTasks-to-Thunk mappings
for (uid, tid) in uid_to_tid
# Patch in custom name
if haskey(dtask_names, uid)
fn_names[tid] = dtask_names[uid]
end
end

# FIXME: Color eltype
df = DataFrame(proc=Processor[], proc_name=String[], fn_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[], color=Any[])
Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx
category = logs[w][:core][start_idx].category
if category == :compute
proc = logs[w][:id][start_idx].processor
proc = logs[w][:id][start_idx].processor::Processor
proc_name = Dagger.short_name(proc)
tid = logs[w][:id][start_idx].thunk_id
t_start = logs[w][:core][start_idx].timestamp
t_end = logs[w][:core][finish_idx].timestamp
push!(df, (;proc, proc_name, tid, t_start, t_end))
tid = logs[w][:id][start_idx].thunk_id::Int
fn_name = fn_names[tid]
t_start = logs[w][:core][start_idx].timestamp::UInt64
t_end = logs[w][:core][finish_idx].timestamp::UInt64
if color_by == :fn
color = name_to_color(fn_name, colors)
elseif color_by == :proc
color = name_to_color(proc_name, colors)
else
throw(ArgumentError("Invalid color_by value: $(repr(color_by))"))
end
push!(df, (;proc, proc_name, fn_name, tid, t_start, t_end, color))
end
end
return df
end

# Implementation by Przemyslaw Szufel
function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt_ps})
df = logs_to_df(logs)

proc_names = sort!(unique(df.proc_name))
proc_idx = Dict{String,Int}()
for name in proc_names
proc_idx[name] = findfirst(==(name), proc_names)
end
proc_idxs = map(name->proc_idx[name], proc_names)
tvals = zeros(UInt64, length(proc_names))
plt = bar(orientation=:h, yticks=(1:length(proc_names), proc_names), linewidth=0,yflip=true,color=:green,legend=nothing)
xlabel!(plt, "Time in seconds")
dfc = deepcopy(df)
while nrow(dfc) > 0
rowslast = DataFrame([g[findmax(g.t_end)[2],:] for g in groupby(dfc, :proc_name)])
tvals .= .0
for i in 1:nrow(rowslast)
tvals[proc_idx[rowslast.proc_name[i]]] = rowslast.t_end[i]
function logs_to_df(logs::Dict, ::Val{:processor};
colors, name_to_color, kwargs...)
# Collect processor events
# FIXME: Color eltype
df = DataFrame(proc=Processor[], proc_name=String[], category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[])
Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx
category = logs[w][:core][start_idx].category
if category in (:compute, :storage_wait, :storage_safe_scan, :proc_run_fetch, :proc_steal_local)
proc = logs[w][:id][start_idx].processor::Processor
proc_name = Dagger.short_name(proc)
t_start = logs[w][:core][start_idx].timestamp::UInt64
t_end = logs[w][:core][finish_idx].timestamp::UInt64
category_str = string(category)
color = name_to_color(category_str, colors)
push!(df, (;proc, proc_name, category=category_str, t_start, t_end, color))
end
#setindex!.(Ref(tvals), rowslast.t_end, getindex.(proc_idx, rowslast.proc_name))
bar!(plt, tvals[proc_idxs], orientation=:h, linewidth=0.5,yflip=true,color=:green)
tvals .= .0
for i in 1:nrow(rowslast)
tvals[proc_idx[rowslast.proc_name[i]]] = rowslast.t_start[i]
end
return df
end
function logs_to_df(logs::Dict, ::Val{:scheduler};
colors, name_to_color, kwargs...)
# Collect scheduler events
# FIXME: Color eltype
df = DataFrame(category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[])
Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx
category = logs[w][:core][start_idx].category
if category in (:scheduler_init, :scheduler_exit, :init_proc, :remove_proc, :add_thunk, :handle_fault, :schedule, :fire, :enqueue, :take, :finish)
t_start = logs[w][:core][start_idx].timestamp::UInt64
t_end = logs[w][:core][finish_idx].timestamp::UInt64
category_str = string(category)
color = name_to_color(category_str, colors)
push!(df, (;category=category_str, t_start, t_end, color))
end
#setindex!.(Ref(tvals), rowslast.t_start, proc_idx[rowslast.proc_name])
bar!(plt, tvals[proc_idxs], orientation=:h, linewidth=0.5,linecolor=:white,yflip=true,color=:white)
annotate!.(Ref(plt),(rowslast.t_start .+ rowslast.t_end) ./ 2, findfirst.( .==(rowslast.proc_name), Ref(proc_names)), text.(string.(rowslast.tid),9,rotation=90 ))
dfc = dfc[ .! (dfc.tid .∈ Ref(rowslast.tid) ), : ]
end
# FIXME: theoretic_optimal = simulate_polling(df)[1] + minimum(df.t_start)
theoretic_optimal = minimum(df.t_start)
plot!(plt, [theoretic_optimal,theoretic_optimal], [0, length(proc_names)+1],width=3,color=:black,style=:dot)
return plt
return df
end
logs_to_df(logs::Dict, ::Val{target}; kwargs...) where target =
throw(ArgumentError("Invalid target: $(repr(target))"))

# Implementation adapted from:
# https://discourse.julialang.org/t/how-to-make-a-gantt-plot-with-plots-jl/95165/7
"""
Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; kwargs...)
Dagger.render_logs(logs::Dict, ::Val{:plots_gantt};
target=:execution,
colors, name_to_color, color_by=:fn,
kwargs...)
Render a Gantt chart of task execution in `logs` using Plots.
Render a Gantt chart of task execution in `logs` using Plots. `kwargs` are passed to `plot` directly.
Keyword arguments affect rendering behavior:
- `target`: Which aspect of the logs to render. May be one of `:execution`, `:processor`, or `:scheduler`.
- `colors`: A list of colors to use for rendering.
- `name_to_color`: A function mapping names to colors.
- `color_by`: Whether to color by function name (`:fn`) or processor name (`:proc`).
- `kwargs` are passed to `plot` directly.
"""
function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; kwargs...)
df = logs_to_df(logs)
function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt};
target=:execution,
colors=Dagger.Viz.default_colors,
name_to_color=Dagger.Viz.name_to_color,
color_by=:fn, kwargs...)
df = logs_to_df(logs, Val{target}(); colors, name_to_color, color_by)
y_elem = if target == :execution || target == :processor
:proc_name
elseif target == :scheduler
:category
end
ylabel = if target == :execution || target == :processor
"Processor"
elseif target == :scheduler
"Category"
end
sort!(df, y_elem)

global_t_start, global_t_end = extrema(logs[1][:core][idx].timestamp for idx in 1:length(logs[1][:core]))

rect(w, h, x, y) = Shape(x .+ [0,w,w,0], y .+ [0,0,h,h])

t_init = minimum(df.t_start)
t_start = (df.t_start .- t_init) ./ 1e9
t_end = (df.t_end .- t_init) ./ 1e9
t_start = (df.t_start .- global_t_start) ./ 1e9
t_end = (df.t_end .- global_t_start) ./ 1e9
duration = t_end .- t_start
u = unique(df.proc_name)
u = unique(getproperty(df, y_elem))
dy = Dict(u .=> 1:length(u))
r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, df.proc_name)]
r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, getproperty(df, y_elem))]
if target == :execution
labels = permutedims(df.fn_name)
elseif target == :processor
labels = permutedims(df.category)
elseif target == :scheduler
labels = nothing
end

if labels !== nothing
# Deduplicate labels
for idx in 1:length(labels)
if findfirst(other_idx->labels[other_idx]==labels[idx], 1:length(labels)) < idx
labels[idx] = ""
end
end
end

# FIXME: Colors
return plot(r; #=c=permutedims(df.color),=# yticks=(1.5:(nrow(df) + 0.5), u), xlabel="Time (seconds)", ylabel="Processor", labels=false, kwargs...)
return plot(r; color=permutedims(df.color), labels,
yticks=(1.5:(nrow(df) + 0.5), u),
xlabel="Time (seconds)", ylabel,
xlim=(0.0, (global_t_end - global_t_start) / 1e9),
legendalpha=0,
kwargs...)
end

end # module PlotsExt
5 changes: 2 additions & 3 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
@@ -88,14 +88,13 @@ include("array/cholesky.jl")
include("array/lu.jl")
include("array/random.jl")

# Visualization
# Logging and Visualization
include("visualization.jl")
include("ui/gantt-common.jl")
include("ui/gantt-text.jl")

# Logging
include("utils/logging-events.jl")
include("utils/logging.jl")
include("utils/viz.jl")

# Precompilation
import PrecompileTools: @compile_workload
7 changes: 7 additions & 0 deletions src/array/darray.jl
Original file line number Diff line number Diff line change
@@ -524,6 +524,13 @@ function Base.:(==)(x::AbstractArray{T,N}, y::ArrayOp{S,N}) where {T,S,N}
return collect(x) == y
end

function logs_annotate!(ctx::Context, A::DArray, name::Union{String,Symbol})
for (idx, chunk) in enumerate(A.chunks)
sd = A.subdomains[idx]
Dagger.logs_annotate!(ctx, chunk, name*'['*join(sd.indexes, ',')*']')
end
end

# TODO: Allow `f` to return proc
mapchunk(f, chunk) = tochunk(f(poolget(chunk.handle)))
function mapchunks(f, d::DArray{T,N,F}) where {T,N,F}
4 changes: 2 additions & 2 deletions src/datadeps.jl
Original file line number Diff line number Diff line change
@@ -413,7 +413,7 @@ function generate_slot!(state::DataDepsState, dest_space, data)
w = only(unique(map(get_parent, collect(processors(dest_space))))).pid
ctx = Sch.eager_context()
id = rand(Int)
timespan_start(ctx, :move, (;thunk_id=0, id, processor=to_proc), (;f=nothing, data))
timespan_start(ctx, :move, (;thunk_id=0, id, position=0, processor=to_proc), (;f=nothing, data))
dest_space_args[data] = remotecall_fetch(w, from_proc, to_proc, data) do from_proc, to_proc, data
data_converted = move(from_proc, to_proc, data)
data_chunk = tochunk(data_converted, to_proc)
@@ -422,7 +422,7 @@ function generate_slot!(state::DataDepsState, dest_space, data)
@assert orig_space != memory_space(data_chunk) "space preserved! $orig_space != $(memory_space(data_chunk)) ($(typeof(data)) vs. $(typeof(data_chunk))), spaces ($orig_space -> $dest_space)"
return data_chunk
end
timespan_finish(ctx, :move, (;thunk_id=0, id, processor=to_proc), (;f=nothing, data=dest_space_args[data]))
timespan_finish(ctx, :move, (;thunk_id=0, id, position=0, processor=to_proc), (;f=nothing, data=dest_space_args[data]))
end
return dest_space_args[data]
end
104 changes: 58 additions & 46 deletions src/sch/Sch.jl

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions src/submission.jl
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{

id = next_id()

timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f, args, options))
timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f, args, options, uid))

# Lookup DTask/ThunkID -> Thunk
old_args = copy(args)
@@ -38,6 +38,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
end::Union{Vector{Any},Nothing}
lock(Sch.EAGER_ID_MAP) do id_map
for (idx, (pos, arg)) in enumerate(args)
# FIXME: Switch to Union{Symbol,Int} to preserve positional information
pos::Union{Symbol,Nothing}
newarg = if arg isa DTask
arg_uid = arg.uid
@@ -128,7 +129,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
put!(state.chan, Sch.RescheduleSignal())
end

timespan_finish(ctx, :add_thunk, (;thunk_id=id), (;f, args, options))
timespan_finish(ctx, :add_thunk, (;thunk_id=id), (;f, args, options, uid))

return thunk_id
end
@@ -223,10 +224,13 @@ function eager_spawn(spec::DTaskSpec)
future = ThunkFuture()
finalizer_ref = poolset(DTaskFinalizer(uid); device=MemPool.CPURAMDevice())

# Return unlaunched DTask
# Create unlaunched DTask
return DTask(uid, future, finalizer_ref)
end
function eager_launch!((spec, task)::Pair{DTaskSpec,DTask})
# Assign a name, if specified
eager_assign_name!(spec, task)

# Lookup DTask -> ThunkID
local args, options
lock(Sch.EAGER_ID_MAP) do id_map
@@ -243,6 +247,11 @@ end
function eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}})
ntasks = length(specs)

# Assign a name, if specified
for (spec, task) in specs
eager_assign_name!(spec, task)
end

uids = [task.uid for (_, task) in specs]
futures = [task.future for (_, task) in specs]
finalizer_refs = [task.finalizer_ref for (_, task) in specs]
@@ -262,3 +271,11 @@ function eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}})
task.thunk_ref = thunk_ids[i].ref
end
end

function eager_assign_name!(spec::DTaskSpec, task::DTask)
# Assign a name, if specified
if haskey(spec.options, :name)
Dagger.logs_annotate!(task, spec.options.name)
spec.options = (;filter(x -> x[1] != :name, Base.pairs(spec.options))...)
end
end
125 changes: 101 additions & 24 deletions src/utils/logging-events.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
struct LoggedMutableObject
objid::UInt
kind::Symbol
end

module Events

import ..Dagger
@@ -102,54 +107,76 @@ function (::TaskNames)(ev::Event{:start})
if ev.category == :add_thunk
id = ev.id.thunk_id
f = Dagger.chunktype(ev.timeline.f)
if hasfield(f, :instance) && isdefined(f, :instance)
if hasproperty(f, :instance) && isdefined(f, :instance)
f = f.instance
end
return "$(nameof(f)) [$id]"
return "$(func_name(f)) [$id]"
end
return
end
(td::TaskNames)(ev::Event{:finish}) = nothing
func_name(f::Function) = nameof(f)
func_name(x) = repr(x)
func_name(::Dagger.ExpandedBroadcast{F}) where F = Symbol('.', nameof(F))

"""
TaskFunctionNames
Records the function name of each task.
"""
struct TaskFunctionNames end
function (::TaskFunctionNames)(ev::Event{:start})
if ev.category == :add_thunk
f = Dagger.chunktype(ev.timeline.f)
if hasproperty(f, :instance) && isdefined(f, :instance)
f = f.instance
end
return String(func_name(f))
end
return
end
(td::TaskFunctionNames)(ev::Event{:finish}) = nothing

"""
TaskArguments
Records the raw (mutable) arguments of each submitted task.
"""
struct TaskArguments end
function (::TaskArguments)(ev::Event{:start})
if ev.category == :add_thunk
args = Pair{Union{Symbol,Int},UInt}[]
for (idx, (pos, arg)) in enumerate(ev.timeline.args)
pos_idx = pos === nothing ? idx : pos
arg = Dagger.unwrap_weak_checked(arg)
if ismutable(arg)
push!(args, pos_idx => objectid(arg))
end
(::TaskArguments)(ev::Event{:start}) = nothing
function (ta::TaskArguments)(ev::Event{:finish})
if ev.category == :move
args = Pair{Union{Symbol,Int},Dagger.LoggedMutableObject}[]
thunk_id = ev.id.thunk_id::Int
pos = ev.id.position::Union{Symbol,Int}
arg = ev.timeline.data
if ismutable(arg)
push!(args, pos => Dagger.objectid_or_chunkid(arg))
end
return ev.id.thunk_id => args
return thunk_id => args
end
return
end
(ta::TaskArguments)(ev::Event{:finish}) = nothing

"""
TaskArgumentMoves
Records any `move`-derived copies of arguments of each task.
"""
struct TaskArgumentMoves
pre_move_args::Dict{Int,Dict{Union{Int,Symbol},UInt}}
pre_move_args::Dict{Int,Dict{Union{Int,Symbol},Dagger.LoggedMutableObject}}
end
TaskArgumentMoves() =
TaskArgumentMoves(Dict{Int,Dict{Union{Int,Symbol},UInt}}())
TaskArgumentMoves(Dict{Int,Dict{Union{Int,Symbol},Dagger.LoggedMutableObject}}())
init_similar(::TaskArgumentMoves) = TaskArgumentMoves()
function (ta::TaskArgumentMoves)(ev::Event{:start})
if ev.category == :move
data = ev.timeline.data
if ismutable(data)
d = get!(Dict{Union{Int,Symbol},UInt}, ta.pre_move_args, ev.id.thunk_id)
d[ev.id.id] = objectid(data)
thunk_id = ev.id.thunk_id::Int
position = ev.id.position::Union{Symbol,Int}
d = get!(Dict{Union{Int,Symbol},Dagger.LoggedMutableObject}, ta.pre_move_args, thunk_id)
d[position] = Dagger.objectid_or_chunkid(data)
end
end
return
@@ -158,22 +185,42 @@ function (ta::TaskArgumentMoves)(ev::Event{:finish})
if ev.category == :move
post_data = ev.timeline.data
if ismutable(post_data)
if haskey(ta.pre_move_args, ev.id.thunk_id)
d = ta.pre_move_args[ev.id.thunk_id]
if haskey(d, ev.id.id)
pre_data = d[ev.id.id]
return ev.id.thunk_id, ev.id.id, pre_data, objectid(post_data)
thunk_id = ev.id.thunk_id::Int
position = ev.id.position::Union{Symbol,Int}
if haskey(ta.pre_move_args, thunk_id)
d = ta.pre_move_args[thunk_id]
if haskey(d, position)
pre_data = d[position]
return thunk_id, position, pre_data, Dagger.objectid_or_chunkid(post_data)
else
@warn "No TID $(ev.id.thunk_id), ID $(ev.id.id)"
@warn "No TID $(thunk_id), Position $(position)"
end
else
@warn "No TID $(ev.id.thunk_id)"
@warn "No TID $(thunk_id)"
end
end
end
return
end

"""
TaskResult
Records the raw (mutable) return value of each submitted task.
"""
struct TaskResult end
(::TaskResult)(ev::Event{:start}) = nothing
function (ta::TaskResult)(ev::Event{:finish})
if ev.category == :compute
thunk_id = ev.id.thunk_id::Int
result = ev.timeline.result
if ismutable(result)
return thunk_id => Dagger.objectid_or_chunkid(result)
end
end
return
end

"""
TaskDependencies
@@ -207,4 +254,34 @@ function (::TaskDependencies)(ev::Event{:start})
end
(td::TaskDependencies)(ev::Event{:finish}) = nothing

"""
TaskUIDtoTID
Maps DTask UIDs to Thunk TIDs.
"""
struct TaskUIDtoTID end
function (tut::TaskUIDtoTID)(ev::Event{:start})
if ev.category == :add_thunk
thunk_id = ev.id.thunk_id::Int
uid = ev.timeline.uid::UInt
return uid => thunk_id
end
return
end
(tut::TaskUIDtoTID)(ev::Event{:finish}) = nothing

struct TaskToChunk end
(td::TaskToChunk)(ev::Event{:start}) = nothing
function (::TaskToChunk)(ev::Event{:finish})
if ev.category == :finish
thunk_id = ev.id.thunk_id::Int
result = ev.timeline.result
if ismutable(result)
chunk_id = Dagger.objectid_or_chunkid(result)
return thunk_id => chunk_id
end
end
return
end

end # module Events
54 changes: 45 additions & 9 deletions src/utils/logging.jl
Original file line number Diff line number Diff line change
@@ -8,28 +8,50 @@ Enables logging globally for all workers. Certain core events are always enabled
Extra events:
- `metrics::Bool`: Enables various utilization and allocation metrics
- `timeline::Bool`: Enables raw "timeline" values, which are event-specific; not recommended except for debugging
- `all_task_deps::Bool`: Enables all task dependency-related logging
- `tasknames::Bool`: Enables generating unique task names for each task
- `taskfuncnames::Bool`: Enables reporting of task function names for each task
- `taskdeps::Bool`: Enables reporting of upstream task dependencies (as task IDs) for each task argument
- `taskargs::Bool`: Enables reporting of upstream non-task dependencies (as `objectid` hash) for each task argument
- `taskargmoves::Bool`: Enables reporting of copies of upstream dependencies (as original and copy `objectid` hashes) for each task argument
- `taskresult::Bool`: Enables reporting of task result values (as `objectid` hash)
- `taskuidtotid::Bool`: Enables reporting of task UID-to-TID mappings
- `tasktochunk::Bool`: Enables reporting of DTask-to-Chunk mappings
- `profile::Bool`: Enables profiling of task execution; not currently recommended, as it adds significant overhead
"""
function enable_logging!(;metrics::Bool=true,
timeline::Bool=false,
all_task_deps::Bool=false,
tasknames::Bool=true,
taskfuncnames::Bool=false,
taskdeps::Bool=true,
taskargs::Bool=false,
taskargmoves::Bool=false,
taskresult::Bool=false,
taskuidtotid::Bool=false,
tasktochunk::Bool=false,
profile::Bool=false)
ml = TimespanLogging.MultiEventLog()
ml[:core] = TimespanLogging.Events.CoreMetrics()
ml[:id] = TimespanLogging.Events.IDMetrics()
if timeline
ml[:timeline] = TimespanLogging.Events.TimelineMetrics()
end
if all_task_deps
taskfuncnames = true
taskdeps = true
taskargs = true
taskargmoves = true
taskresult = true
taskuidtotid = true
tasktochunk = true
end
if tasknames
ml[:tasknames] = Dagger.Events.TaskNames()
end
if taskfuncnames
ml[:taskfuncnames] = Dagger.Events.TaskFunctionNames()
end
if taskdeps
ml[:taskdeps] = Dagger.Events.TaskDependencies()
end
@@ -39,6 +61,15 @@ function enable_logging!(;metrics::Bool=true,
if taskargmoves
ml[:taskargmoves] = Dagger.Events.TaskArgumentMoves()
end
if taskresult
ml[:taskresult] = Dagger.Events.TaskResult()
end
if taskuidtotid
ml[:taskuidtotid] = Dagger.Events.TaskUIDtoTID()
end
if tasktochunk
ml[:tasktochunk] = Dagger.Events.TaskToChunk()
end
if profile
ml[:profile] = DaggerWebDash.ProfileMetrics()
end
@@ -87,14 +118,12 @@ function logs_event_pairs(f, logs::Dict)
continue
end
id::NamedTuple
if haskey(id, :thunk_id)
event_key = (category, id)
if kind == :start
running_events[event_key] = idx
else
event_start_idx = running_events[event_key]
f(w, event_start_idx, idx)
end
event_key = (category, id)
if kind == :start
running_events[event_key] = idx
elseif haskey(running_events, event_key)
event_start_idx = running_events[event_key]
f(w, event_start_idx, idx)
end
end
end
@@ -106,9 +135,16 @@ utilize for display purposes.
"""
function logs_annotate!(ctx::Context, arg, name::Union{String,Symbol})
ismutable(arg) || throw(ArgumentError("Argument must be mutable to be annotated"))
Dagger.TimespanLogging.timespan_start(ctx, :data_annotation, (;objectid=objectid(arg), name), nothing)
Dagger.TimespanLogging.timespan_start(ctx, :data_annotation, (;objectid=objectid_or_chunkid(arg), name), nothing)
# TODO: Remove redundant log event
Dagger.TimespanLogging.timespan_finish(ctx, :data_annotation, nothing, nothing)
end
logs_annotate!(arg, name::Union{String,Symbol}) =
logs_annotate!(Dagger.Sch.eager_context(), arg, name)

objectid_or_chunkid(@nospecialize(x)) =
LoggedMutableObject(objectid(x), :object)
objectid_or_chunkid(@nospecialize(x::Chunk)) =
LoggedMutableObject(hash(x), :chunk)
objectid_or_chunkid(@nospecialize(x::DTask)) =
LoggedMutableObject(x.uid, :task)
406 changes: 406 additions & 0 deletions src/utils/viz.jl

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/visualization.jl
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ show_logs(t, logs, vizmode::Symbol; options...) =
show_logs(t, logs, Val{vizmode}(); options...)
function show_logs(logs, ::Val{vizmode}; options...) where vizmode
iob = IOBuffer()
show_logs(iob, t, Val{vizmode}(); options...)
show_logs(iob, logs, Val{vizmode}(); options...)
return String(take!(iob))
end
function show_logs(t, logs, ::Val{vizmode}; options...) where vizmode
8 changes: 5 additions & 3 deletions test/logging.jl
Original file line number Diff line number Diff line change
@@ -100,7 +100,9 @@ import Colors, GraphViz, DataFrames, Plots, JSON3
for idx in 1:length(logs[w][:core])
category = logs[w][:core][idx].category
if category in (:scheduler_init, :scheduler_exit, :take, :assign_procs)
@test logs[w][:id][idx] === nothing
@test logs[w][:id][idx] isa NamedTuple
@test haskey(logs[w][:id][idx], :uid)
@test logs[w][:id][idx].uid isa UInt
end
if category in (:add_thunk, :schedule, :finish, :move, :compute, :storage_wait, :storage_safe_scan, :enqueue)
@test logs[w][:id][idx] isa NamedTuple
@@ -168,7 +170,7 @@ import Colors, GraphViz, DataFrames, Plots, JSON3

if VERSION >= v"1.9-"
@testset "show_plan/render_plan built-in" begin
Dagger.enable_logging!()
Dagger.enable_logging!(;all_task_deps=true)

A = distribute(rand(4, 4), Blocks(8, 8))
sum(A)
@@ -182,7 +184,7 @@ import Colors, GraphViz, DataFrames, Plots, JSON3

# JSON3Ext
@test Dagger.render_logs(logs, :chrome_trace) !== nothing

Dagger.disable_logging!()
end
end