Skip to content

Commit 44ab5a5

Browse files
committed
logging: Make IDs consistent, add helpers
Makes the :id field consistent (NamedTuple or nothing) Adds enable/disable logging helpers and log fetch helper
1 parent d0cf5cc commit 44ab5a5

File tree

7 files changed

+100
-47
lines changed

7 files changed

+100
-47
lines changed

Diff for: src/Dagger.jl

+2-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ include("ui/gantt-common.jl")
6767
include("ui/gantt-text.jl")
6868

6969
# Logging
70-
include("lib/logging-events.jl")
70+
include("utils/logging-events.jl")
71+
include("utils/logging.jl")
7172

7273
# Precompilation
7374
using PrecompileTools

Diff for: src/sch/Sch.jl

+43-42
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ const WORKER_MONITOR_TASKS = Dict{Int,Task}()
319319
const WORKER_MONITOR_CHANS = Dict{Int,Dict{UInt64,RemoteChannel}}()
320320
function init_proc(state, p, log_sink)
321321
ctx = Context(Int[]; log_sink)
322-
timespan_start(ctx, :init_proc, p.pid, 0)
322+
timespan_start(ctx, :init_proc, (;worker=p.pid), nothing)
323323
# Initialize pressure and capacity
324324
gproc = OSProc(p.pid)
325325
lock(state.lock) do
@@ -387,7 +387,7 @@ function init_proc(state, p, log_sink)
387387
# Setup dynamic listener
388388
dynamic_listener!(ctx, state, p.pid)
389389

390-
timespan_finish(ctx, :init_proc, p.pid, 0)
390+
timespan_finish(ctx, :init_proc, (;worker=p.pid), nothing)
391391
end
392392
function _cleanup_proc(uid, log_sink)
393393
empty!(CHUNK_CACHE) # FIXME: Should be keyed on uid!
@@ -403,14 +403,14 @@ end
403403
function cleanup_proc(state, p, log_sink)
404404
ctx = Context(Int[]; log_sink)
405405
wid = p.pid
406-
timespan_start(ctx, :cleanup_proc, wid, 0)
406+
timespan_start(ctx, :cleanup_proc, (;worker=wid), nothing)
407407
lock(WORKER_MONITOR_LOCK) do
408408
if haskey(WORKER_MONITOR_CHANS, wid)
409409
delete!(WORKER_MONITOR_CHANS[wid], state.uid)
410410
end
411411
end
412412
remote_do(_cleanup_proc, wid, state.uid, log_sink)
413-
timespan_finish(ctx, :cleanup_proc, wid, 0)
413+
timespan_finish(ctx, :cleanup_proc, (;worker=wid), nothing)
414414
end
415415

416416
"Process-local condition variable (and lock) indicating task completion."
@@ -458,24 +458,24 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
458458

459459
master = OSProc(myid())
460460

461-
timespan_start(ctx, :scheduler_init, 0, master)
461+
timespan_start(ctx, :scheduler_init, nothing, master)
462462
try
463463
scheduler_init(ctx, state, d, options, deps)
464464
finally
465-
timespan_finish(ctx, :scheduler_init, 0, master)
465+
timespan_finish(ctx, :scheduler_init, nothing, master)
466466
end
467467

468468
value, errored = try
469469
scheduler_run(ctx, state, d, options)
470470
finally
471471
# Always try to tear down the scheduler
472-
timespan_start(ctx, :scheduler_exit, 0, master)
472+
timespan_start(ctx, :scheduler_exit, nothing, master)
473473
try
474474
scheduler_exit(ctx, state, options)
475475
catch err
476476
@error "Error when tearing down scheduler" exception=(err,catch_backtrace())
477477
finally
478-
timespan_finish(ctx, :scheduler_exit, 0, master)
478+
timespan_finish(ctx, :scheduler_exit, nothing, master)
479479
end
480480
end
481481

@@ -531,10 +531,10 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
531531
check_integrity(ctx)
532532

