@@ -10,7 +10,7 @@ using .job
10
10
11
11
export ClusterEnv, QUEUE_SIZE, ZONES
12
12
13
- const QUEUE_SIZE, ZONES = 128 , 32
13
+ const QUEUE_SIZE, ZONES, SLICE_SIZE = 128 , 32 , 50_000
14
14
15
15
struct Metrics
16
16
avg_bounded_slowdown:: Float32
@@ -23,6 +23,7 @@ mutable struct ClusterEnv <: AbstractEnv
23
23
workload:: Workload
24
24
time:: Int
25
25
next_job_index:: Int
26
+ last_job_index:: Int
26
27
reward:: Float32
27
28
done:: Bool
28
29
cluster:: Vector{Job}
@@ -33,15 +34,19 @@ mutable struct ClusterEnv <: AbstractEnv
33
34
metrics:: Union{Nothing, Metrics}
34
35
end
35
36
37
+ choose_index (wl:: Workload ) = rand (1 : length (wl. jobs) - SLICE_SIZE)
38
+
36
39
function RLBase. reset! (env:: ClusterEnv )
37
- # workload = Workload(rand(WORKLOADS))
38
- # env.workload = workload
40
+ workload = Workload (rand (WORKLOADS))
41
+ env. workload = workload
42
+ index = choose_index (workload)
39
43
env. time = env. workload. jobs[1 ]. submit_time
40
- env. next_job_index = 2
44
+ env. next_job_index = index + 1
45
+ env. last_job_index = index + SLICE_SIZE
41
46
env. reward = 0
42
47
env. done = false
43
48
env. cluster = []
44
- env. queue = [env. workload. jobs[1 ]]
49
+ env. queue = [env. workload. jobs[index ]]
45
50
env. are_pending_jobs = true
46
51
env. available_cores = env. workload. cores
47
52
env. utilization = []
50
55
51
56
function ClusterEnv ()
52
57
workload = Workload (rand (WORKLOADS))
58
+ index = choose_index (workload)
53
59
time = workload. jobs[1 ]. submit_time
54
- next_job_index = 2
55
- reward = 0
56
- done = false
57
- cluster = []
58
- queue = [workload. jobs[1 ]]
59
- are_pending_jobs = true
60
- available_cores = workload. cores
61
- utilization = []
62
- metrics = nothing
63
-
64
- ClusterEnv (workload, time, next_job_index, reward, done, cluster, queue, are_pending_jobs, available_cores, utilization, metrics)
65
- end
66
-
67
- function ClusterEnv (index)
68
- workload = Workload (WORKLOADS[index])
69
- time = workload. jobs[1 ]. submit_time
70
- next_job_index = 2
60
+ next_job_index = index + 1
61
+ last_job_index = index + SLICE_SIZE
71
62
reward = 0
72
63
done = false
73
64
cluster = []
74
- queue = [workload. jobs[1 ]]
65
+ queue = [workload. jobs[index ]]
75
66
are_pending_jobs = true
76
67
available_cores = workload. cores
77
68
utilization = []
78
69
metrics = nothing
79
70
80
- ClusterEnv (workload, time, next_job_index, reward, done, cluster, queue, are_pending_jobs, available_cores, utilization, metrics)
71
+ ClusterEnv (workload, time, next_job_index, last_job_index, reward, done, cluster, queue, are_pending_jobs, available_cores, utilization, metrics)
81
72
end
82
73
83
74
RLBase. action_space (env:: ClusterEnv ) = Base. OneTo (QUEUE_SIZE)
@@ -214,7 +205,7 @@ function (env::ClusterEnv)(action)
214
205
end
215
206
216
207
# check for termination
217
- if env. next_job_index > length ( env. workload . jobs) # no more pending jobs!
208
+ if env. next_job_index > env. last_job_index # no more pending jobs!
218
209
env. are_pending_jobs = false
219
210
end
220
211
# we've finished!
@@ -233,8 +224,8 @@ function (env::ClusterEnv)(action)
233
224
bslds = [max ((j. simulated_wait_time + j. simulated_run_time) / max (j. simulated_run_time, 10 ), 1 ) for j in env. workload. jobs]
234
225
sum_bslds = + (bslds... )
235
226
avg_bsld = sum_bslds / length (env. workload. jobs)
236
- # currently: negative average bounded slowdown
237
- env. reward = - avg_bsld
227
+ # currently: negative average bounded slowdown relative to SJF
228
+ env. reward = env . workload . sjf_bsld - avg_bsld
238
229
env. done = true
239
230
env. metrics = Metrics (avg_bsld, avg_wait_time, max_wait_time, avg_utilization)
240
231
break
0 commit comments