@@ -47,12 +47,12 @@ Fields:
47
47
- `running_on::Dict{Thunk,OSProc}` - Map from `Thunk` to the OS process executing it
48
48
- `thunk_dict::Dict{Int, Any}` - Maps from thunk IDs to a `Thunk`
49
49
- `node_order::Any` - Function that returns the order of a thunk
50
- - `worker_pressure::Dict{Int,Dict{Type,UInt }}` - Cache of worker pressure
51
- - `worker_capacity::Dict{Int,Dict{Type,UInt }}` - Maps from worker ID to capacity
50
+ - `worker_pressure::Dict{Int,Dict{Type,UInt64 }}` - Cache of worker pressure
51
+ - `worker_capacity::Dict{Int,Dict{Type,UInt64 }}` - Maps from worker ID to capacity
52
52
- `worker_loadavg::Dict{Int,NTuple{3,Float64}}` - Worker load average
53
53
- `worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}` - Communication channels between the scheduler and each worker
54
54
- `procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}` - Cached linked list of processors ready to be used
55
- - `function_cost_cache::Dict{Type{<:Tuple},UInt }` - Cache of estimated CPU time required to compute the given signature
55
+ - `function_cost_cache::Dict{Type{<:Tuple},UInt64 }` - Cache of estimated CPU time required to compute the given signature
56
56
- `halt::Base.Event` - Event indicating that the scheduler is halting
57
57
- `lock::ReentrantLock` - Lock around operations which modify the state
58
58
- `futures::Dict{Thunk, Vector{ThunkFuture}}` - Futures registered for waiting on the result of a thunk.
@@ -69,12 +69,12 @@ struct ComputeState
69
69
running_on:: Dict{Thunk,OSProc}
70
70
thunk_dict:: Dict{Int, Any}
71
71
node_order:: Any
72
- worker_pressure:: Dict{Int,Dict{Type,UInt }}
73
- worker_capacity:: Dict{Int,Dict{Type,UInt }}
72
+ worker_pressure:: Dict{Int,Dict{Type,UInt64 }}
73
+ worker_capacity:: Dict{Int,Dict{Type,UInt64 }}
74
74
worker_loadavg:: Dict{Int,NTuple{3,Float64}}
75
75
worker_chans:: Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
76
76
procs_cache_list:: Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
77
- function_cost_cache:: Dict{Type{<:Tuple},UInt }
77
+ function_cost_cache:: Dict{Type{<:Tuple},UInt64 }
78
78
halt:: Base.Event
79
79
lock:: ReentrantLock
80
80
futures:: Dict{Thunk, Vector{ThunkFuture}}
@@ -92,12 +92,12 @@ function start_state(deps::Dict, node_order, chan)
92
92
Dict {Thunk,OSProc} (),
93
93
Dict {Int, Thunk} (),
94
94
node_order,
95
- Dict {Int,Dict{Type,UInt }} (),
96
- Dict {Int,Dict{Type,UInt }} (),
95
+ Dict {Int,Dict{Type,UInt64 }} (),
96
+ Dict {Int,Dict{Type,UInt64 }} (),
97
97
Dict {Int,NTuple{3,Float64}} (),
98
98
Dict {Int, Tuple{RemoteChannel,RemoteChannel}} (),
99
99
Ref {Union{ProcessorCacheEntry,Nothing}} (nothing ),
100
- Dict {Type{<:Tuple},UInt } (),
100
+ Dict {Type{<:Tuple},UInt64 } (),
101
101
Base. Event (),
102
102
ReentrantLock (),
103
103
Dict {Thunk, Vector{ThunkFuture}} (),
@@ -228,19 +228,19 @@ function init_proc(state, p)
228
228
# Initialize pressure and capacity
229
229
proc = OSProc (p. pid)
230
230
lock (state. lock) do
231
- state. worker_pressure[p. pid] = Dict {Type,UInt } ()
232
- state. worker_capacity[p. pid] = Dict {Type,UInt } ()
231
+ state. worker_pressure[p. pid] = Dict {Type,UInt64 } ()
232
+ state. worker_capacity[p. pid] = Dict {Type,UInt64 } ()
233
233
state. worker_loadavg[p. pid] = (0.0 , 0.0 , 0.0 )
234
234
for T in unique (typeof .(get_processors (proc)))
235
235
state. worker_pressure[p. pid][T] = 0
236
- state. worker_capacity[p. pid][T] = capacity (proc, T) * UInt (1e9 )
236
+ state. worker_capacity[p. pid][T] = capacity (proc, T) * UInt64 (1e9 )
237
237
end
238
238
state. worker_pressure[p. pid][OSProc] = 0
239
239
state. worker_capacity[p. pid][OSProc] = 0
240
240
end
241
241
cap = remotecall (capacity, p. pid)
242
242
@async begin
243
- cap = fetch (cap) * UInt (1e9 )
243
+ cap = fetch (cap) * UInt64 (1e9 )
244
244
lock (state. lock) do
245
245
state. worker_capacity[p. pid] = cap
246
246
end
295
295
const TASK_SYNC = Threads. Condition ()
296
296
297
297
" Process-local dictionary tracking per-processor total utilization."
298
- const PROC_UTILIZATION = Dict {UInt64,Dict{Type,Ref{UInt }}} ()
298
+ const PROC_UTILIZATION = Dict {UInt64,Dict{Type,Ref{UInt64 }}} ()
299
299
300
300
"""
301
301
MaxUtilization
@@ -540,7 +540,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
540
540
function has_capacity (p, gp, procutil, sig)
541
541
T = typeof (p)
542
542
# FIXME : MaxUtilization
543
- extra_util = round (UInt , get (procutil, T, 1 ) * 1e9 )
543
+ extra_util = round (UInt64 , get (procutil, T, 1 ) * 1e9 )
544
544
real_util = state. worker_pressure[gp][T]
545
545
if (T === Dagger. ThreadProc) && haskey (state. function_cost_cache, sig)
546
546
# Assume that the extra pressure is between estimated and measured
@@ -930,10 +930,10 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c
930
930
931
931
# Check if we'll go over capacity from running this thunk
932
932
real_util = lock (TASK_SYNC) do
933
- AT = get! (()-> Dict {Type,Ref{UInt }} (), PROC_UTILIZATION, uid)
934
- get! (()-> Ref {UInt} ( UInt (0 )), AT, typeof (to_proc))
933
+ AT = get! (()-> Dict {Type,Ref{UInt64 }} (), PROC_UTILIZATION, uid)
934
+ get! (()-> Ref {UInt64} ( UInt64 (0 )), AT, typeof (to_proc))
935
935
end
936
- cap = UInt (capacity (OSProc (), typeof (to_proc))) * UInt (1e9 )
936
+ cap = UInt64 (capacity (OSProc (), typeof (to_proc))) * UInt64 (1e9 )
937
937
while true
938
938
lock (TASK_SYNC)
939
939
if ((extra_util isa MaxUtilization) && (real_util[] > 0 )) ||
0 commit comments