533533
isempty(state.running) && continue
534-
timespan_start(ctx, :take, 0, 0)
534+
timespan_start(ctx, :take, nothing, nothing)
535535
@dagdebug nothing :take "Waiting for results"
536536
chan_value = take!(state.chan) # get result of completed thunk
537-
timespan_finish(ctx, :take, 0, 0)
537+
timespan_finish(ctx, :take, nothing, nothing)
538538
if chan_value isa RescheduleSignal
539539
continue
540540
end
@@ -549,13 +549,13 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
549549
@warn "Worker $(pid) died, rescheduling work"
550550

551551
# Remove dead worker from procs list
552-
timespan_start(ctx, :remove_procs, 0, 0)
552+
timespan_start(ctx, :remove_procs, (;worker=pid), nothing)
553553
remove_dead_proc!(ctx, state, gproc)
554-
timespan_finish(ctx, :remove_procs, 0, 0)
554+
timespan_finish(ctx, :remove_procs, (;worker=pid), nothing)
555555

556-
timespan_start(ctx, :handle_fault, 0, 0)
556+
timespan_start(ctx, :handle_fault, (;worker=pid), nothing)
557557
handle_fault(ctx, state, gproc)
558-
timespan_finish(ctx, :handle_fault, 0, 0)
558+
timespan_finish(ctx, :handle_fault, (;worker=pid), nothing)
559559
return # effectively `continue`
560560
else
561561
if something(ctx.options.allow_errors, false) ||
@@ -590,9 +590,9 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
590590
end
591591
end
592592

593-
timespan_start(ctx, :finish, thunk_id, (;thunk_id))
593+
timespan_start(ctx, :finish, (;thunk_id), (;thunk_id))
594594
finish_task!(ctx, state, node, thunk_failed)
595-
timespan_finish(ctx, :finish, thunk_id, (;thunk_id))
595+
timespan_finish(ctx, :finish, (;thunk_id), (;thunk_id))
596596

597597
delete_unused_tasks!(state)
598598
end
@@ -675,13 +675,13 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
675675
task = nothing
676676
@label pop_task
677677
if task !== nothing
678-
timespan_finish(ctx, :schedule, task.id, (;thunk_id=task.id))
678+
timespan_finish(ctx, :schedule, (;thunk_id=task.id), (;thunk_id=task.id))
679679
end
680680
if isempty(state.ready)
681681
@goto fire_tasks
682682
end
683683
task = pop!(state.ready)
684-
timespan_start(ctx, :schedule, task.id, (;thunk_id=task.id))
684+
timespan_start(ctx, :schedule, (;thunk_id=task.id), (;thunk_id=task.id))
685685
if haskey(state.cache, task)
686686
if haskey(state.errored, task)
687687
# An error was eagerly propagated to this task
@@ -869,7 +869,7 @@ function monitor_procs_changed!(ctx, state)
869869
wait(ctx.proc_notify)
870870
end
871871

872-
timespan_start(ctx, :assign_procs, 0, 0)
872+
timespan_start(ctx, :assign_procs, nothing, nothing)
873873

874874
# Load new set of procs
875875
new_ps = procs_to_use(ctx)
@@ -897,7 +897,7 @@ function monitor_procs_changed!(ctx, state)
897897
end
898898
end
899899

