From a6feb8da22af8d88a255d4495c120b6d2cd74e87 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sat, 29 Jun 2024 18:34:53 -0400 Subject: [PATCH 01/12] logging: Fix TaskNames name calculation --- src/utils/logging-events.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/logging-events.jl b/src/utils/logging-events.jl index f2de153b1..fa31af49c 100644 --- a/src/utils/logging-events.jl +++ b/src/utils/logging-events.jl @@ -102,7 +102,7 @@ function (::TaskNames)(ev::Event{:start}) if ev.category == :add_thunk id = ev.id.thunk_id f = Dagger.chunktype(ev.timeline.f) - if hasfield(f, :instance) && isdefined(f, :instance) + if hasproperty(f, :instance) && isdefined(f, :instance) f = f.instance end return "$(nameof(f)) [$id]" From eb9c651287630b622e1fd7faf3a260c79e725253 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sun, 7 Jul 2024 13:53:14 +0200 Subject: [PATCH 02/12] logging/Sch/GraphVizExt: Various :graphviz improvements --- ext/GraphVizExt.jl | 16 ++++++++++----- src/datadeps.jl | 4 ++-- src/sch/Sch.jl | 36 ++++++++++++++++++++------------ src/submission.jl | 1 + src/thunk.jl | 2 ++ src/utils/logging-events.jl | 41 ++++++++++++++++++++----------------- 6 files changed, 61 insertions(+), 39 deletions(-) diff --git a/ext/GraphVizExt.jl b/ext/GraphVizExt.jl index cd26be1c7..cc1e4479a 100644 --- a/ext/GraphVizExt.jl +++ b/ext/GraphVizExt.jl @@ -65,18 +65,19 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, tid_to_vertex[tid] = nv(g) push!(task_names, taskname) for dep in deps + haskey(tid_to_vertex, dep) || continue add_edge!(g, tid_to_vertex[dep], nv(g)) end - if haskey(logs[w], :taskargs) - id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} - append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args) - end elseif category == :compute && kind == :start id::NamedTuple tid = id.thunk_id proc = id.processor tid_to_proc[tid] = proc elseif category == :move && kind == :finish + if 
haskey(logs[w], :taskargs) + id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} + append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args) + end if haskey(logs[w], :taskargmoves) move_info = logs[w][:taskargmoves][idx] move_info === nothing && continue @@ -88,7 +89,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, id::NamedTuple objid = id.objectid name = id.name - arg_names[objid] = name + arg_names[objid] = String(name) end end end @@ -163,8 +164,12 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, end # Add argument moves + seen_moves = Set{Tuple{UInt,UInt}}() for (tid, moves) in arg_moves for (pos, (pre_objid, post_objid)) in moves + pre_objid == post_objid && continue + (pre_objid, post_objid) in seen_moves && continue + push!(seen_moves, (pre_objid, post_objid)) move_str = "a$pre_objid -> a$post_objid [label=\"move\"]\n" str *= move_str end @@ -205,6 +210,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, end end for (tid, args) in task_args + haskey(tid_to_vertex, tid) || continue id = tid_to_vertex[tid] id in con_vs || continue for (pos, arg) in args diff --git a/src/datadeps.jl b/src/datadeps.jl index 3c1110820..43c4c3848 100644 --- a/src/datadeps.jl +++ b/src/datadeps.jl @@ -413,7 +413,7 @@ function generate_slot!(state::DataDepsState, dest_space, data) w = only(unique(map(get_parent, collect(processors(dest_space))))).pid ctx = Sch.eager_context() id = rand(Int) - timespan_start(ctx, :move, (;thunk_id=0, id, processor=to_proc), (;f=nothing, data)) + timespan_start(ctx, :move, (;thunk_id=0, id, position=0, processor=to_proc), (;f=nothing, data)) dest_space_args[data] = remotecall_fetch(w, from_proc, to_proc, data) do from_proc, to_proc, data data_converted = move(from_proc, to_proc, data) data_chunk = tochunk(data_converted, to_proc) @@ -422,7 +422,7 @@ function generate_slot!(state::DataDepsState, dest_space, data) @assert orig_space != 
memory_space(data_chunk) "space preserved! $orig_space != $(memory_space(data_chunk)) ($(typeof(data)) vs. $(typeof(data_chunk))), spaces ($orig_space -> $dest_space)" return data_chunk end - timespan_finish(ctx, :move, (;thunk_id=0, id, processor=to_proc), (;f=nothing, data=dest_space_args[data])) + timespan_finish(ctx, :move, (;thunk_id=0, id, position=0, processor=to_proc), (;f=nothing, data=dest_space_args[data])) end return dest_space_args[data] end diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 3cd16be12..d89f22417 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -1049,13 +1049,21 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) ids = Int[0] data = Any[thunk.f] - positions = Union{Symbol,Nothing}[] + positions = Union{Symbol,Int}[0] + arg_ctr = 1 for (idx, pos_x) in enumerate(thunk.inputs) pos, x = pos_x x = unwrap_weak_checked(x) push!(ids, istask(x) ? x.id : -idx) push!(data, istask(x) ? state.cache[x] : x) - push!(positions, pos) + if pos !== nothing + # Keyword arg + push!(positions, pos) + else + # Positional arg + push!(positions, arg_ctr) + arg_ctr += 1 + end end toptions = thunk.options !== nothing ? 
thunk.options : ThunkOptions() options = merge(ctx.options, toptions) @@ -1537,14 +1545,14 @@ function do_task(to_proc, task_desc) # Initiate data transfers for function and arguments transfer_time = Threads.Atomic{UInt64}(0) transfer_size = Threads.Atomic{UInt64}(0) - _data, _ids = if meta - (Any[first(data)], Int[first(ids)]) # always fetch function + _data, _ids, _positions = if meta + (Any[first(data)], Int[first(ids)], Union{Symbol,Int}[first(positions)]) # always fetch function else - (data, ids) + (data, ids, positions) end - fetch_tasks = map(Iterators.zip(_data,_ids)) do (x, id) + fetch_tasks = map(Iterators.zip(_data, _ids, _positions)) do (x, id, position) Threads.@spawn begin - timespan_start(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x)) + timespan_start(ctx, :move, (;thunk_id, id, position, processor=to_proc), (;f, data=x)) #= FIXME: This isn't valid if x is written to x = if x isa Chunk value = lock(TASK_SYNC) do @@ -1587,11 +1595,13 @@ function do_task(to_proc, task_desc) end else =# - x = @invokelatest move(to_proc, x) + new_x = @invokelatest move(to_proc, x) #end - @dagdebug thunk_id :move "Moved argument $id to $to_proc: $(typeof(x))" - timespan_finish(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x); tasks=[Base.current_task()]) - return x + if new_x !== x + @dagdebug thunk_id :move "Moved argument $position to $to_proc: $(typeof(x)) -> $(typeof(new_x))" + end + timespan_finish(ctx, :move, (;thunk_id, id, position, processor=to_proc), (;f, data=new_x); tasks=[Base.current_task()]) + return new_x end end fetched = Any[] @@ -1607,8 +1617,8 @@ function do_task(to_proc, task_desc) fetched_args = Any[] fetched_kwargs = Pair{Symbol,Any}[] for (idx, x) in enumerate(fetched) - pos = positions[idx] - if pos === nothing + pos = positions[idx+1] + if pos isa Int push!(fetched_args, x) else push!(fetched_kwargs, pos => x) diff --git a/src/submission.jl b/src/submission.jl index 6e61f86d7..53dd794b7 100644 --- a/src/submission.jl 
+++ b/src/submission.jl @@ -38,6 +38,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{ end::Union{Vector{Any},Nothing} lock(Sch.EAGER_ID_MAP) do id_map for (idx, (pos, arg)) in enumerate(args) + # FIXME: Switch to Union{Symbol,Int} to preserve positional information pos::Union{Symbol,Nothing} newarg = if arg isa DTask arg_uid = arg.uid diff --git a/src/thunk.jl b/src/thunk.jl index 02353fd17..f3ba7574a 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -362,6 +362,8 @@ function replace_broadcast(fn::Symbol) end return fn end +# For TaskNames logging event +Base.nameof(::ExpandedBroadcast{F}) where F = Symbol('.', nameof(F)) to_namedtuple(;kwargs...) = (;kwargs...) diff --git a/src/utils/logging-events.jl b/src/utils/logging-events.jl index fa31af49c..98349dfbe 100644 --- a/src/utils/logging-events.jl +++ b/src/utils/logging-events.jl @@ -117,21 +117,20 @@ end Records the raw (mutable) arguments of each submitted task. """ struct TaskArguments end -function (::TaskArguments)(ev::Event{:start}) - if ev.category == :add_thunk +(::TaskArguments)(ev::Event{:start}) = nothing +function (ta::TaskArguments)(ev::Event{:finish}) + if ev.category == :move args = Pair{Union{Symbol,Int},UInt}[] - for (idx, (pos, arg)) in enumerate(ev.timeline.args) - pos_idx = pos === nothing ? 
idx : pos - arg = Dagger.unwrap_weak_checked(arg) - if ismutable(arg) - push!(args, pos_idx => objectid(arg)) - end + thunk_id = ev.id.thunk_id::Int + pos = ev.id.position::Union{Symbol,Int} + arg = ev.timeline.data + if ismutable(arg) + push!(args, pos => objectid(arg)) end - return ev.id.thunk_id => args + return thunk_id => args end return end -(ta::TaskArguments)(ev::Event{:finish}) = nothing """ TaskArgumentMoves @@ -148,8 +147,10 @@ function (ta::TaskArgumentMoves)(ev::Event{:start}) if ev.category == :move data = ev.timeline.data if ismutable(data) - d = get!(Dict{Union{Int,Symbol},UInt}, ta.pre_move_args, ev.id.thunk_id) - d[ev.id.id] = objectid(data) + thunk_id = ev.id.thunk_id::Int + position = ev.id.position::Union{Symbol,Int} + d = get!(Dict{Union{Int,Symbol},UInt}, ta.pre_move_args, thunk_id) + d[position] = objectid(data) end end return @@ -158,16 +159,18 @@ function (ta::TaskArgumentMoves)(ev::Event{:finish}) if ev.category == :move post_data = ev.timeline.data if ismutable(post_data) - if haskey(ta.pre_move_args, ev.id.thunk_id) - d = ta.pre_move_args[ev.id.thunk_id] - if haskey(d, ev.id.id) - pre_data = d[ev.id.id] - return ev.id.thunk_id, ev.id.id, pre_data, objectid(post_data) + thunk_id = ev.id.thunk_id::Int + position = ev.id.position::Union{Symbol,Int} + if haskey(ta.pre_move_args, thunk_id) + d = ta.pre_move_args[thunk_id] + if haskey(d, position) + pre_data = d[position] + return thunk_id, position, pre_data, objectid(post_data) else - @warn "No TID $(ev.id.thunk_id), ID $(ev.id.id)" + @warn "No TID $(thunk_id), Position $(position)" end else - @warn "No TID $(ev.id.thunk_id)" + @warn "No TID $(thunk_id)" end end end From 7ec746d6fec2127da877557b69a1d8cb96b3b1ac Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sun, 7 Jul 2024 15:02:56 +0200 Subject: [PATCH 03/12] GraphVizExt/PlotsExt: Configurable and matching colors --- ext/GraphVizExt.jl | 14 +++++++---- ext/PlotsExt.jl | 58 +++++++++++++++++++++++++++++++++++++--------- 2 files 
changed, 57 insertions(+), 15 deletions(-) diff --git a/ext/GraphVizExt.jl b/ext/GraphVizExt.jl index cc1e4479a..741b20471 100644 --- a/ext/GraphVizExt.jl +++ b/ext/GraphVizExt.jl @@ -24,6 +24,11 @@ function pretty_time(t; digits=3) end end +_name_to_color(name::AbstractString, colors) = + colors[mod1(hash(name), length(colors))] +_name_to_color(name::AbstractString, ::Nothing) = "black" +_default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"] + """ Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, color_by=:fn, layout_engine="dot", @@ -42,7 +47,8 @@ Options: """ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, color_by=:fn, layout_engine="dot", - times::Bool=true, times_digits::Integer=3) + times::Bool=true, times_digits::Integer=3, + colors=_default_colors, name_to_color=_name_to_color) # Lookup all relevant task/argument dependencies and values in logs g = SimpleDiGraph() tid_to_vertex = Dict{Int,Int}() @@ -108,12 +114,12 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, labels = task_names all_fns = unique(map(label->first(split(label, " ")), labels[con_vs])) all_procs = unique(task_procs) - all_colors = ("red", "orange", "green", "blue", "purple", "pink", "silver") + if color_by == :fn - _colors = [all_colors[mod1(i, length(all_colors))] for i in 1:length(all_fns)] + _colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)] colors = Dict(v=>_colors[findfirst(fn->occursin(fn, labels[v]), all_fns)] for v in con_vs) elseif color_by == :proc - _colors = [all_colors[mod1(i, length(all_colors))] for i in 1:length(all_procs)] + _colors = [name_to_color(string(all_procs[i]), colors) for i in 1:length(all_procs)] colors = Dict(v=>_colors[findfirst(proc->proc==task_procs[v], all_procs)] for v in con_vs) else throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc")) diff --git a/ext/PlotsExt.jl b/ext/PlotsExt.jl index 
809e7e3aa..99e0634d0 100644 --- a/ext/PlotsExt.jl +++ b/ext/PlotsExt.jl @@ -12,17 +12,52 @@ import Dagger import Dagger: DTask, Chunk, Processor import Dagger.TimespanLogging: Timespan -function logs_to_df(logs::Dict) - df = DataFrame(proc=Processor[], proc_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[]) +_name_to_color(name::AbstractString, colors) = + colors[mod1(hash(name), length(colors))] +_name_to_color(name::AbstractString, ::Nothing) = "black" +_default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"] + +function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn) + if color_by == :fn + # Generate function names + fn_names = Dict{Int, String}() + for w in keys(logs) + for idx in 1:length(logs[w][:core]) + category = logs[w][:core][idx].category::Symbol + kind = logs[w][:core][idx].kind::Symbol + if category == :add_thunk && kind == :start + tid = logs[w][:id][idx].thunk_id::Int + if haskey(logs[w], :tasknames) + fn_names[tid] = first(split(logs[w][:tasknames][idx]::String, ' ')) + else + @warn "Task names missing from logs" + fn_names[tid] = "" + end + end + end + end + end + + # FIXME: Color eltype + df = DataFrame(proc=Processor[], proc_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[], color=Any[]) Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx category = logs[w][:core][start_idx].category if category == :compute - proc = logs[w][:id][start_idx].processor + proc = logs[w][:id][start_idx].processor::Processor proc_name = Dagger.short_name(proc) - tid = logs[w][:id][start_idx].thunk_id - t_start = logs[w][:core][start_idx].timestamp - t_end = logs[w][:core][finish_idx].timestamp - push!(df, (;proc, proc_name, tid, t_start, t_end)) + tid = logs[w][:id][start_idx].thunk_id::Int + fn_name = get(fn_names, tid, "unknown") + t_start = logs[w][:core][start_idx].timestamp::UInt64 + t_end = logs[w][:core][finish_idx].timestamp::UInt64 + if color_by == :fn + fn_name = 
fn_names[tid] + color = name_to_color(fn_name, colors) + elseif color_by == :proc + color = name_to_color(proc_name, colors) + else + throw(ArgumentError("Invalid color_by value: $(repr(color_by))")) + end + push!(df, (;proc, proc_name, tid, t_start, t_end, color)) end end return df @@ -72,8 +107,10 @@ end Render a Gantt chart of task execution in `logs` using Plots. `kwargs` are passed to `plot` directly. """ -function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; kwargs...) - df = logs_to_df(logs) +function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; + colors=_default_colors, name_to_color=_name_to_color, + color_by=:fn, kwargs...) + df = logs_to_df(logs; colors, name_to_color, color_by) rect(w, h, x, y) = Shape(x .+ [0,w,w,0], y .+ [0,0,h,h]) @@ -85,8 +122,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; kwargs...) dy = Dict(u .=> 1:length(u)) r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, df.proc_name)] - # FIXME: Colors - return plot(r; #=c=permutedims(df.color),=# yticks=(1.5:(nrow(df) + 0.5), u), xlabel="Time (seconds)", ylabel="Processor", labels=false, kwargs...) + return plot(r; color=permutedims(df.color), yticks=(1.5:(nrow(df) + 0.5), u), xlabel="Time (seconds)", ylabel="Processor", labels=false, kwargs...) 
end end # module PlotsExt From c0781fe5f592fe637a5bc2690522412513cf6276 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sun, 7 Jul 2024 15:14:09 +0200 Subject: [PATCH 04/12] PlotsExt: Add legend --- ext/PlotsExt.jl | 47 +++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/ext/PlotsExt.jl b/ext/PlotsExt.jl index 99e0634d0..3bc23aa6f 100644 --- a/ext/PlotsExt.jl +++ b/ext/PlotsExt.jl @@ -18,46 +18,43 @@ _name_to_color(name::AbstractString, ::Nothing) = "black" _default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"] function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn) - if color_by == :fn - # Generate function names - fn_names = Dict{Int, String}() - for w in keys(logs) - for idx in 1:length(logs[w][:core]) - category = logs[w][:core][idx].category::Symbol - kind = logs[w][:core][idx].kind::Symbol - if category == :add_thunk && kind == :start - tid = logs[w][:id][idx].thunk_id::Int - if haskey(logs[w], :tasknames) - fn_names[tid] = first(split(logs[w][:tasknames][idx]::String, ' ')) - else - @warn "Task names missing from logs" - fn_names[tid] = "" - end + # Generate function names + fn_names = Dict{Int, String}() + for w in keys(logs) + for idx in 1:length(logs[w][:core]) + category = logs[w][:core][idx].category::Symbol + kind = logs[w][:core][idx].kind::Symbol + if category == :add_thunk && kind == :start + tid = logs[w][:id][idx].thunk_id::Int + if haskey(logs[w], :tasknames) + fn_names[tid] = first(split(logs[w][:tasknames][idx]::String, ' ')) + else + @warn "Task names missing from logs" + fn_names[tid] = "unknown" end end end end # FIXME: Color eltype - df = DataFrame(proc=Processor[], proc_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[], color=Any[]) + df = DataFrame(proc=Processor[], proc_name=String[], fn_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[], color=Any[]) Dagger.logs_event_pairs(logs) do w, 
start_idx, finish_idx category = logs[w][:core][start_idx].category if category == :compute proc = logs[w][:id][start_idx].processor::Processor proc_name = Dagger.short_name(proc) tid = logs[w][:id][start_idx].thunk_id::Int - fn_name = get(fn_names, tid, "unknown") + fn_name = fn_names[tid] t_start = logs[w][:core][start_idx].timestamp::UInt64 t_end = logs[w][:core][finish_idx].timestamp::UInt64 if color_by == :fn - fn_name = fn_names[tid] color = name_to_color(fn_name, colors) elseif color_by == :proc color = name_to_color(proc_name, colors) else throw(ArgumentError("Invalid color_by value: $(repr(color_by))")) end - push!(df, (;proc, proc_name, tid, t_start, t_end, color)) + push!(df, (;proc, proc_name, fn_name, tid, t_start, t_end, color)) end end return df @@ -121,8 +118,18 @@ function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; u = unique(df.proc_name) dy = Dict(u .=> 1:length(u)) r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, df.proc_name)] + labels = permutedims(df.fn_name) + # Deduplicate labels + for idx in 1:length(labels) + if findfirst(other_idx->labels[other_idx]==labels[idx], 1:length(labels)) < idx + labels[idx] = "" + end + end - return plot(r; color=permutedims(df.color), yticks=(1.5:(nrow(df) + 0.5), u), xlabel="Time (seconds)", ylabel="Processor", labels=false, kwargs...) + return plot(r; color=permutedims(df.color), labels, + yticks=(1.5:(nrow(df) + 0.5), u), + xlabel="Time (seconds)", ylabel="Processor", + kwargs...) 
end end # module PlotsExt From 841c0673b1c8d96da21b838d49304f6f81f1c07b Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Mon, 8 Jul 2024 23:23:30 +0200 Subject: [PATCH 05/12] TaskNames: Don't rely on nameof --- src/thunk.jl | 2 -- src/utils/logging-events.jl | 5 ++++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/thunk.jl b/src/thunk.jl index f3ba7574a..02353fd17 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -362,8 +362,6 @@ function replace_broadcast(fn::Symbol) end return fn end -# For TaskNames logging event -Base.nameof(::ExpandedBroadcast{F}) where F = Symbol('.', nameof(F)) to_namedtuple(;kwargs...) = (;kwargs...) diff --git a/src/utils/logging-events.jl b/src/utils/logging-events.jl index 98349dfbe..7b2902532 100644 --- a/src/utils/logging-events.jl +++ b/src/utils/logging-events.jl @@ -105,11 +105,14 @@ function (::TaskNames)(ev::Event{:start}) if hasproperty(f, :instance) && isdefined(f, :instance) f = f.instance end - return "$(nameof(f)) [$id]" + return "$(func_name(f)) [$id]" end return end (td::TaskNames)(ev::Event{:finish}) = nothing +func_name(f::Function) = nameof(f) +func_name(x) = repr(x) +func_name(::Dagger.ExpandedBroadcast{F}) where F = Symbol('.', nameof(F)) """ TaskArguments From 0f07a917a1fa48e38ee520d5494b7d961465e97e Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Wed, 31 Jul 2024 11:45:20 -0500 Subject: [PATCH 06/12] Task names and GraphVizExt improvements at-spawn: Add `name` option for custom task names logging: Allow `DTask` in `logs_annotate!` logging: Allow `DArray` in `logs_annotate!` (labels array chunks) logging: Add task result, DTask-to-Thunk, UID-to-TID logging events logging: Add `all_task_deps` option to `enable_logging!` for convenience GraphVizExt: Properly label tasks and objects GraphVizExt: Show box for task and oval for object GraphVizExt: Improve connectivity checks GraphVizExt/PlotsExt: Change default colors to Tab20 PlotsExt: Add processor and scheduler visualization modes PlotsExt: 
Remove unused plotting implementation PlotsExt: Ensure X axis is globally aligned --- ext/GraphVizExt.jl | 325 +++++++++++++++++++++++++++--------- ext/PlotsExt.jl | 149 +++++++++++------ src/array/darray.jl | 7 + src/sch/Sch.jl | 6 +- src/submission.jl | 22 ++- src/utils/logging-events.jl | 67 +++++++- src/utils/logging.jl | 35 +++- test/logging.jl | 4 +- 8 files changed, 474 insertions(+), 141 deletions(-) diff --git a/ext/GraphVizExt.jl b/ext/GraphVizExt.jl index 741b20471..18acf0d94 100644 --- a/ext/GraphVizExt.jl +++ b/ext/GraphVizExt.jl @@ -7,7 +7,7 @@ else end import Dagger -import Dagger: DTask, Chunk, Processor +import Dagger: DTask, Chunk, Processor, LoggedMutableObject import Dagger.TimespanLogging: Timespan import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst @@ -24,10 +24,19 @@ function pretty_time(t; digits=3) end end +sanitize_label(label::String) = replace(label, "\"" => "\\\"") + _name_to_color(name::AbstractString, colors) = colors[mod1(hash(name), length(colors))] _name_to_color(name::AbstractString, ::Nothing) = "black" -_default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"] +tab20_colors = [ + "#1f77b4", "#aec7e8", "#ff7f0e", "#ffbb78", + "#2ca02c", "#98df8a", "#d62728", "#ff9896", + "#9467bd", "#c5b0d5", "#8c564b", "#c49c94", + "#e377c2", "#f7b6d2", "#7f7f7f", "#c7c7c7", + "#bcbd22", "#dbdb8d", "#17becf", "#9edae5" +] +_default_colors = tab20_colors """ Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, @@ -36,7 +45,7 @@ _default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"] Render a graph of the task dependencies and data dependencies in `logs` using GraphViz. 
-Requires the following events enabled in `enable_logging!`: `taskdeps`, `tasknames`, `taskargs`, `taskargmoves` +Requires the `all_task_deps` event enabled in `enable_logging!` Options: - `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies) @@ -51,12 +60,22 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, colors=_default_colors, name_to_color=_name_to_color) # Lookup all relevant task/argument dependencies and values in logs g = SimpleDiGraph() + tid_to_vertex = Dict{Int,Int}() - task_names = String[] + tid_to_auto_name = Dict{Int,String}() + tid_to_name = Dict{Int,String}() tid_to_proc = Dict{Int,Processor}() - arg_names = Dict{UInt,String}() + + objid_to_vertex = Dict{UInt,Int}() + objid_to_name = Dict{UInt,String}() + task_args = Dict{Int,Vector{Pair{Union{Int,Symbol},UInt}}}() - arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}() + task_arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}() + task_result = Dict{Int,UInt}() + + uid_to_tid = Dict{UInt,Int}() + dtasks_to_patch = Set{UInt}() + for w in keys(logs) for idx in 1:length(logs[w][:core]) category = logs[w][:core][idx].category @@ -67,40 +86,155 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}} taskname = logs[w][:tasknames][idx]::String tid, deps = taskdeps - add_vertex!(g) - tid_to_vertex[tid] = nv(g) - push!(task_names, taskname) + v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + tid_to_auto_name[tid] = taskname for dep in deps - haskey(tid_to_vertex, dep) || continue - add_edge!(g, tid_to_vertex[dep], nv(g)) + dep_v = get!(tid_to_vertex, dep) do + add_vertex!(g) + tid_to_vertex[dep] = nv(g) + nv(g) + end + add_edge!(g, dep_v, v) + end + if haskey(logs[w], :taskuidtotid) + uid_tid = logs[w][:taskuidtotid][idx] + if uid_tid !== nothing + 
uid, tid = uid_tid::Pair{UInt,Int} + uid_to_tid[uid] = tid + end end elseif category == :compute && kind == :start id::NamedTuple tid = id.thunk_id proc = id.processor tid_to_proc[tid] = proc + elseif category == :compute && kind == :finish + if haskey(logs[w], :taskresult) + result_info = logs[w][:taskresult][idx] + result_info === nothing && continue + tid, obj = result_info::Pair{Int,LoggedMutableObject} + objid = obj.objid + task_result[tid] = objid + tid_v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + v = get!(objid_to_vertex, objid) do + add_vertex!(g) + objid_to_vertex[objid] = nv(g) + nv(g) + end + add_edge!(g, tid_v, v) + end elseif category == :move && kind == :finish if haskey(logs[w], :taskargs) - id, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} - append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, id), args) + tid, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} + args = map(arg->arg[1]=>arg[2].objid, args) + append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, tid), args) + for arg in args + objid = arg[2] + arg_id = get!(objid_to_vertex, objid) do + add_vertex!(g) + objid_to_vertex[objid] = nv(g) + nv(g) + end + if tid != 0 + tid_v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + add_edge!(g, arg_id, tid_v) + end + end end if haskey(logs[w], :taskargmoves) move_info = logs[w][:taskargmoves][idx] move_info === nothing && continue - tid, pos, pre_objid, post_objid = move_info - v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, arg_moves, tid) + tid, pos, pre_obj, post_obj = move_info + v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, task_arg_moves, tid) + pre_objid = pre_obj.objid + post_objid = post_obj.objid push!(v, pos => (pre_objid, post_objid)) + pre_arg_id = get!(objid_to_vertex, pre_objid) do + add_vertex!(g) + objid_to_vertex[pre_objid] = nv(g) + nv(g) + end + post_arg_id = get!(objid_to_vertex, post_objid) 
do + add_vertex!(g) + objid_to_vertex[post_objid] = nv(g) + nv(g) + end + add_edge!(g, pre_arg_id, post_arg_id) end elseif category == :data_annotation && kind == :start id::NamedTuple - objid = id.objectid - name = id.name - arg_names[objid] = String(name) + name = String(id.name) + obj = id.objectid::LoggedMutableObject + objid = obj.objid + objid_to_name[objid] = name + if obj.kind == :task + # N.B. We don't need the object vertex, + # since we'll just render it as a task + push!(dtasks_to_patch, objid) + else + get!(objid_to_vertex, objid) do + add_vertex!(g) + objid_to_vertex[objid] = nv(g) + nv(g) + end + end + elseif category == :finish && kind == :finish + if haskey(logs[w], :tasktochunk) + tid_chunk = logs[w][:tasktochunk][idx] + if tid_chunk !== nothing + tid, chunk_obj = tid_chunk::Pair{Int,LoggedMutableObject} + chunk_id = chunk_obj.objid + v = get!(objid_to_vertex, chunk_id) do + add_vertex!(g) + objid_to_vertex[chunk_id] = nv(g) + nv(g) + end + add_edge!(g, tid_to_vertex[tid], v) + end + end + end + end + end + + # Process DTasks-to-Thunk mappings + for uid in dtasks_to_patch + if haskey(uid_to_tid, uid) + tid = uid_to_tid[uid] + v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + + # Fixup any missing tid data + if haskey(objid_to_name, uid) + tid_to_name[tid] = objid_to_name[uid] end end end - tids_sorted = map(first, sort(collect(tid_to_vertex); by=last)) - task_procs = Processor[tid_to_proc[tid] for tid in tids_sorted] + + # Auto-assign names + for (tid, name) in tid_to_auto_name + if !haskey(tid_to_name, tid) + tid_to_name[tid] = name + end + end + + # Create reverse mappings + vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex) + vertex_to_objid = Dict{Int,UInt}(v=>k for (k,v) in objid_to_vertex) # Find all connected and disconnected vertices if !disconnected @@ -110,31 +244,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, con_vs = vertices(g) end - # Assign 
colors - labels = task_names - all_fns = unique(map(label->first(split(label, " ")), labels[con_vs])) - all_procs = unique(task_procs) - - if color_by == :fn - _colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)] - colors = Dict(v=>_colors[findfirst(fn->occursin(fn, labels[v]), all_fns)] for v in con_vs) - elseif color_by == :proc - _colors = [name_to_color(string(all_procs[i]), colors) for i in 1:length(all_procs)] - colors = Dict(v=>_colors[findfirst(proc->proc==task_procs[v], all_procs)] for v in con_vs) - else - throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc")) - end - - str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n" - - # Add raw arguments - for (id, name) in arg_names - str *= "a$id [label=\"$name\", shape=box]\n" - end - if times - vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex) - # Determine per-worker start times worker_start_times = Dict{Int,UInt64}() for w in keys(logs) @@ -169,68 +279,127 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, end end - # Add argument moves - seen_moves = Set{Tuple{UInt,UInt}}() - for (tid, moves) in arg_moves - for (pos, (pre_objid, post_objid)) in moves - pre_objid == post_objid && continue - (pre_objid, post_objid) in seen_moves && continue - push!(seen_moves, (pre_objid, post_objid)) - move_str = "a$pre_objid -> a$post_objid [label=\"move\"]\n" - str *= move_str - end + # Get the set of all unique task and object IDs + all_tids = collect(keys(tid_to_vertex)) + all_objids = collect(keys(objid_to_vertex)) + + # Assign colors + if color_by == :fn + all_fns = unique(values(tid_to_name)) + _colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)] + colors = Dict(tid=>_colors[findfirst(fn->occursin(fn, tid_to_name[tid]), all_fns)] for tid in all_tids) + elseif color_by == :proc + all_procs = unique(values(tid_to_proc)) + _colors = [name_to_color(string(all_procs[i]), colors) for i in 
1:length(all_procs)] + colors = Dict(tid=>_colors[findfirst(proc->proc==tid_to_proc[tid], all_procs)] for tid in all_tids) + else + throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc")) end - # Add tasks - for v in con_vs + str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n" + + # Add task vertices + for tid in all_tids + v = tid_to_vertex[tid] if !disconnected && (v in discon_vs) continue end - label = labels[v] - color = colors[v] - proc = task_procs[v] - proc_str = '(' * Dagger.short_name(task_procs[v]) * ')' - label_str = "$label\\n$proc_str" + label_str = tid_to_name[tid] + if haskey(tid_to_auto_name, tid) && tid_to_name[tid] != tid_to_auto_name[tid] + label_str *= "\\nTask: $(tid_to_auto_name[tid])" + end + color = colors[tid] + proc = tid_to_proc[tid] + label_str *= "\\n($(Dagger.short_name(tid_to_proc[tid])))" if times - tid = vertex_to_tid[v] start_time = pretty_time(start_times[tid]; digits=times_digits) finish_time = pretty_time(finish_times[tid]; digits=times_digits) - label_str *= "\\n[+$start_time -> +$finish_time]" + diff_time = pretty_time(finish_times[tid] - start_times[tid]; digits=times_digits) + label_str *= "\\n[+$start_time -> +$finish_time (diff: $diff_time)]" end - str *= "v$v [label=\"$label_str\", color=\"$color\", penwidth=2.0]\n" + label_str = sanitize_label(label_str) + str *= "v$v [label=\"$label_str\", shape=box, color=\"$color\", penwidth=2.0]\n" end - # Add task dependencies + # Add object vertices + for objid in all_objids + objid_v = objid_to_vertex[objid] + if !disconnected && !(objid_v in con_vs) + continue + end + if objid in dtasks_to_patch || haskey(uid_to_tid, objid) + # DTask, skip it + continue + end + # Object + if haskey(objid_to_name, objid) + label = sanitize_label(objid_to_name[objid]) + label *= "\\nData: $(repr(objid))" + else + label = "Data: $(repr(objid))" + end + str *= "a$objid_v [label=\"$label\", shape=oval]\n" + end + + # Add task argument move edges + seen_moves = 
Set{Tuple{UInt,UInt}}() + for (tid, moves) in task_arg_moves + for (pos, (pre_objid, post_objid)) in moves + pre_objid == post_objid && continue + (pre_objid, post_objid) in seen_moves && continue + push!(seen_moves, (pre_objid, post_objid)) + pre_objid_v = objid_to_vertex[pre_objid] + post_objid_v = objid_to_vertex[post_objid] + move_str = "a$pre_objid_v -> a$post_objid_v [label=\"move\"]\n" + str *= move_str + end + end + + # Add task-to-task (syncdep) dependency edges edge_sep = is_directed(g) ? "->" : "--" for edge in edges(g) + if !haskey(vertex_to_tid, src(edge)) || !haskey(vertex_to_tid, dst(edge)) + continue + end + if !disconnected && !(src(edge) in con_vs) || !(dst(edge) in con_vs) + continue + end # FIXME: Label syncdeps with associated arguments and datadeps directions str *= "v$(src(edge)) $edge_sep v$(dst(edge)) [label=\"syncdep\"]\n" end - # Add task arguments - con_args = Vector{UInt}(collect(keys(arg_names))) - for moves in values(arg_moves) - for (_, (pre_objid, post_objid)) in moves - push!(con_args, pre_objid) - push!(con_args, post_objid) - end - end + # Add task argument edges for (tid, args) in task_args haskey(tid_to_vertex, tid) || continue - id = tid_to_vertex[tid] - id in con_vs || continue + tid_v = tid_to_vertex[tid] + tid_v in con_vs || continue for (pos, arg) in args - if !disconnected && !(arg in con_args) + arg_v = objid_to_vertex[arg] + if !disconnected && !(arg_v in con_vs) continue end - arg_str = pos isa Int ? "arg $pos" : "kwarg $pos" - str *= "a$arg $edge_sep v$id [label=\"$arg_str\"]\n" + arg_str = sanitize_label(pos isa Int ? 
"arg $pos" : "kwarg $pos") + str *= "a$arg_v $edge_sep v$tid_v [label=\"$arg_str\"]\n" end end + # Add task result edges + for (tid, result) in task_result + haskey(tid_to_vertex, tid) || continue + tid_v = tid_to_vertex[tid] + tid_v in con_vs || continue + result_v = objid_to_vertex[result] + if !disconnected && !(result_v in con_vs) + continue + end + str *= "v$tid_v $edge_sep a$result_v [label=\"result\"]\n" + end + + # Generate the final graph str *= "}\n" gv = GraphViz.Graph(str) GraphViz.layout!(gv; engine=layout_engine) + return gv end diff --git a/ext/PlotsExt.jl b/ext/PlotsExt.jl index 3bc23aa6f..faff294e5 100644 --- a/ext/PlotsExt.jl +++ b/ext/PlotsExt.jl @@ -15,11 +15,20 @@ import Dagger.TimespanLogging: Timespan _name_to_color(name::AbstractString, colors) = colors[mod1(hash(name), length(colors))] _name_to_color(name::AbstractString, ::Nothing) = "black" -_default_colors = ["red", "orange", "green", "blue", "purple", "pink", "silver"] +tab20_colors = [ + "#1f77b4", "#aec7e8", "#ff7f0e", "#ffbb78", + "#2ca02c", "#98df8a", "#d62728", "#ff9896", + "#9467bd", "#c5b0d5", "#8c564b", "#c49c94", + "#e377c2", "#f7b6d2", "#7f7f7f", "#c7c7c7", + "#bcbd22", "#dbdb8d", "#17becf", "#9edae5" +] +_default_colors = tab20_colors -function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn) +function logs_to_df(logs::Dict, ::Val{:execution}; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn) # Generate function names fn_names = Dict{Int, String}() + dtask_names = Dict{UInt, String}() + uid_to_tid = Dict{UInt, Int}() for w in keys(logs) for idx in 1:length(logs[w][:core]) category = logs[w][:core][idx].category::Symbol @@ -32,10 +41,33 @@ function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_c @warn "Task names missing from logs" fn_names[tid] = "unknown" end + if haskey(logs[w], :taskuidtotid) + uid_tid = logs[w][:taskuidtotid][idx] + if uid_tid !== nothing + uid, tid = 
uid_tid::Pair{UInt,Int} + uid_to_tid[uid] = tid + end + end + elseif category == :data_annotation && kind == :start + id = logs[w][:id][idx]::NamedTuple + name = String(id.name) + obj = id.objectid::Dagger.LoggedMutableObject + objid = obj.objid + if obj.kind == :task + dtask_names[objid] = name + end end end end + # Process DTasks-to-Thunk mappings + for (uid, tid) in uid_to_tid + # Patch in custom name + if haskey(dtask_names, uid) + fn_names[tid] = dtask_names[uid] + end + end + # FIXME: Color eltype df = DataFrame(proc=Processor[], proc_name=String[], fn_name=String[], tid=Int[], t_start=UInt64[], t_end=UInt64[], color=Any[]) Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx @@ -59,43 +91,42 @@ function logs_to_df(logs::Dict; colors=_default_colors, name_to_color=_name_to_c end return df end - -# Implementation by Przemyslaw Szufel -function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt_ps}) - df = logs_to_df(logs) - - proc_names = sort!(unique(df.proc_name)) - proc_idx = Dict{String,Int}() - for name in proc_names - proc_idx[name] = findfirst(==(name), proc_names) - end - proc_idxs = map(name->proc_idx[name], proc_names) - tvals = zeros(UInt64, length(proc_names)) - plt = bar(orientation=:h, yticks=(1:length(proc_names), proc_names), linewidth=0,yflip=true,color=:green,legend=nothing) - xlabel!(plt, "Time in seconds") - dfc = deepcopy(df) - while nrow(dfc) > 0 - rowslast = DataFrame([g[findmax(g.t_end)[2],:] for g in groupby(dfc, :proc_name)]) - tvals .= .0 - for i in 1:nrow(rowslast) - tvals[proc_idx[rowslast.proc_name[i]]] = rowslast.t_end[i] +function logs_to_df(logs::Dict, ::Val{:processor}; colors=_default_colors, name_to_color=_name_to_color, kwargs...) 
+ # Collect processor events + # FIXME: Color eltype + df = DataFrame(proc=Processor[], proc_name=String[], category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[]) + Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx + category = logs[w][:core][start_idx].category + if category in (:compute, :storage_wait, :storage_safe_scan, :proc_run_fetch, :proc_steal_local) + proc = logs[w][:id][start_idx].processor::Processor + proc_name = Dagger.short_name(proc) + t_start = logs[w][:core][start_idx].timestamp::UInt64 + t_end = logs[w][:core][finish_idx].timestamp::UInt64 + category_str = string(category) + color = name_to_color(category_str, colors) + push!(df, (;proc, proc_name, category=category_str, t_start, t_end, color)) end - #setindex!.(Ref(tvals), rowslast.t_end, getindex.(proc_idx, rowslast.proc_name)) - bar!(plt, tvals[proc_idxs], orientation=:h, linewidth=0.5,yflip=true,color=:green) - tvals .= .0 - for i in 1:nrow(rowslast) - tvals[proc_idx[rowslast.proc_name[i]]] = rowslast.t_start[i] + end + return df +end +function logs_to_df(logs::Dict, ::Val{:scheduler}; colors=_default_colors, name_to_color=_name_to_color, kwargs...) 
+ # Collect scheduler events + # FIXME: Color eltype + df = DataFrame(category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[]) + Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx + category = logs[w][:core][start_idx].category + if category in (:scheduler_init, :scheduler_exit, :init_proc, :remove_proc, :remove_procs, :add_thunk, :handle_fault, :schedule, :fire, :enqueue, :take, :finish) #= Sch.jl logs dead-worker removal as :remove_procs; the singular spelling is kept for compatibility =# + t_start = logs[w][:core][start_idx].timestamp::UInt64 + t_end = logs[w][:core][finish_idx].timestamp::UInt64 + category_str = string(category) + color = name_to_color(category_str, colors) + push!(df, (;category=category_str, t_start, t_end, color)) end - #setindex!.(Ref(tvals), rowslast.t_start, proc_idx[rowslast.proc_name]) - bar!(plt, tvals[proc_idxs], orientation=:h, linewidth=0.5,linecolor=:white,yflip=true,color=:white) - annotate!.(Ref(plt),(rowslast.t_start .+ rowslast.t_end) ./ 2, findfirst.( .==(rowslast.proc_name), Ref(proc_names)), text.(string.(rowslast.tid),9,rotation=90 )) - dfc = dfc[ .! (dfc.tid .∈ Ref(rowslast.tid) ), : ] end - # FIXME: theoretic_optimal = simulate_polling(df)[1] + minimum(df.t_start) - theoretic_optimal = minimum(df.t_start) - plot!(plt, [theoretic_optimal,theoretic_optimal], [0, length(proc_names)+1],width=3,color=:black,style=:dot) - return plt + return df end +logs_to_df(logs::Dict, ::Val{target}; kwargs...) where target = + throw(ArgumentError("Invalid target: $(repr(target))")) # Implementation adapted from: # https://discourse.julialang.org/t/how-to-make-a-gantt-plot-with-plots-jl/95165/7 @@ -105,30 +136,54 @@ end Render a Gantt chart of task execution in `logs` using Plots. `kwargs` are passed to `plot` directly. """ function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; + target=:execution, colors=_default_colors, name_to_color=_name_to_color, color_by=:fn, kwargs...) 
- df = logs_to_df(logs; colors, name_to_color, color_by) + df = logs_to_df(logs, Val{target}(); colors, name_to_color, color_by) + y_elem = if target == :execution || target == :processor + :proc_name + elseif target == :scheduler + :category + end + ylabel = if target == :execution || target == :processor + "Processor" + elseif target == :scheduler + "Category" + end + sort!(df, y_elem) + + global_t_start, global_t_end = extrema(ev.timestamp for w in keys(logs) for ev in logs[w][:core]) #= span every worker: no reliance on a worker-1 log, and the UInt64 offsets below cannot wrap =# rect(w, h, x, y) = Shape(x .+ [0,w,w,0], y .+ [0,0,h,h]) - t_init = minimum(df.t_start) - t_start = (df.t_start .- t_init) ./ 1e9 - t_end = (df.t_end .- t_init) ./ 1e9 + t_start = (df.t_start .- global_t_start) ./ 1e9 + t_end = (df.t_end .- global_t_start) ./ 1e9 duration = t_end .- t_start - u = unique(df.proc_name) + u = unique(getproperty(df, y_elem)) dy = Dict(u .=> 1:length(u)) - r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, df.proc_name)] - labels = permutedims(df.fn_name) - # Deduplicate labels - for idx in 1:length(labels) - if findfirst(other_idx->labels[other_idx]==labels[idx], 1:length(labels)) < idx - labels[idx] = "" + r = [rect(t1, 1, t2, dy[t3]) for (t1,t2,t3) in zip(duration, t_start, getproperty(df, y_elem))] + if target == :execution + labels = permutedims(df.fn_name) + elseif target == :processor + labels = permutedims(df.category) + elseif target == :scheduler + labels = nothing + end + + if labels !== nothing + # Deduplicate labels + for idx in 1:length(labels) + if findfirst(other_idx->labels[other_idx]==labels[idx], 1:length(labels)) < idx + labels[idx] = "" + end end end return plot(r; color=permutedims(df.color), labels, yticks=(1.5:(nrow(df) + 0.5), u), - xlabel="Time (seconds)", ylabel="Processor", + xlabel="Time (seconds)", ylabel, + xlim=(0.0, (global_t_end - global_t_start) / 1e9), + legendalpha=0, kwargs...) 
end diff --git a/src/array/darray.jl b/src/array/darray.jl index 9e0bc4636..ca1aebff8 100644 --- a/src/array/darray.jl +++ b/src/array/darray.jl @@ -524,6 +524,13 @@ function Base.:(==)(x::AbstractArray{T,N}, y::ArrayOp{S,N}) where {T,S,N} return collect(x) == y end +function logs_annotate!(ctx::Context, A::DArray, name::Union{String,Symbol}) + for (idx, chunk) in enumerate(A.chunks) + sd = A.subdomains[idx] + Dagger.logs_annotate!(ctx, chunk, string(name, '[', join(sd.indexes, ','), ']')) #= string() accepts Symbol names too; `*` is only defined for strings/chars =# + end +end + # TODO: Allow `f` to return proc mapchunk(f, chunk) = tochunk(f(poolget(chunk.handle))) function mapchunks(f, d::DArray{T,N,F}) where {T,N,F} diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index d89f22417..3dc6cef9f 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -604,9 +604,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options) end end - timespan_start(ctx, :finish, (;thunk_id), (;thunk_id)) + timespan_start(ctx, :finish, (;thunk_id), (;thunk_id, result=res)) finish_task!(ctx, state, node, thunk_failed) - timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id)) + timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id, result=res)) delete_unused_tasks!(state) end @@ -1684,7 +1684,7 @@ function do_task(to_proc, task_desc) threadtime = cputhreadtime() - threadtime_start # FIXME: This is not a realistic measure of max. 
required memory #gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4)) - timespan_finish(ctx, :compute, (;thunk_id, processor=to_proc), (;f)) + timespan_finish(ctx, :compute, (;thunk_id, processor=to_proc), (;f, result=result_meta)) lock(TASK_SYNC) do real_time_util[] -= est_time_util pop!(TASKS_RUNNING, thunk_id) diff --git a/src/submission.jl b/src/submission.jl index 53dd794b7..7312e378d 100644 --- a/src/submission.jl +++ b/src/submission.jl @@ -26,7 +26,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{ id = next_id() - timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f, args, options)) + timespan_start(ctx, :add_thunk, (;thunk_id=id), (;f, args, options, uid)) # Lookup DTask/ThunkID -> Thunk old_args = copy(args) @@ -129,7 +129,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{ put!(state.chan, Sch.RescheduleSignal()) end - timespan_finish(ctx, :add_thunk, (;thunk_id=id), (;f, args, options)) + timespan_finish(ctx, :add_thunk, (;thunk_id=id), (;f, args, options, uid)) return thunk_id end @@ -224,10 +224,13 @@ function eager_spawn(spec::DTaskSpec) future = ThunkFuture() finalizer_ref = poolset(DTaskFinalizer(uid); device=MemPool.CPURAMDevice()) - # Return unlaunched DTask + # Create unlaunched DTask return DTask(uid, future, finalizer_ref) end function eager_launch!((spec, task)::Pair{DTaskSpec,DTask}) + # Assign a name, if specified + eager_assign_name!(spec, task) + # Lookup DTask -> ThunkID local args, options lock(Sch.EAGER_ID_MAP) do id_map @@ -244,6 +247,11 @@ end function eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}}) ntasks = length(specs) + # Assign a name, if specified + for (spec, task) in specs + eager_assign_name!(spec, task) + end + uids = [task.uid for (_, task) in specs] futures = [task.future for (_, task) in specs] finalizer_refs = [task.finalizer_ref for (_, task) in specs] @@ -263,3 +271,11 @@ function 
eager_launch!(specs::Vector{Pair{DTaskSpec,DTask}}) task.thunk_ref = thunk_ids[i].ref end end + +function eager_assign_name!(spec::DTaskSpec, task::DTask) + # Assign a name, if specified + if haskey(spec.options, :name) + Dagger.logs_annotate!(task, spec.options.name) + spec.options = (;filter(x -> x[1] != :name, Base.pairs(spec.options))...) + end +end diff --git a/src/utils/logging-events.jl b/src/utils/logging-events.jl index 7b2902532..716c4a2b8 100644 --- a/src/utils/logging-events.jl +++ b/src/utils/logging-events.jl @@ -1,3 +1,8 @@ +struct LoggedMutableObject + objid::UInt + kind::Symbol +end + module Events import ..Dagger @@ -123,12 +128,12 @@ struct TaskArguments end (::TaskArguments)(ev::Event{:start}) = nothing function (ta::TaskArguments)(ev::Event{:finish}) if ev.category == :move - args = Pair{Union{Symbol,Int},UInt}[] + args = Pair{Union{Symbol,Int},Dagger.LoggedMutableObject}[] thunk_id = ev.id.thunk_id::Int pos = ev.id.position::Union{Symbol,Int} arg = ev.timeline.data if ismutable(arg) - push!(args, pos => objectid(arg)) + push!(args, pos => Dagger.objectid_or_chunkid(arg)) end return thunk_id => args end @@ -141,10 +146,10 @@ end Records any `move`-derived copies of arguments of each task. 
""" struct TaskArgumentMoves - pre_move_args::Dict{Int,Dict{Union{Int,Symbol},UInt}} + pre_move_args::Dict{Int,Dict{Union{Int,Symbol},Dagger.LoggedMutableObject}} end TaskArgumentMoves() = - TaskArgumentMoves(Dict{Int,Dict{Union{Int,Symbol},UInt}}()) + TaskArgumentMoves(Dict{Int,Dict{Union{Int,Symbol},Dagger.LoggedMutableObject}}()) init_similar(::TaskArgumentMoves) = TaskArgumentMoves() function (ta::TaskArgumentMoves)(ev::Event{:start}) if ev.category == :move @@ -152,8 +157,8 @@ function (ta::TaskArgumentMoves)(ev::Event{:start}) if ismutable(data) thunk_id = ev.id.thunk_id::Int position = ev.id.position::Union{Symbol,Int} - d = get!(Dict{Union{Int,Symbol},UInt}, ta.pre_move_args, thunk_id) - d[position] = objectid(data) + d = get!(Dict{Union{Int,Symbol},Dagger.LoggedMutableObject}, ta.pre_move_args, thunk_id) + d[position] = Dagger.objectid_or_chunkid(data) end end return @@ -168,7 +173,7 @@ function (ta::TaskArgumentMoves)(ev::Event{:finish}) d = ta.pre_move_args[thunk_id] if haskey(d, position) pre_data = d[position] - return thunk_id, position, pre_data, objectid(post_data) + return thunk_id, position, pre_data, Dagger.objectid_or_chunkid(post_data) else @warn "No TID $(thunk_id), Position $(position)" end @@ -180,6 +185,24 @@ function (ta::TaskArgumentMoves)(ev::Event{:finish}) return end +""" + TaskResult + +Records the raw (mutable) return value of each submitted task. +""" +struct TaskResult end +(::TaskResult)(ev::Event{:start}) = nothing +function (ta::TaskResult)(ev::Event{:finish}) + if ev.category == :compute + thunk_id = ev.id.thunk_id::Int + result = ev.timeline.result + if ismutable(result) + return thunk_id => Dagger.objectid_or_chunkid(result) + end + end + return +end + """ TaskDependencies @@ -213,4 +236,34 @@ function (::TaskDependencies)(ev::Event{:start}) end (td::TaskDependencies)(ev::Event{:finish}) = nothing +""" + TaskUIDtoTID + +Maps DTask UIDs to Thunk TIDs. 
+""" +struct TaskUIDtoTID end +function (tut::TaskUIDtoTID)(ev::Event{:start}) + if ev.category == :add_thunk + thunk_id = ev.id.thunk_id::Int + uid = ev.timeline.uid::UInt + return uid => thunk_id + end + return +end +(tut::TaskUIDtoTID)(ev::Event{:finish}) = nothing + +struct TaskToChunk end +(td::TaskToChunk)(ev::Event{:start}) = nothing +function (::TaskToChunk)(ev::Event{:finish}) + if ev.category == :finish + thunk_id = ev.id.thunk_id::Int + result = ev.timeline.result + if ismutable(result) + chunk_id = Dagger.objectid_or_chunkid(result) + return thunk_id => chunk_id + end + end + return +end + end # module Events diff --git a/src/utils/logging.jl b/src/utils/logging.jl index 9b923c526..3a0a969de 100644 --- a/src/utils/logging.jl +++ b/src/utils/logging.jl @@ -8,18 +8,26 @@ Enables logging globally for all workers. Certain core events are always enabled Extra events: - `metrics::Bool`: Enables various utilization and allocation metrics - `timeline::Bool`: Enables raw "timeline" values, which are event-specific; not recommended except for debugging +- `all_task_deps::Bool`: Enables all task dependency-related logging - `tasknames::Bool`: Enables generating unique task names for each task - `taskdeps::Bool`: Enables reporting of upstream task dependencies (as task IDs) for each task argument - `taskargs::Bool`: Enables reporting of upstream non-task dependencies (as `objectid` hash) for each task argument - `taskargmoves::Bool`: Enables reporting of copies of upstream dependencies (as original and copy `objectid` hashes) for each task argument +- `taskresult::Bool`: Enables reporting of task result values (as `objectid` hash) +- `taskuidtotid::Bool`: Enables reporting of task UID-to-TID mappings +- `tasktochunk::Bool`: Enables reporting of DTask-to-Chunk mappings - `profile::Bool`: Enables profiling of task execution; not currently recommended, as it adds significant overhead """ function enable_logging!(;metrics::Bool=true, timeline::Bool=false, + 
all_task_deps::Bool=false, tasknames::Bool=true, taskdeps::Bool=true, taskargs::Bool=false, taskargmoves::Bool=false, + taskresult::Bool=false, + taskuidtotid::Bool=false, + tasktochunk::Bool=false, profile::Bool=false) ml = TimespanLogging.MultiEventLog() ml[:core] = TimespanLogging.Events.CoreMetrics() @@ -27,6 +35,15 @@ function enable_logging!(;metrics::Bool=true, if timeline ml[:timeline] = TimespanLogging.Events.TimelineMetrics() end + if all_task_deps + tasknames = true + taskdeps = true + taskargs = true + taskargmoves = true + taskresult = true + taskuidtotid = true + tasktochunk = true + end if tasknames ml[:tasknames] = Dagger.Events.TaskNames() end @@ -39,6 +56,15 @@ function enable_logging!(;metrics::Bool=true, if taskargmoves ml[:taskargmoves] = Dagger.Events.TaskArgumentMoves() end + if taskresult + ml[:taskresult] = Dagger.Events.TaskResult() + end + if taskuidtotid + ml[:taskuidtotid] = Dagger.Events.TaskUIDtoTID() + end + if tasktochunk + ml[:tasktochunk] = Dagger.Events.TaskToChunk() + end if profile ml[:profile] = DaggerWebDash.ProfileMetrics() end @@ -106,9 +132,16 @@ utilize for display purposes. 
""" function logs_annotate!(ctx::Context, arg, name::Union{String,Symbol}) ismutable(arg) || throw(ArgumentError("Argument must be mutable to be annotated")) - Dagger.TimespanLogging.timespan_start(ctx, :data_annotation, (;objectid=objectid(arg), name), nothing) + Dagger.TimespanLogging.timespan_start(ctx, :data_annotation, (;objectid=objectid_or_chunkid(arg), name), nothing) # TODO: Remove redundant log event Dagger.TimespanLogging.timespan_finish(ctx, :data_annotation, nothing, nothing) end logs_annotate!(arg, name::Union{String,Symbol}) = logs_annotate!(Dagger.Sch.eager_context(), arg, name) + +objectid_or_chunkid(@nospecialize(x)) = + LoggedMutableObject(objectid(x), :object) +objectid_or_chunkid(@nospecialize(x::Chunk)) = + LoggedMutableObject(hash(x), :chunk) +objectid_or_chunkid(@nospecialize(x::DTask)) = + LoggedMutableObject(x.uid, :task) diff --git a/test/logging.jl b/test/logging.jl index bfc5de025..9a12cf7ff 100644 --- a/test/logging.jl +++ b/test/logging.jl @@ -168,7 +168,7 @@ import Colors, GraphViz, DataFrames, Plots, JSON3 if VERSION >= v"1.9-" @testset "show_plan/render_plan built-in" begin - Dagger.enable_logging!() + Dagger.enable_logging!(;all_task_deps=true) A = distribute(rand(4, 4), Blocks(8, 8)) sum(A) @@ -182,7 +182,7 @@ import Colors, GraphViz, DataFrames, Plots, JSON3 # JSON3Ext @test Dagger.render_logs(logs, :chrome_trace) !== nothing - + Dagger.disable_logging!() end end From 42597ff63f6153446889f6763b1c74b964e712a4 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Tue, 20 Aug 2024 14:26:25 -0500 Subject: [PATCH 07/12] logging: Add TaskFunctionNames event --- ext/GraphVizExt.jl | 2 +- ext/JSON3Ext.jl | 5 ++--- ext/PlotsExt.jl | 4 ++-- src/utils/logging-events.jl | 18 ++++++++++++++++++ src/utils/logging.jl | 7 ++++++- 5 files changed, 29 insertions(+), 7 deletions(-) diff --git a/ext/GraphVizExt.jl b/ext/GraphVizExt.jl index 18acf0d94..1b41f9fa1 100644 --- a/ext/GraphVizExt.jl +++ b/ext/GraphVizExt.jl @@ -84,7 +84,7 @@ function 
Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, if category == :add_thunk && kind == :start id::NamedTuple taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}} - taskname = logs[w][:tasknames][idx]::String + taskname = logs[w][:taskfuncnames][idx]::String tid, deps = taskdeps v = get!(tid_to_vertex, tid) do add_vertex!(g) diff --git a/ext/JSON3Ext.jl b/ext/JSON3Ext.jl index c5afddc41..d8ba0eb3e 100644 --- a/ext/JSON3Ext.jl +++ b/ext/JSON3Ext.jl @@ -37,8 +37,7 @@ function logs_to_chrome_trace(logs::Dict) if !haskey(execution_logs, tid) execution_logs[tid] = Dict{Symbol,Any}() end - taskname = logs[w][:tasknames][start_idx] - fname = first(split(taskname, ' ')) + fname = logs[w][:taskfuncnames][start_idx] execution_logs[tid][:name] = fname end end @@ -63,4 +62,4 @@ function Dagger.show_logs(io::IO, logs::Dict, ::Val{:chrome_trace}) JSON3.write(io, logs_to_chrome_trace(logs)) end -end # module JSON3Ext \ No newline at end of file +end # module JSON3Ext diff --git a/ext/PlotsExt.jl b/ext/PlotsExt.jl index faff294e5..eb1454e72 100644 --- a/ext/PlotsExt.jl +++ b/ext/PlotsExt.jl @@ -35,8 +35,8 @@ function logs_to_df(logs::Dict, ::Val{:execution}; colors=_default_colors, name_ kind = logs[w][:core][idx].kind::Symbol if category == :add_thunk && kind == :start tid = logs[w][:id][idx].thunk_id::Int - if haskey(logs[w], :tasknames) - fn_names[tid] = first(split(logs[w][:tasknames][idx]::String, ' ')) + if haskey(logs[w], :taskfuncnames) + fn_names[tid] = logs[w][:taskfuncnames][idx]::String else @warn "Task names missing from logs" fn_names[tid] = "unknown" diff --git a/src/utils/logging-events.jl b/src/utils/logging-events.jl index 716c4a2b8..68b80b487 100644 --- a/src/utils/logging-events.jl +++ b/src/utils/logging-events.jl @@ -119,6 +119,24 @@ func_name(f::Function) = nameof(f) func_name(x) = repr(x) func_name(::Dagger.ExpandedBroadcast{F}) where F = Symbol('.', nameof(F)) +""" + TaskFunctionNames + +Records the function name of each task. 
+""" +struct TaskFunctionNames end +function (::TaskFunctionNames)(ev::Event{:start}) + if ev.category == :add_thunk + f = Dagger.chunktype(ev.timeline.f) + if hasproperty(f, :instance) && isdefined(f, :instance) + f = f.instance + end + return String(func_name(f)) + end + return +end +(td::TaskFunctionNames)(ev::Event{:finish}) = nothing + """ TaskArguments diff --git a/src/utils/logging.jl b/src/utils/logging.jl index 3a0a969de..62980993e 100644 --- a/src/utils/logging.jl +++ b/src/utils/logging.jl @@ -10,6 +10,7 @@ Extra events: - `timeline::Bool`: Enables raw "timeline" values, which are event-specific; not recommended except for debugging - `all_task_deps::Bool`: Enables all task dependency-related logging - `tasknames::Bool`: Enables generating unique task names for each task +- `taskfuncnames::Bool`: Enables reporting of task function names for each task - `taskdeps::Bool`: Enables reporting of upstream task dependencies (as task IDs) for each task argument - `taskargs::Bool`: Enables reporting of upstream non-task dependencies (as `objectid` hash) for each task argument - `taskargmoves::Bool`: Enables reporting of copies of upstream dependencies (as original and copy `objectid` hashes) for each task argument @@ -22,6 +23,7 @@ function enable_logging!(;metrics::Bool=true, timeline::Bool=false, all_task_deps::Bool=false, tasknames::Bool=true, + taskfuncnames::Bool=false, taskdeps::Bool=true, taskargs::Bool=false, taskargmoves::Bool=false, @@ -36,7 +38,7 @@ function enable_logging!(;metrics::Bool=true, ml[:timeline] = TimespanLogging.Events.TimelineMetrics() end if all_task_deps - tasknames = true + taskfuncnames = true taskdeps = true taskargs = true taskargmoves = true @@ -47,6 +49,9 @@ function enable_logging!(;metrics::Bool=true, if tasknames ml[:tasknames] = Dagger.Events.TaskNames() end + if taskfuncnames + ml[:taskfuncnames] = Dagger.Events.TaskFunctionNames() + end if taskdeps ml[:taskdeps] = Dagger.Events.TaskDependencies() end From 
23f2e24117e0488a0e2fbe2603f11ca8d6288b10 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Tue, 20 Aug 2024 14:29:27 -0500 Subject: [PATCH 08/12] logging: Don't filter on thunk_id in logs_event_pairs --- src/utils/logging.jl | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/utils/logging.jl b/src/utils/logging.jl index 62980993e..3c8a7ed22 100644 --- a/src/utils/logging.jl +++ b/src/utils/logging.jl @@ -118,14 +118,12 @@ function logs_event_pairs(f, logs::Dict) continue end id::NamedTuple - if haskey(id, :thunk_id) - event_key = (category, id) - if kind == :start - running_events[event_key] = idx - else - event_start_idx = running_events[event_key] - f(w, event_start_idx, idx) - end + event_key = (category, id) + if kind == :start + running_events[event_key] = idx + else + event_start_idx = running_events[event_key] + f(w, event_start_idx, idx) end end end From fb91d85c5f587f351738cecf86cad25a65e7a9fc Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Tue, 20 Aug 2024 14:30:14 -0500 Subject: [PATCH 09/12] Sch: Add sch uid to most logging events --- src/sch/Sch.jl | 66 +++++++++++++++++++++++++------------------------ test/logging.jl | 4 ++- 2 files changed, 37 insertions(+), 33 deletions(-) diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 3dc6cef9f..facb41a56 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -318,7 +318,7 @@ const WORKER_MONITOR_TASKS = Dict{Int,Task}() const WORKER_MONITOR_CHANS = Dict{Int,Dict{UInt64,RemoteChannel}}() function init_proc(state, p, log_sink) ctx = Context(Int[]; log_sink) - timespan_start(ctx, :init_proc, (;worker=p.pid), nothing) + timespan_start(ctx, :init_proc, (;uid=state.uid, worker=p.pid), nothing) # Initialize pressure and capacity gproc = OSProc(p.pid) lock(state.lock) do @@ -383,7 +383,7 @@ function init_proc(state, p, log_sink) # Setup dynamic listener dynamic_listener!(ctx, state, p.pid) - timespan_finish(ctx, :init_proc, (;worker=p.pid), nothing) + timespan_finish(ctx, 
:init_proc, (;uid=state.uid, worker=p.pid), nothing) end function _cleanup_proc(uid, log_sink) empty!(CHUNK_CACHE) # FIXME: Should be keyed on uid! @@ -399,7 +399,7 @@ end function cleanup_proc(state, p, log_sink) ctx = Context(Int[]; log_sink) wid = p.pid - timespan_start(ctx, :cleanup_proc, (;worker=wid), nothing) + timespan_start(ctx, :cleanup_proc, (;uid=state.uid, worker=wid), nothing) lock(WORKER_MONITOR_LOCK) do if haskey(WORKER_MONITOR_CHANS, wid) delete!(WORKER_MONITOR_CHANS[wid], state.uid) @@ -419,7 +419,7 @@ function cleanup_proc(state, p, log_sink) end end - timespan_finish(ctx, :cleanup_proc, (;worker=wid), nothing) + timespan_finish(ctx, :cleanup_proc, (;uid=state.uid, worker=wid), nothing) end "Process-local condition variable (and lock) indicating task completion." @@ -467,24 +467,24 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) master = OSProc(myid()) - timespan_start(ctx, :scheduler_init, nothing, master) + timespan_start(ctx, :scheduler_init, (;uid=state.uid), master) try scheduler_init(ctx, state, d, options, deps) finally - timespan_finish(ctx, :scheduler_init, nothing, master) + timespan_finish(ctx, :scheduler_init, (;uid=state.uid), master) end value, errored = try scheduler_run(ctx, state, d, options) finally # Always try to tear down the scheduler - timespan_start(ctx, :scheduler_exit, nothing, master) + timespan_start(ctx, :scheduler_exit, (;uid=state.uid), master) try scheduler_exit(ctx, state, options) catch err @error "Error when tearing down scheduler" exception=(err,catch_backtrace()) finally - timespan_finish(ctx, :scheduler_exit, nothing, master) + timespan_finish(ctx, :scheduler_exit, (;uid=state.uid), master) end end @@ -545,10 +545,10 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options) check_integrity(ctx) isempty(state.running) && continue - timespan_start(ctx, :take, nothing, nothing) + timespan_start(ctx, :take, (;uid=state.uid), nothing) @dagdebug nothing :take "Waiting for results" 
chan_value = take!(state.chan) # get result of completed thunk - timespan_finish(ctx, :take, nothing, nothing) + timespan_finish(ctx, :take, (;uid=state.uid), nothing) if chan_value isa RescheduleSignal continue end @@ -563,13 +563,13 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options) @warn "Worker $(pid) died, rescheduling work" # Remove dead worker from procs list - timespan_start(ctx, :remove_procs, (;worker=pid), nothing) + timespan_start(ctx, :remove_procs, (;uid=state.uid, worker=pid), nothing) remove_dead_proc!(ctx, state, gproc) - timespan_finish(ctx, :remove_procs, (;worker=pid), nothing) + timespan_finish(ctx, :remove_procs, (;uid=state.uid, worker=pid), nothing) - timespan_start(ctx, :handle_fault, (;worker=pid), nothing) + timespan_start(ctx, :handle_fault, (;uid=state.uid, worker=pid), nothing) handle_fault(ctx, state, gproc) - timespan_finish(ctx, :handle_fault, (;worker=pid), nothing) + timespan_finish(ctx, :handle_fault, (;uid=state.uid, worker=pid), nothing) return # effectively `continue` else if something(ctx.options.allow_errors, false) || @@ -604,9 +604,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options) end end - timespan_start(ctx, :finish, (;thunk_id), (;thunk_id, result=res)) + timespan_start(ctx, :finish, (;uid=state.uid, thunk_id), (;thunk_id, result=res)) finish_task!(ctx, state, node, thunk_failed) - timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id, result=res)) + timespan_finish(ctx, :finish, (;uid=state.uid, thunk_id), (;thunk_id, result=res)) delete_unused_tasks!(state) end @@ -691,13 +691,13 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) task = nothing @label pop_task if task !== nothing - timespan_finish(ctx, :schedule, (;thunk_id=task.id), (;thunk_id=task.id)) + timespan_finish(ctx, :schedule, (;uid=state.uid, thunk_id=task.id), (;thunk_id=task.id)) end if isempty(state.ready) @goto fire_tasks end task = pop!(state.ready) - timespan_start(ctx, :schedule, 
(;thunk_id=task.id), (;thunk_id=task.id)) + timespan_start(ctx, :schedule, (;uid=state.uid, thunk_id=task.id), (;thunk_id=task.id)) if haskey(state.cache, task) if haskey(state.errored, task) # An error was eagerly propagated to this task @@ -887,7 +887,7 @@ function monitor_procs_changed!(ctx, state) wait(ctx.proc_notify) end - timespan_start(ctx, :assign_procs, nothing, nothing) + timespan_start(ctx, :assign_procs, (;uid=state.uid), nothing) # Load new set of procs new_ps = procs_to_use(ctx) @@ -915,7 +915,7 @@ function monitor_procs_changed!(ctx, state) end end - timespan_finish(ctx, :assign_procs, nothing, nothing) + timespan_finish(ctx, :assign_procs, (;uid=state.uid), nothing) old_ps = new_ps end end @@ -1085,8 +1085,9 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) # know which task failed. tasks = Task[] for ts in to_send + # TODO: errormonitor task = Threads.@spawn begin - timespan_start(ctx, :fire, (;worker=gproc.pid), nothing) + timespan_start(ctx, :fire, (;uid=state.uid, worker=gproc.pid), nothing) try remotecall_wait(do_tasks, gproc.pid, proc, state.chan, [ts]); catch err @@ -1094,7 +1095,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) thunk_id = ts[1] put!(state.chan, (gproc.pid, proc, thunk_id, (CapturedException(err, bt), nothing))) finally - timespan_finish(ctx, :fire, (;worker=gproc.pid), nothing) + timespan_finish(ctx, :fire, (;uid=state.uid, worker=gproc.pid), nothing) end end end @@ -1212,6 +1213,7 @@ proc_has_occupancy(proc_occupancy, task_occupancy) = function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, return_queue::RemoteChannel) to_proc = istate.proc proc_run_task = @task begin + # FIXME: Context changes aren't noticed over time ctx = istate.ctx tasks = istate.tasks proc_occupancy = istate.proc_occupancy @@ -1223,12 +1225,12 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re # Wait for new tasks if !work_to_do @dagdebug 
nothing :processor "Waiting for tasks" - timespan_start(ctx, :proc_run_wait, (;worker=wid, processor=to_proc), nothing) + timespan_start(ctx, :proc_run_wait, (;uid, worker=wid, processor=to_proc), nothing) wait(istate.reschedule) @static if VERSION >= v"1.9" reset(istate.reschedule) end - timespan_finish(ctx, :proc_run_wait, (;worker=wid, processor=to_proc), nothing) + timespan_finish(ctx, :proc_run_wait, (;uid, worker=wid, processor=to_proc), nothing) if istate.done[] return end @@ -1236,7 +1238,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re # Fetch a new task to execute @dagdebug nothing :processor "Trying to dequeue" - timespan_start(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), nothing) + timespan_start(ctx, :proc_run_fetch, (;uid, worker=wid, processor=to_proc), nothing) work_to_do = false task_and_occupancy = lock(istate.queue) do queue # Only steal if there are multiple queued tasks, to prevent @@ -1255,7 +1257,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re return queue_result end if task_and_occupancy === nothing - timespan_finish(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), nothing) + timespan_finish(ctx, :proc_run_fetch, (;uid, worker=wid, processor=to_proc), nothing) @dagdebug nothing :processor "Failed to dequeue" @@ -1270,7 +1272,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re @dagdebug nothing :processor "Trying to steal" # Try to steal a task - timespan_start(ctx, :steal_local, (;worker=wid, processor=to_proc), nothing) + timespan_start(ctx, :proc_steal_local, (;uid, worker=wid, processor=to_proc), nothing) # Try to steal from local queues randomly # TODO: Prioritize stealing from busiest processors @@ -1305,12 +1307,12 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re from_proc = other_istate.proc thunk_id = task[1] @dagdebug thunk_id :processor "Stolen from $from_proc by $to_proc" 
- timespan_finish(ctx, :steal_local, (;worker=wid, processor=to_proc), (;from_proc, thunk_id)) + timespan_finish(ctx, :proc_steal_local, (;uid, worker=wid, processor=to_proc), (;from_proc, thunk_id)) # TODO: Keep stealing until we hit full occupancy? @goto execute end end - timespan_finish(ctx, :steal_local, (;worker=wid, processor=to_proc), nothing) + timespan_finish(ctx, :proc_steal_local, (;uid, worker=wid, processor=to_proc), nothing) # TODO: Try to steal from remote queues @@ -1322,7 +1324,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re task = task_spec[] thunk_id = task[1] time_util = task[2] - timespan_finish(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), (;thunk_id, proc_occupancy=proc_occupancy[], task_occupancy)) + timespan_finish(ctx, :proc_run_fetch, (;uid, worker=wid, processor=to_proc), (;thunk_id, proc_occupancy=proc_occupancy[], task_occupancy)) @dagdebug thunk_id :processor "Dequeued task" # Execute the task and return its result @@ -1423,7 +1425,7 @@ function do_tasks(to_proc, return_queue, tasks) for task in tasks thunk_id = task[1] occupancy = task[4] - timespan_start(ctx, :enqueue, (;processor=to_proc, thunk_id), nothing) + timespan_start(ctx, :enqueue, (;uid, processor=to_proc, thunk_id), nothing) should_launch = lock(TASK_SYNC) do # Already running; don't try to re-launch if !(thunk_id in TASKS_RUNNING) @@ -1435,7 +1437,7 @@ function do_tasks(to_proc, return_queue, tasks) end should_launch || continue enqueue!(queue, TaskSpecKey(task), occupancy) - timespan_finish(ctx, :enqueue, (;processor=to_proc, thunk_id), nothing) + timespan_finish(ctx, :enqueue, (;uid, processor=to_proc, thunk_id), nothing) @dagdebug thunk_id :processor "Enqueued task" end end diff --git a/test/logging.jl b/test/logging.jl index 9a12cf7ff..13724ac45 100644 --- a/test/logging.jl +++ b/test/logging.jl @@ -100,7 +100,9 @@ import Colors, GraphViz, DataFrames, Plots, JSON3 for idx in 1:length(logs[w][:core]) category = 
logs[w][:core][idx].category if category in (:scheduler_init, :scheduler_exit, :take, :assign_procs) - @test logs[w][:id][idx] === nothing + @test logs[w][:id][idx] isa NamedTuple + @test haskey(logs[w][:id][idx], :uid) + @test logs[w][:id][idx].uid isa UInt end if category in (:add_thunk, :schedule, :finish, :move, :compute, :storage_wait, :storage_safe_scan, :enqueue) @test logs[w][:id][idx] isa NamedTuple From 68710dbc9ae372ee104258c74c1740928478bcf0 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Wed, 21 Aug 2024 06:24:03 -0500 Subject: [PATCH 10/12] logs_event_pairs: Don't require matching start --- src/utils/logging.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/logging.jl b/src/utils/logging.jl index 3c8a7ed22..4ab98c79b 100644 --- a/src/utils/logging.jl +++ b/src/utils/logging.jl @@ -121,7 +121,7 @@ function logs_event_pairs(f, logs::Dict) event_key = (category, id) if kind == :start running_events[event_key] = idx - else + elseif haskey(running_events, event_key) event_start_idx = running_events[event_key] f(w, event_start_idx, idx) end From 27c410079e0755bfdcb51668ac9c55dde0c206d8 Mon Sep 17 00:00:00 2001 From: Mateusz Jakub Fila Date: Mon, 19 Aug 2024 08:40:35 +0200 Subject: [PATCH 11/12] add show_logs for graphviz --- docs/src/logging-visualization.md | 1 + ext/GraphVizExt.jl | 43 +++++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/docs/src/logging-visualization.md b/docs/src/logging-visualization.md index 928f816ae..f6717c0ba 100644 --- a/docs/src/logging-visualization.md +++ b/docs/src/logging-visualization.md @@ -18,6 +18,7 @@ converted to a `Val` for dispatch purposes (i.e. `render_logs(logs::Dict, :myrenderer)` -> `render_logs(logs, Val{:myrenderer}())`). 
Built-in `IO` support exists for: +- `show_logs(io, logs, :graphviz)` to write a Graphviz dot graph of executed tasks and their dependencies - `show_logs(io, logs, :chrome_trace)` to write a task execution timeline in the chrome-trace format (view in [perfetto web UI](https://ui.perfetto.dev/) or `about:tracing` in a chrome-based browser) Built-in rendering support exists for: diff --git a/ext/GraphVizExt.jl b/ext/GraphVizExt.jl index 1b41f9fa1..3169d2859 100644 --- a/ext/GraphVizExt.jl +++ b/ext/GraphVizExt.jl @@ -38,6 +38,30 @@ tab20_colors = [ ] _default_colors = tab20_colors +""" + Dagger.show_logs(io::IO, logs::Dict, ::Val{:graphviz}; disconnected=false, + color_by=:fn, times::Bool=true, times_digits::Integer=3) + +Write a graph of the task dependencies and data dependencies in `logs` to dot format + +Requires the following events enabled in `enable_logging!`: `taskdeps`, `tasknames`, `taskargs`, `taskargmoves` + +Options: +- `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies) +- `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor +- `times`: If `true`, annotate each task with its start and finish times +- `times_digits`: Number of digits to display in the time annotations +- `colors`: A list of colors to use for coloring tasks +- `name_to_color`: A function that maps task names to colors +""" +function Dagger.show_logs(io::IO, logs::Dict, ::Val{:graphviz}; disconnected=false, + color_by=:fn, times::Bool=true, times_digits::Integer=3, + colors=_default_colors, name_to_color=_name_to_color) + dot = logs_to_dot(logs; disconnected, times, times_digits, + color_by, colors, name_to_color) + println(io, dot) +end + """ Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, color_by=:fn, layout_engine="dot", @@ -50,14 +74,26 @@ Requires the `all_task_deps` event enabled in `enable_logging!` Options: - `disconnected`: If 
`true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies) - `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor -- `layout_engine`: The layout engine to use for GraphViz +- `layout_engine`: The layout engine to use for GraphViz rendering - `times`: If `true`, annotate each task with its start and finish times - `times_digits`: Number of digits to display in the time annotations +- `colors`: A list of colors to use for coloring tasks +- `name_to_color`: A function that maps task names to colors """ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, color_by=:fn, layout_engine="dot", times::Bool=true, times_digits::Integer=3, colors=_default_colors, name_to_color=_name_to_color) + dot = logs_to_dot(logs; disconnected, times, times_digits, + color_by, colors, name_to_color) + gv = GraphViz.Graph(dot) + GraphViz.layout!(gv; engine=layout_engine) + return gv +end + +function logs_to_dot(logs::Dict; disconnected=false, color_by=:fn, + times::Bool=true, times_digits::Integer=3, + colors=_default_colors, name_to_color=_name_to_color) # Lookup all relevant task/argument dependencies and values in logs g = SimpleDiGraph() @@ -397,10 +433,7 @@ function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, # Generate the final graph str *= "}\n" - gv = GraphViz.Graph(str) - GraphViz.layout!(gv; engine=layout_engine) - - return gv + return str end end From 09a9cf29c4f97aa728b581d85e947ccdbdd4a614 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Wed, 21 Aug 2024 12:29:10 -0500 Subject: [PATCH 12/12] logging/viz: Move `logs_to_dot` and utils into Dagger Co-Authored-By: Mateusz Jakub Fila --- ext/GraphVizExt.jl | 414 +------------------------------------------ ext/PlotsExt.jl | 38 ++-- src/Dagger.jl | 5 +- src/utils/viz.jl | 406 ++++++++++++++++++++++++++++++++++++++++++ src/visualization.jl | 2 +- 5 files changed, 438 
insertions(+), 427 deletions(-) create mode 100644 src/utils/viz.jl diff --git a/ext/GraphVizExt.jl b/ext/GraphVizExt.jl index 3169d2859..d701ade54 100644 --- a/ext/GraphVizExt.jl +++ b/ext/GraphVizExt.jl @@ -7,65 +7,13 @@ else end import Dagger -import Dagger: DTask, Chunk, Processor, LoggedMutableObject -import Dagger.TimespanLogging: Timespan -import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst - -function pretty_time(t; digits=3) - r(t) = round(t; digits) - if t > 1000^3 - "$(r(t/(1000^3))) s" - elseif t > 1000^2 - "$(r(t/(1000^2))) ms" - elseif t > 1000 - "$(r(t/1000)) us" - else - "$(r(t)) ns" - end -end - -sanitize_label(label::String) = replace(label, "\"" => "\\\"") - -_name_to_color(name::AbstractString, colors) = - colors[mod1(hash(name), length(colors))] -_name_to_color(name::AbstractString, ::Nothing) = "black" -tab20_colors = [ - "#1f77b4", "#aec7e8", "#ff7f0e", "#ffbb78", - "#2ca02c", "#98df8a", "#d62728", "#ff9896", - "#9467bd", "#c5b0d5", "#8c564b", "#c49c94", - "#e377c2", "#f7b6d2", "#7f7f7f", "#c7c7c7", - "#bcbd22", "#dbdb8d", "#17becf", "#9edae5" -] -_default_colors = tab20_colors """ - Dagger.show_logs(io::IO, logs::Dict, ::Val{:graphviz}; disconnected=false, - color_by=:fn, times::Bool=true, times_digits::Integer=3) - -Write a graph of the task dependencies and data dependencies in `logs` to dot format - -Requires the following events enabled in `enable_logging!`: `taskdeps`, `tasknames`, `taskargs`, `taskargmoves` - -Options: -- `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies) -- `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor -- `times`: If `true`, annotate each task with its start and finish times -- `times_digits`: Number of digits to display in the time annotations -- `colors`: A list of colors to use for coloring tasks -- 
`name_to_color`: A function that maps task names to colors -""" -function Dagger.show_logs(io::IO, logs::Dict, ::Val{:graphviz}; disconnected=false, - color_by=:fn, times::Bool=true, times_digits::Integer=3, - colors=_default_colors, name_to_color=_name_to_color) - dot = logs_to_dot(logs; disconnected, times, times_digits, - color_by, colors, name_to_color) - println(io, dot) -end - -""" - Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, - color_by=:fn, layout_engine="dot", - times::Bool=true, times_digits::Integer=3) + render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, + color_by=:fn, layout_engine="dot", + times::Bool=true, times_digits::Integer=3, + colors=Dagger.Viz.default_colors, + name_to_color=Dagger.Viz.name_to_color) Render a graph of the task dependencies and data dependencies in `logs` using GraphViz. @@ -83,357 +31,13 @@ Options: function Dagger.render_logs(logs::Dict, ::Val{:graphviz}; disconnected=false, color_by=:fn, layout_engine="dot", times::Bool=true, times_digits::Integer=3, - colors=_default_colors, name_to_color=_name_to_color) - dot = logs_to_dot(logs; disconnected, times, times_digits, - color_by, colors, name_to_color) + colors=Dagger.Viz.default_colors, + name_to_color=Dagger.Viz.name_to_color) + dot = Dagger.Viz.logs_to_dot(logs; disconnected, times, times_digits, + color_by, colors, name_to_color) gv = GraphViz.Graph(dot) GraphViz.layout!(gv; engine=layout_engine) return gv end -function logs_to_dot(logs::Dict; disconnected=false, color_by=:fn, - times::Bool=true, times_digits::Integer=3, - colors=_default_colors, name_to_color=_name_to_color) - # Lookup all relevant task/argument dependencies and values in logs - g = SimpleDiGraph() - - tid_to_vertex = Dict{Int,Int}() - tid_to_auto_name = Dict{Int,String}() - tid_to_name = Dict{Int,String}() - tid_to_proc = Dict{Int,Processor}() - - objid_to_vertex = Dict{UInt,Int}() - objid_to_name = Dict{UInt,String}() - - task_args = 
Dict{Int,Vector{Pair{Union{Int,Symbol},UInt}}}() - task_arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}() - task_result = Dict{Int,UInt}() - - uid_to_tid = Dict{UInt,Int}() - dtasks_to_patch = Set{UInt}() - - for w in keys(logs) - for idx in 1:length(logs[w][:core]) - category = logs[w][:core][idx].category - kind = logs[w][:core][idx].kind - id = logs[w][:id][idx] - if category == :add_thunk && kind == :start - id::NamedTuple - taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}} - taskname = logs[w][:taskfuncnames][idx]::String - tid, deps = taskdeps - v = get!(tid_to_vertex, tid) do - add_vertex!(g) - tid_to_vertex[tid] = nv(g) - nv(g) - end - tid_to_auto_name[tid] = taskname - for dep in deps - dep_v = get!(tid_to_vertex, dep) do - add_vertex!(g) - tid_to_vertex[dep] = nv(g) - nv(g) - end - add_edge!(g, dep_v, v) - end - if haskey(logs[w], :taskuidtotid) - uid_tid = logs[w][:taskuidtotid][idx] - if uid_tid !== nothing - uid, tid = uid_tid::Pair{UInt,Int} - uid_to_tid[uid] = tid - end - end - elseif category == :compute && kind == :start - id::NamedTuple - tid = id.thunk_id - proc = id.processor - tid_to_proc[tid] = proc - elseif category == :compute && kind == :finish - if haskey(logs[w], :taskresult) - result_info = logs[w][:taskresult][idx] - result_info === nothing && continue - tid, obj = result_info::Pair{Int,LoggedMutableObject} - objid = obj.objid - task_result[tid] = objid - tid_v = get!(tid_to_vertex, tid) do - add_vertex!(g) - tid_to_vertex[tid] = nv(g) - nv(g) - end - v = get!(objid_to_vertex, objid) do - add_vertex!(g) - objid_to_vertex[objid] = nv(g) - nv(g) - end - add_edge!(g, tid_v, v) - end - elseif category == :move && kind == :finish - if haskey(logs[w], :taskargs) - tid, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} - args = map(arg->arg[1]=>arg[2].objid, args) - append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, tid), args) - for arg in args - objid = arg[2] - arg_id = get!(objid_to_vertex, objid) 
do - add_vertex!(g) - objid_to_vertex[objid] = nv(g) - nv(g) - end - if tid != 0 - tid_v = get!(tid_to_vertex, tid) do - add_vertex!(g) - tid_to_vertex[tid] = nv(g) - nv(g) - end - add_edge!(g, arg_id, tid_v) - end - end - end - if haskey(logs[w], :taskargmoves) - move_info = logs[w][:taskargmoves][idx] - move_info === nothing && continue - tid, pos, pre_obj, post_obj = move_info - v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, task_arg_moves, tid) - pre_objid = pre_obj.objid - post_objid = post_obj.objid - push!(v, pos => (pre_objid, post_objid)) - pre_arg_id = get!(objid_to_vertex, pre_objid) do - add_vertex!(g) - objid_to_vertex[pre_objid] = nv(g) - nv(g) - end - post_arg_id = get!(objid_to_vertex, post_objid) do - add_vertex!(g) - objid_to_vertex[post_objid] = nv(g) - nv(g) - end - add_edge!(g, pre_arg_id, post_arg_id) - end - elseif category == :data_annotation && kind == :start - id::NamedTuple - name = String(id.name) - obj = id.objectid::LoggedMutableObject - objid = obj.objid - objid_to_name[objid] = name - if obj.kind == :task - # N.B. 
We don't need the object vertex, - # since we'll just render it as a task - push!(dtasks_to_patch, objid) - else - get!(objid_to_vertex, objid) do - add_vertex!(g) - objid_to_vertex[objid] = nv(g) - nv(g) - end - end - elseif category == :finish && kind == :finish - if haskey(logs[w], :tasktochunk) - tid_chunk = logs[w][:tasktochunk][idx] - if tid_chunk !== nothing - tid, chunk_obj = tid_chunk::Pair{Int,LoggedMutableObject} - chunk_id = chunk_obj.objid - v = get!(objid_to_vertex, chunk_id) do - add_vertex!(g) - objid_to_vertex[chunk_id] = nv(g) - nv(g) - end - add_edge!(g, tid_to_vertex[tid], v) - end - end - end - end - end - - # Process DTasks-to-Thunk mappings - for uid in dtasks_to_patch - if haskey(uid_to_tid, uid) - tid = uid_to_tid[uid] - v = get!(tid_to_vertex, tid) do - add_vertex!(g) - tid_to_vertex[tid] = nv(g) - nv(g) - end - - # Fixup any missing tid data - if haskey(objid_to_name, uid) - tid_to_name[tid] = objid_to_name[uid] - end - end - end - - # Auto-assign names - for (tid, name) in tid_to_auto_name - if !haskey(tid_to_name, tid) - tid_to_name[tid] = name - end - end - - # Create reverse mappings - vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex) - vertex_to_objid = Dict{Int,UInt}(v=>k for (k,v) in objid_to_vertex) - - # Find all connected and disconnected vertices - if !disconnected - discon_vs = filter(v->isempty(inneighbors(g, v)) && isempty(outneighbors(g, v)), vertices(g)) - con_vs = filter(v->!in(v, discon_vs), vertices(g)) - else - con_vs = vertices(g) - end - - if times - # Determine per-worker start times - worker_start_times = Dict{Int,UInt64}() - for w in keys(logs) - start = typemax(UInt64) - for idx in 1:length(logs[w][:core]) - if logs[w][:core][idx].category == :compute && logs[w][:core][idx].kind == :start - tid = logs[w][:id][idx].thunk_id - haskey(tid_to_vertex, tid) || continue - id = tid_to_vertex[tid] - id in con_vs || continue - start = min(start, logs[w][:core][idx].timestamp) - end - end - 
worker_start_times[w] = start - end - - # Determine per-task start and finish times - start_times = Dict{Int,UInt64}() - finish_times = Dict{Int,UInt64}() - for w in keys(logs) - start = typemax(UInt64) - for idx in 1:length(logs[w][:core]) - if logs[w][:core][idx].category == :compute - tid = logs[w][:id][idx].thunk_id - if logs[w][:core][idx].kind == :start - start_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w] - else - finish_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w] - end - end - end - end - end - - # Get the set of all unique task and object IDs - all_tids = collect(keys(tid_to_vertex)) - all_objids = collect(keys(objid_to_vertex)) - - # Assign colors - if color_by == :fn - all_fns = unique(values(tid_to_name)) - _colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)] - colors = Dict(tid=>_colors[findfirst(fn->occursin(fn, tid_to_name[tid]), all_fns)] for tid in all_tids) - elseif color_by == :proc - all_procs = unique(values(tid_to_proc)) - _colors = [name_to_color(string(all_procs[i]), colors) for i in 1:length(all_procs)] - colors = Dict(tid=>_colors[findfirst(proc->proc==tid_to_proc[tid], all_procs)] for tid in all_tids) - else - throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc")) - end - - str = is_directed(g) ? 
"digraph mygraph {\n" : "graph mygraph {\n" - - # Add task vertices - for tid in all_tids - v = tid_to_vertex[tid] - if !disconnected && (v in discon_vs) - continue - end - label_str = tid_to_name[tid] - if haskey(tid_to_auto_name, tid) && tid_to_name[tid] != tid_to_auto_name[tid] - label_str *= "\\nTask: $(tid_to_auto_name[tid])" - end - color = colors[tid] - proc = tid_to_proc[tid] - label_str *= "\\n($(Dagger.short_name(tid_to_proc[tid])))" - if times - start_time = pretty_time(start_times[tid]; digits=times_digits) - finish_time = pretty_time(finish_times[tid]; digits=times_digits) - diff_time = pretty_time(finish_times[tid] - start_times[tid]; digits=times_digits) - label_str *= "\\n[+$start_time -> +$finish_time (diff: $diff_time)]" - end - label_str = sanitize_label(label_str) - str *= "v$v [label=\"$label_str\", shape=box, color=\"$color\", penwidth=2.0]\n" - end - - # Add object vertices - for objid in all_objids - objid_v = objid_to_vertex[objid] - if !disconnected && !(objid_v in con_vs) - continue - end - if objid in dtasks_to_patch || haskey(uid_to_tid, objid) - # DTask, skip it - continue - end - # Object - if haskey(objid_to_name, objid) - label = sanitize_label(objid_to_name[objid]) - label *= "\\nData: $(repr(objid))" - else - label = "Data: $(repr(objid))" - end - str *= "a$objid_v [label=\"$label\", shape=oval]\n" - end - - # Add task argument move edges - seen_moves = Set{Tuple{UInt,UInt}}() - for (tid, moves) in task_arg_moves - for (pos, (pre_objid, post_objid)) in moves - pre_objid == post_objid && continue - (pre_objid, post_objid) in seen_moves && continue - push!(seen_moves, (pre_objid, post_objid)) - pre_objid_v = objid_to_vertex[pre_objid] - post_objid_v = objid_to_vertex[post_objid] - move_str = "a$pre_objid_v -> a$post_objid_v [label=\"move\"]\n" - str *= move_str - end - end - - # Add task-to-task (syncdep) dependency edges - edge_sep = is_directed(g) ? 
"->" : "--" - for edge in edges(g) - if !haskey(vertex_to_tid, src(edge)) || !haskey(vertex_to_tid, dst(edge)) - continue - end - if !disconnected && !(src(edge) in con_vs) || !(dst(edge) in con_vs) - continue - end - # FIXME: Label syncdeps with associated arguments and datadeps directions - str *= "v$(src(edge)) $edge_sep v$(dst(edge)) [label=\"syncdep\"]\n" - end - - # Add task argument edges - for (tid, args) in task_args - haskey(tid_to_vertex, tid) || continue - tid_v = tid_to_vertex[tid] - tid_v in con_vs || continue - for (pos, arg) in args - arg_v = objid_to_vertex[arg] - if !disconnected && !(arg_v in con_vs) - continue - end - arg_str = sanitize_label(pos isa Int ? "arg $pos" : "kwarg $pos") - str *= "a$arg_v $edge_sep v$tid_v [label=\"$arg_str\"]\n" - end - end - - # Add task result edges - for (tid, result) in task_result - haskey(tid_to_vertex, tid) || continue - tid_v = tid_to_vertex[tid] - tid_v in con_vs || continue - result_v = objid_to_vertex[result] - if !disconnected && !(result_v in con_vs) - continue - end - str *= "v$tid_v $edge_sep a$result_v [label=\"result\"]\n" - end - - # Generate the final graph - str *= "}\n" - return str -end - end diff --git a/ext/PlotsExt.jl b/ext/PlotsExt.jl index eb1454e72..7e2acaf86 100644 --- a/ext/PlotsExt.jl +++ b/ext/PlotsExt.jl @@ -12,19 +12,8 @@ import Dagger import Dagger: DTask, Chunk, Processor import Dagger.TimespanLogging: Timespan -_name_to_color(name::AbstractString, colors) = - colors[mod1(hash(name), length(colors))] -_name_to_color(name::AbstractString, ::Nothing) = "black" -tab20_colors = [ - "#1f77b4", "#aec7e8", "#ff7f0e", "#ffbb78", - "#2ca02c", "#98df8a", "#d62728", "#ff9896", - "#9467bd", "#c5b0d5", "#8c564b", "#c49c94", - "#e377c2", "#f7b6d2", "#7f7f7f", "#c7c7c7", - "#bcbd22", "#dbdb8d", "#17becf", "#9edae5" -] -_default_colors = tab20_colors - -function logs_to_df(logs::Dict, ::Val{:execution}; colors=_default_colors, name_to_color=_name_to_color, color_by=:fn) +function 
logs_to_df(logs::Dict, ::Val{:execution}; + colors, name_to_color, color_by) # Generate function names fn_names = Dict{Int, String}() dtask_names = Dict{UInt, String}() @@ -91,7 +80,8 @@ function logs_to_df(logs::Dict, ::Val{:execution}; colors=_default_colors, name_ end return df end -function logs_to_df(logs::Dict, ::Val{:processor}; colors=_default_colors, name_to_color=_name_to_color, kwargs...) +function logs_to_df(logs::Dict, ::Val{:processor}; + colors, name_to_color, kwargs...) # Collect processor events # FIXME: Color eltype df = DataFrame(proc=Processor[], proc_name=String[], category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[]) @@ -109,7 +99,8 @@ function logs_to_df(logs::Dict, ::Val{:processor}; colors=_default_colors, name_ end return df end -function logs_to_df(logs::Dict, ::Val{:scheduler}; colors=_default_colors, name_to_color=_name_to_color, kwargs...) +function logs_to_df(logs::Dict, ::Val{:scheduler}; + colors, name_to_color, kwargs...) # Collect scheduler events # FIXME: Color eltype df = DataFrame(category=String[], t_start=UInt64[], t_end=UInt64[], color=Any[]) @@ -131,13 +122,24 @@ logs_to_df(logs::Dict, ::Val{target}; kwargs...) where target = # Implementation adapted from: # https://discourse.julialang.org/t/how-to-make-a-gantt-plot-with-plots-jl/95165/7 """ - Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; kwargs...) + Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; + target=:execution, + colors, name_to_color, color_by=:fn, + kwargs...) + +Render a Gantt chart of task execution in `logs` using Plots. -Render a Gantt chart of task execution in `logs` using Plots. `kwargs` are passed to `plot` directly. +Keyword arguments affect rendering behavior: +- `target`: Which aspect of the logs to render. May be one of `:execution`, `:processor`, or `:scheduler`. +- `colors`: A list of colors to use for rendering. +- `name_to_color`: A function mapping names to colors. 
+- `color_by`: Whether to color by function name (`:fn`) or processor name (`:proc`). +- `kwargs` are passed to `plot` directly. """ function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt}; target=:execution, - colors=_default_colors, name_to_color=_name_to_color, + colors=Dagger.Viz.default_colors, + name_to_color=Dagger.Viz.name_to_color, color_by=:fn, kwargs...) df = logs_to_df(logs, Val{target}(); colors, name_to_color, color_by) y_elem = if target == :execution || target == :processor diff --git a/src/Dagger.jl b/src/Dagger.jl index d39f2f5e8..b478ece0f 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -88,14 +88,13 @@ include("array/cholesky.jl") include("array/lu.jl") include("array/random.jl") -# Visualization +# Logging and Visualization include("visualization.jl") include("ui/gantt-common.jl") include("ui/gantt-text.jl") - -# Logging include("utils/logging-events.jl") include("utils/logging.jl") +include("utils/viz.jl") # Precompilation import PrecompileTools: @compile_workload diff --git a/src/utils/viz.jl b/src/utils/viz.jl new file mode 100644 index 000000000..295fe8a36 --- /dev/null +++ b/src/utils/viz.jl @@ -0,0 +1,406 @@ +# Built-in visualization utilities + +module Viz + +import ..Dagger +import Dagger: DTask, Chunk, Processor, LoggedMutableObject +import Dagger: show_logs +import Graphs: SimpleDiGraph, add_edge!, add_vertex!, inneighbors, outneighbors, vertices, is_directed, edges, nv, src, dst + +function pretty_time(t; digits=3) + r(t) = round(t; digits) + if t > 1000^3 + "$(r(t/(1000^3))) s" + elseif t > 1000^2 + "$(r(t/(1000^2))) ms" + elseif t > 1000 + "$(r(t/1000)) us" + else + "$(r(t)) ns" + end +end + +sanitize_label(label::String) = replace(label, "\"" => "\\\"") + +name_to_color(name::AbstractString, colors) = + colors[mod1(hash(name), length(colors))] +name_to_color(name::AbstractString, ::Nothing) = "black" +tab20_colors = [ + "#1f77b4", "#aec7e8", "#ff7f0e", "#ffbb78", + "#2ca02c", "#98df8a", "#d62728", "#ff9896", + "#9467bd", 
"#c5b0d5", "#8c564b", "#c49c94", + "#e377c2", "#f7b6d2", "#7f7f7f", "#c7c7c7", + "#bcbd22", "#dbdb8d", "#17becf", "#9edae5" +] +default_colors = tab20_colors + +""" + show_logs(io::IO, logs::Dict, ::Val{:graphviz}; disconnected=false, + color_by=:fn, times::Bool=true, times_digits::Integer=3) + +Write a graph of the task dependencies and data dependencies in `logs` to dot format + +Requires the following events enabled in `enable_logging!`: `taskdeps`, `tasknames`, `taskargs`, `taskargmoves` + +Options: +- `disconnected`: If `true`, render disconnected vertices (tasks or arguments without upstream/downstream dependencies) +- `color_by`: How to color tasks; if `:fn`, then color by unique function name, if `:proc`, then color by unique processor +- `times`: If `true`, annotate each task with its start and finish times +- `times_digits`: Number of digits to display in the time annotations +- `colors`: A list of colors to use for coloring tasks +- `name_to_color`: A function that maps task names to colors +""" +function show_logs(io::IO, logs::Dict, ::Val{:graphviz}; disconnected=false, + color_by=:fn, times::Bool=true, times_digits::Integer=3, + colors=default_colors, name_to_color=name_to_color) + dot = logs_to_dot(logs; disconnected, times, times_digits, + color_by, colors, name_to_color) + println(io, dot) +end + +function logs_to_dot(logs::Dict; disconnected=false, color_by=:fn, + times::Bool=true, times_digits::Integer=3, + colors=default_colors, name_to_color=name_to_color) + # Lookup all relevant task/argument dependencies and values in logs + g = SimpleDiGraph() + + tid_to_vertex = Dict{Int,Int}() + tid_to_auto_name = Dict{Int,String}() + tid_to_name = Dict{Int,String}() + tid_to_proc = Dict{Int,Processor}() + + objid_to_vertex = Dict{UInt,Int}() + objid_to_name = Dict{UInt,String}() + + task_args = Dict{Int,Vector{Pair{Union{Int,Symbol},UInt}}}() + task_arg_moves = Dict{Int,Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}}() + task_result = Dict{Int,UInt}() + 
+ uid_to_tid = Dict{UInt,Int}() + dtasks_to_patch = Set{UInt}() + + for w in keys(logs) + for idx in 1:length(logs[w][:core]) + category = logs[w][:core][idx].category + kind = logs[w][:core][idx].kind + id = logs[w][:id][idx] + if category == :add_thunk && kind == :start + id::NamedTuple + taskdeps = logs[w][:taskdeps][idx]::Pair{Int,Vector{Int}} + taskname = logs[w][:taskfuncnames][idx]::String + tid, deps = taskdeps + v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + tid_to_auto_name[tid] = taskname + for dep in deps + dep_v = get!(tid_to_vertex, dep) do + add_vertex!(g) + tid_to_vertex[dep] = nv(g) + nv(g) + end + add_edge!(g, dep_v, v) + end + if haskey(logs[w], :taskuidtotid) + uid_tid = logs[w][:taskuidtotid][idx] + if uid_tid !== nothing + uid, tid = uid_tid::Pair{UInt,Int} + uid_to_tid[uid] = tid + end + end + elseif category == :compute && kind == :start + id::NamedTuple + tid = id.thunk_id + proc = id.processor + tid_to_proc[tid] = proc + elseif category == :compute && kind == :finish + if haskey(logs[w], :taskresult) + result_info = logs[w][:taskresult][idx] + result_info === nothing && continue + tid, obj = result_info::Pair{Int,LoggedMutableObject} + objid = obj.objid + task_result[tid] = objid + tid_v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + v = get!(objid_to_vertex, objid) do + add_vertex!(g) + objid_to_vertex[objid] = nv(g) + nv(g) + end + add_edge!(g, tid_v, v) + end + elseif category == :move && kind == :finish + if haskey(logs[w], :taskargs) + tid, args = logs[w][:taskargs][idx]::Pair{Int,<:Vector} + args = map(arg->arg[1]=>arg[2].objid, args) + append!(get!(Vector{Pair{Union{Int,Symbol},UInt}}, task_args, tid), args) + for arg in args + objid = arg[2] + arg_id = get!(objid_to_vertex, objid) do + add_vertex!(g) + objid_to_vertex[objid] = nv(g) + nv(g) + end + if tid != 0 + tid_v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = 
nv(g) + nv(g) + end + add_edge!(g, arg_id, tid_v) + end + end + end + if haskey(logs[w], :taskargmoves) + move_info = logs[w][:taskargmoves][idx] + move_info === nothing && continue + tid, pos, pre_obj, post_obj = move_info + v = get!(Vector{Pair{Union{Int,Symbol},Tuple{UInt,UInt}}}, task_arg_moves, tid) + pre_objid = pre_obj.objid + post_objid = post_obj.objid + push!(v, pos => (pre_objid, post_objid)) + pre_arg_id = get!(objid_to_vertex, pre_objid) do + add_vertex!(g) + objid_to_vertex[pre_objid] = nv(g) + nv(g) + end + post_arg_id = get!(objid_to_vertex, post_objid) do + add_vertex!(g) + objid_to_vertex[post_objid] = nv(g) + nv(g) + end + add_edge!(g, pre_arg_id, post_arg_id) + end + elseif category == :data_annotation && kind == :start + id::NamedTuple + name = String(id.name) + obj = id.objectid::LoggedMutableObject + objid = obj.objid + objid_to_name[objid] = name + if obj.kind == :task + # N.B. We don't need the object vertex, + # since we'll just render it as a task + push!(dtasks_to_patch, objid) + else + get!(objid_to_vertex, objid) do + add_vertex!(g) + objid_to_vertex[objid] = nv(g) + nv(g) + end + end + elseif category == :finish && kind == :finish + if haskey(logs[w], :tasktochunk) + tid_chunk = logs[w][:tasktochunk][idx] + if tid_chunk !== nothing + tid, chunk_obj = tid_chunk::Pair{Int,LoggedMutableObject} + chunk_id = chunk_obj.objid + v = get!(objid_to_vertex, chunk_id) do + add_vertex!(g) + objid_to_vertex[chunk_id] = nv(g) + nv(g) + end + add_edge!(g, tid_to_vertex[tid], v) + end + end + end + end + end + + # Process DTasks-to-Thunk mappings + for uid in dtasks_to_patch + if haskey(uid_to_tid, uid) + tid = uid_to_tid[uid] + v = get!(tid_to_vertex, tid) do + add_vertex!(g) + tid_to_vertex[tid] = nv(g) + nv(g) + end + + # Fixup any missing tid data + if haskey(objid_to_name, uid) + tid_to_name[tid] = objid_to_name[uid] + end + end + end + + # Auto-assign names + for (tid, name) in tid_to_auto_name + if !haskey(tid_to_name, tid) + tid_to_name[tid] = 
name + end + end + + # Create reverse mappings + vertex_to_tid = Dict{Int,Int}(v=>k for (k,v) in tid_to_vertex) + vertex_to_objid = Dict{Int,UInt}(v=>k for (k,v) in objid_to_vertex) + + # Find all connected and disconnected vertices + if !disconnected + discon_vs = filter(v->isempty(inneighbors(g, v)) && isempty(outneighbors(g, v)), vertices(g)) + con_vs = filter(v->!in(v, discon_vs), vertices(g)) + else + con_vs = vertices(g) + end + + if times + # Determine per-worker start times + worker_start_times = Dict{Int,UInt64}() + for w in keys(logs) + start = typemax(UInt64) + for idx in 1:length(logs[w][:core]) + if logs[w][:core][idx].category == :compute && logs[w][:core][idx].kind == :start + tid = logs[w][:id][idx].thunk_id + haskey(tid_to_vertex, tid) || continue + id = tid_to_vertex[tid] + id in con_vs || continue + start = min(start, logs[w][:core][idx].timestamp) + end + end + worker_start_times[w] = start + end + + # Determine per-task start and finish times + start_times = Dict{Int,UInt64}() + finish_times = Dict{Int,UInt64}() + for w in keys(logs) + start = typemax(UInt64) + for idx in 1:length(logs[w][:core]) + if logs[w][:core][idx].category == :compute + tid = logs[w][:id][idx].thunk_id + if logs[w][:core][idx].kind == :start + start_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w] + else + finish_times[tid] = logs[w][:core][idx].timestamp - worker_start_times[w] + end + end + end + end + end + + # Get the set of all unique task and object IDs + all_tids = collect(keys(tid_to_vertex)) + all_objids = collect(keys(objid_to_vertex)) + + # Assign colors + if color_by == :fn + all_fns = unique(values(tid_to_name)) + _colors = [name_to_color(all_fns[i], colors) for i in 1:length(all_fns)] + colors = Dict(tid=>_colors[findfirst(fn->occursin(fn, tid_to_name[tid]), all_fns)] for tid in all_tids) + elseif color_by == :proc + all_procs = unique(values(tid_to_proc)) + _colors = [name_to_color(string(all_procs[i]), colors) for i in 
1:length(all_procs)] + colors = Dict(tid=>_colors[findfirst(proc->proc==tid_to_proc[tid], all_procs)] for tid in all_tids) + else + throw(ArgumentError("Unknown `color_by` value: $color_by\nAllowed: :fn, :proc")) + end + + str = is_directed(g) ? "digraph mygraph {\n" : "graph mygraph {\n" + + # Add task vertices + for tid in all_tids + v = tid_to_vertex[tid] + if !disconnected && (v in discon_vs) + continue + end + label_str = tid_to_name[tid] + if haskey(tid_to_auto_name, tid) && tid_to_name[tid] != tid_to_auto_name[tid] + label_str *= "\\nTask: $(tid_to_auto_name[tid])" + end + color = colors[tid] + proc = tid_to_proc[tid] + label_str *= "\\n($(Dagger.short_name(tid_to_proc[tid])))" + if times + start_time = pretty_time(start_times[tid]; digits=times_digits) + finish_time = pretty_time(finish_times[tid]; digits=times_digits) + diff_time = pretty_time(finish_times[tid] - start_times[tid]; digits=times_digits) + label_str *= "\\n[+$start_time -> +$finish_time (diff: $diff_time)]" + end + label_str = sanitize_label(label_str) + str *= "v$v [label=\"$label_str\", shape=box, color=\"$color\", penwidth=2.0]\n" + end + + # Add object vertices + for objid in all_objids + objid_v = objid_to_vertex[objid] + if !disconnected && !(objid_v in con_vs) + continue + end + if objid in dtasks_to_patch || haskey(uid_to_tid, objid) + # DTask, skip it + continue + end + # Object + if haskey(objid_to_name, objid) + label = sanitize_label(objid_to_name[objid]) + label *= "\\nData: $(repr(objid))" + else + label = "Data: $(repr(objid))" + end + str *= "a$objid_v [label=\"$label\", shape=oval]\n" + end + + # Add task argument move edges + seen_moves = Set{Tuple{UInt,UInt}}() + for (tid, moves) in task_arg_moves + for (pos, (pre_objid, post_objid)) in moves + pre_objid == post_objid && continue + (pre_objid, post_objid) in seen_moves && continue + push!(seen_moves, (pre_objid, post_objid)) + pre_objid_v = objid_to_vertex[pre_objid] + post_objid_v = objid_to_vertex[post_objid] + move_str = 
"a$pre_objid_v -> a$post_objid_v [label=\"move\"]\n" + str *= move_str + end + end + + # Add task-to-task (syncdep) dependency edges + edge_sep = is_directed(g) ? "->" : "--" + for edge in edges(g) + if !haskey(vertex_to_tid, src(edge)) || !haskey(vertex_to_tid, dst(edge)) + continue + end + if !disconnected && !(src(edge) in con_vs) || !(dst(edge) in con_vs) + continue + end + # FIXME: Label syncdeps with associated arguments and datadeps directions + str *= "v$(src(edge)) $edge_sep v$(dst(edge)) [label=\"syncdep\"]\n" + end + + # Add task argument edges + for (tid, args) in task_args + haskey(tid_to_vertex, tid) || continue + tid_v = tid_to_vertex[tid] + tid_v in con_vs || continue + for (pos, arg) in args + arg_v = objid_to_vertex[arg] + if !disconnected && !(arg_v in con_vs) + continue + end + arg_str = sanitize_label(pos isa Int ? "arg $pos" : "kwarg $pos") + str *= "a$arg_v $edge_sep v$tid_v [label=\"$arg_str\"]\n" + end + end + + # Add task result edges + for (tid, result) in task_result + haskey(tid_to_vertex, tid) || continue + tid_v = tid_to_vertex[tid] + tid_v in con_vs || continue + result_v = objid_to_vertex[result] + if !disconnected && !(result_v in con_vs) + continue + end + str *= "v$tid_v $edge_sep a$result_v [label=\"result\"]\n" + end + + # Generate the final graph + str *= "}\n" + return str +end + +end # module Viz diff --git a/src/visualization.jl b/src/visualization.jl index 5fe443b5a..e2b1a8d29 100644 --- a/src/visualization.jl +++ b/src/visualization.jl @@ -30,7 +30,7 @@ show_logs(t, logs, vizmode::Symbol; options...) = show_logs(t, logs, Val{vizmode}(); options...) function show_logs(logs, ::Val{vizmode}; options...) where vizmode iob = IOBuffer() - show_logs(iob, t, Val{vizmode}(); options...) + show_logs(iob, logs, Val{vizmode}(); options...) return String(take!(iob)) end function show_logs(t, logs, ::Val{vizmode}; options...) where vizmode