900-
timespan_finish(ctx, :assign_procs, 0, 0)
900+
timespan_finish(ctx, :assign_procs, nothing, nothing)
901901
old_ps = new_ps
902902
end
903903
end
@@ -982,9 +982,9 @@ function evict_chunks!(log_sink, chunks::Set{Chunk})
982982
ctx = Context([myid()];log_sink)
983983
for chunk in chunks
984984
lock(TASK_SYNC) do
985-
timespan_start(ctx, :evict, myid(), (;data=chunk))
985+
timespan_start(ctx, :evict, (;worker=myid()), (;data=chunk))
986986
haskey(CHUNK_CACHE, chunk) && delete!(CHUNK_CACHE, chunk)
987-
timespan_finish(ctx, :evict, myid(), (;data=chunk))
987+
timespan_finish(ctx, :evict, (;worker=myid()), (;data=chunk))
988988
end
989989
end
990990
nothing
@@ -1061,15 +1061,15 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
10611061
for ts in to_send
10621062
# TODO: errormonitor
10631063
@async begin
1064-
timespan_start(ctx, :fire, gproc.pid, 0)
1064+
timespan_start(ctx, :fire, (;worker=gproc.pid), nothing)
10651065
try
10661066
remotecall_wait(do_tasks, gproc.pid, proc, state.chan, [ts])
10671067
catch err
10681068
bt = catch_backtrace()
10691069
thunk_id = ts[1]
10701070
put!(state.chan, (gproc.pid, proc, thunk_id, (CapturedException(err, bt), nothing)))
10711071
finally
1072-
timespan_finish(ctx, :fire, gproc.pid, 0)
1072+
timespan_finish(ctx, :fire, (;worker=gproc.pid), nothing)
10731073
end
10741074
end
10751075
end
@@ -1189,25 +1189,26 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
11891189
proc_occupancy = istate.proc_occupancy
11901190
time_pressure = istate.time_pressure
11911191

1192+
wid = get_parent(to_proc).pid
11921193
work_to_do = false
11931194
while isopen(return_queue)
11941195
# Wait for new tasks
11951196
if !work_to_do
11961197
@dagdebug nothing :processor "Waiting for tasks"
1197-
timespan_start(ctx, :proc_run_wait, to_proc, nothing)
1198+
timespan_start(ctx, :proc_run_wait, (;worker=wid, processor=to_proc), nothing)
11981199
wait(istate.reschedule)
11991200
@static if VERSION >= v"1.9"
12001201
reset(istate.reschedule)
12011202
end
1202-
timespan_finish(ctx, :proc_run_wait, to_proc, nothing)
1203+
timespan_finish(ctx, :proc_run_wait, (;worker=wid, processor=to_proc), nothing)
12031204
if istate.done[]
12041205
return
12051206
end
12061207
end
12071208

12081209
# Fetch a new task to execute
12091210
@dagdebug nothing :processor "Trying to dequeue"
1210-
timespan_start(ctx, :proc_run_fetch, to_proc, nothing)
1211+
timespan_start(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), nothing)
12111212
work_to_do = false
12121213
task_and_occupancy = lock(istate.queue) do queue
12131214
# Only steal if there are multiple queued tasks, to prevent
@@ -1226,7 +1227,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12261227
return queue_result
12271228
end
12281229
if task_and_occupancy === nothing
1229-
timespan_finish(ctx, :proc_run_fetch, to_proc, nothing)
1230+
timespan_finish(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), nothing)
12301231

12311232
@dagdebug nothing :processor "Failed to dequeue"
12321233

@@ -1241,7 +1242,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12411242
@dagdebug nothing :processor "Trying to steal"
12421243

12431244
# Try to steal a task
1244-
timespan_start(ctx, :steal_local, to_proc, nothing)
1245+
timespan_start(ctx, :steal_local, (;worker=wid, processor=to_proc), nothing)
12451246

12461247
# Try to steal from local queues randomly
12471248
# TODO: Prioritize stealing from busiest processors
@@ -1276,12 +1277,12 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12761277
from_proc = other_istate.proc
12771278
thunk_id = task[1]
12781279
@dagdebug thunk_id :processor "Stolen from $from_proc by $to_proc"
1279-
timespan_finish(ctx, :steal_local, to_proc, (;from_proc, thunk_id))
1280+
timespan_finish(ctx, :steal_local, (;worker=wid, processor=to_proc), (;from_proc, thunk_id))
12801281
# TODO: Keep stealing until we hit full occupancy?
12811282
@goto execute
12821283
end
12831284
end
1284-
timespan_finish(ctx, :steal_local, to_proc, nothing)
1285+
timespan_finish(ctx, :steal_local, (;worker=wid, processor=to_proc), nothing)
12851286

12861287
# TODO: Try to steal from remote queues
12871288

@@ -1293,7 +1294,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
12931294
task = task_spec[]
12941295
thunk_id = task[1]
12951296
time_util = task[2]
1296-
timespan_finish(ctx, :proc_run_fetch, to_proc, (;thunk_id, proc_occupancy=proc_occupancy[], task_occupancy))
1297+
timespan_finish(ctx, :proc_run_fetch, (;worker=wid, processor=to_proc), (;thunk_id, proc_occupancy=proc_occupancy[], task_occupancy))
12971298
@dagdebug thunk_id :processor "Dequeued task"
12981299

12991300
# Execute the task and return its result
@@ -1378,7 +1379,7 @@ function do_tasks(to_proc, return_queue, tasks)
13781379
for task in tasks
13791380
thunk_id = task[1]
13801381
occupancy = task[4]
1381-
timespan_start(ctx, :enqueue, (;to_proc, thunk_id), nothing)
1382+
timespan_start(ctx, :enqueue, (;processor=to_proc, thunk_id), nothing)
13821383
should_launch = lock(TASK_SYNC) do
13831384
# Already running; don't try to re-launch
13841385
if !(thunk_id in TASKS_RUNNING)
@@ -1390,7 +1391,7 @@ function do_tasks(to_proc, return_queue, tasks)
13901391
end
13911392
should_launch || continue
13921393
enqueue!(queue, TaskSpecKey(task), occupancy)
1393-
timespan_finish(ctx, :enqueue, (;to_proc, thunk_id), nothing)
1394+
timespan_finish(ctx, :enqueue, (;processor=to_proc, thunk_id), nothing)
13941395
@dagdebug thunk_id :processor "Enqueued task"
13951396
end
13961397
end
@@ -1435,7 +1436,7 @@ function do_task(to_proc, task_desc)
14351436
to_storage_name = nameof(typeof(to_storage))
14361437
storage_cap = storage_capacity(to_storage)
14371438

1438-
timespan_start(ctx, :storage_wait, thunk_id, (;f, to_proc, device=typeof(to_storage)))
1439+
timespan_start(ctx, :storage_wait, (;thunk_id, processor=to_proc), (;f, device=typeof(to_storage)))
14391440
real_time_util = Ref{UInt64}(0)
14401441
real_alloc_util = UInt64(0)
14411442
if !meta
@@ -1493,7 +1494,7 @@ function do_task(to_proc, task_desc)
14931494
break
14941495
end
14951496
end
1496-
timespan_finish(ctx, :storage_wait, thunk_id, (;f, to_proc, device=typeof(to_storage)))
1497+
timespan_finish(ctx, :storage_wait, (;thunk_id, processor=to_proc), (;f, device=typeof(to_storage)))
14971498

14981499
@dagdebug thunk_id :execute "Moving data"
14991500

@@ -1507,7 +1508,7 @@ function do_task(to_proc, task_desc)
15071508
end
15081509
fetch_tasks = map(Iterators.zip(_data,_ids)) do (x, id)
15091510
@async begin
1510-
timespan_start(ctx, :move, (;thunk_id, id), (;f, id, data=x))
1511+
timespan_start(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x))
15111512
#= FIXME: This isn't valid if x is written to
15121513
x = if x isa Chunk
15131514
value = lock(TASK_SYNC) do
@@ -1553,7 +1554,7 @@ function do_task(to_proc, task_desc)
15531554
x = @invokelatest move(to_proc, x)
15541555
#end
15551556
@dagdebug thunk_id :move "Moved argument $id to $to_proc: $x"
1556-
timespan_finish(ctx, :move, (;thunk_id, id), (;f, id, data=x); tasks=[Base.current_task()])
1557+
timespan_finish(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x); tasks=[Base.current_task()])
15571558
return x
15581559
end
15591560
end
@@ -1587,7 +1588,7 @@ function do_task(to_proc, task_desc)
15871588
=#
15881589

15891590
real_time_util[] += est_time_util
1590-
timespan_start(ctx, :compute, thunk_id, (;f, to_proc))
1591+
timespan_start(ctx, :compute, (;thunk_id, processor=to_proc), (;f))
15911592
res = nothing
15921593

15931594
# Start counting time and GC allocations
@@ -1614,13 +1615,13 @@ function do_task(to_proc, task_desc)
16141615
# Check if result is safe to store
16151616
device = nothing
16161617
if !(res isa Chunk)
1617-
timespan_start(ctx, :storage_safe_scan, thunk_id, (;T=typeof(res)))
1618+
timespan_start(ctx, :storage_safe_scan, (;thunk_id, processor=to_proc), (;T=typeof(res)))
16181619
device = if walk_storage_safe(res)
16191620
to_storage
16201621
else
16211622
MemPool.CPURAMDevice()
16221623
end
1623-
timespan_finish(ctx, :storage_safe_scan, thunk_id, (;T=typeof(res)))
1624+
timespan_finish(ctx, :storage_safe_scan, (;thunk_id, processor=to_proc), (;T=typeof(res)))
16241625
end
16251626

16261627
# Construct result
@@ -1637,7 +1638,7 @@ function do_task(to_proc, task_desc)
16371638
threadtime = cputhreadtime() - threadtime_start
16381639
# FIXME: This is not a realistic measure of max. required memory
16391640
#gc_allocd = min(max(UInt64(Base.gc_num().allocd) - UInt64(gcnum_start.allocd), UInt64(0)), UInt64(1024^4))
1640-
timespan_finish(ctx, :compute, thunk_id, (;f, to_proc))
1641+
timespan_finish(ctx, :compute, (;thunk_id, processor=to_proc), (;f))
16411642
lock(TASK_SYNC) do
16421643
real_time_util[] -= est_time_util
16431644
pop!(TASKS_RUNNING, thunk_id)

Diff for: src/sch/dynamic.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ end
201201
add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...) =
202202
exec!(_add_thunk!, h, f, args, options, future, ref)
203203
function _add_thunk!(ctx, state, task, tid, (f, args, options, future, ref))
204-
timespan_start(ctx, :add_thunk, tid, 0)
204+
timespan_start(ctx, :add_thunk, (;thunk_id=tid), 0)
205205
_args = map(args) do pos_arg
206206
if pos_arg[2] isa ThunkID
207207
return pos_arg[1] => state.thunk_dict[pos_arg[2].id]
@@ -228,7 +228,7 @@ function _add_thunk!(ctx, state, task, tid, (f, args, options, future, ref))
228228
end
229229
state.valid[thunk] = nothing
230230
put!(state.chan, RescheduleSignal())
231-
timespan_finish(ctx, :add_thunk, tid, 0)
231+
timespan_finish(ctx, :add_thunk, (;thunk_id=tid), 0)
232232
return thunk_id
233233
end
234234
end

Diff for: src/submission.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
2424
return thunk_ids
2525
end
2626

27-
timespan_start(ctx, :add_thunk, tid, 0)
27+
timespan_start(ctx, :add_thunk, (;thunk_id=tid), 0)
2828

2929
# Lookup EagerThunk/ThunkID -> Thunk
3030
old_args = copy(args)
@@ -118,7 +118,7 @@ function eager_submit_internal!(ctx, state, task, tid, payload; uid_to_tid=Dict{
118118
put!(state.chan, Sch.RescheduleSignal())
119119
end
120120

121-
timespan_finish(ctx, :add_thunk, tid, 0)
121+
timespan_finish(ctx, :add_thunk, (;thunk_id=tid), 0)
122122

123123
return thunk_id
124124
end
File renamed without changes.

0 commit comments

Comments
 (0)