Skip to content

Commit 6e61606

Browse files
Make number of scheduler workers reloadable (#11593)
## Development Environment Changes * Added stringer to build deps ## New HTTP APIs * Added scheduler worker config API * Added scheduler worker info API ## New Internals * (Scheduler)Worker API refactor—Start(), Stop(), Pause(), Resume() * Update shutdown to use context * Add mutex for contended server data - `workerLock` for the `workers` slice - `workerConfigLock` for the `Server.Config.NumSchedulers` and `Server.Config.EnabledSchedulers` values ## Other * Adding docs for scheduler worker api * Add changelog message Co-authored-by: Derek Strickland <[email protected]>
1 parent d5e8081 commit 6e61606

21 files changed

+2211
-101
lines changed

.changelog/11593.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:improvement
2+
server: Make num_schedulers and enabled_schedulers hot reloadable; add agent API endpoint to enable dynamic modifications of these values.
3+
```
+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{
2+
"$schema": "https://aka.ms/codetour-schema",
3+
"title": "Scheduler Worker - Hot Reload",
4+
"steps": [
5+
{
6+
"file": "nomad/server.go",
7+
"description": "## Server.Reload()\n\nServer configuration reloads start here.",
8+
"line": 782,
9+
"selection": {
10+
"start": {
11+
"line": 780,
12+
"character": 4
13+
},
14+
"end": {
15+
"line": 780,
16+
"character": 10
17+
}
18+
}
19+
},
20+
{
21+
"file": "nomad/server.go",
22+
"description": "## Did NumSchedulers change?\nIf the number of schedulers has changed between the running configuration and the new one we need to adopt that change in realtime.",
23+
"line": 812
24+
},
25+
{
26+
"file": "nomad/server.go",
27+
"description": "## Server.setupNewWorkers()\n\nsetupNewWorkers performs three tasks:\n\n- makes a copy of the existing worker pointers\n\n- creates a fresh array and loads a new set of workers into them\n\n- iterates through the \"old\" workers and shuts them down in individual\n goroutines for maximum parallelism",
28+
"line": 1482,
29+
"selection": {
30+
"start": {
31+
"line": 1480,
32+
"character": 4
33+
},
34+
"end": {
35+
"line": 1480,
36+
"character": 12
37+
}
38+
}
39+
},
40+
{
41+
"file": "nomad/server.go",
42+
"description": "Once all of the work in setupNewWorkers is complete, we stop the old ones.",
43+
"line": 1485
44+
},
45+
{
46+
"file": "nomad/server.go",
47+
"description": "The `stopOldWorkers` function iterates through the array of workers and calls their `Shutdown` method\nas a goroutine to prevent blocking.",
48+
"line": 1505
49+
},
50+
{
51+
"file": "nomad/worker.go",
52+
"description": "The `Shutdown` method sets `w.stop` to true signaling that we intend for the `Worker` to stop the next time we consult it. We also manually unpause the `Worker` by setting w.paused to false and sending a `Broadcast()` via the cond.",
53+
"line": 110
54+
}
55+
],
56+
"ref": "f-reload-num-schedulers"
57+
}

.tours/scheduler-worker---pause.tour

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
{
2+
"$schema": "https://aka.ms/codetour-schema",
3+
"title": "Scheduler Worker - Pause",
4+
"steps": [
5+
{
6+
"file": "nomad/leader.go",
7+
"description": "## Server.establishLeadership()\n\nUpon becoming a leader, the server pauses a subset of the workers to allow for the additional burden of the leader's goroutines. The `handlePausableWorkers` function takes a boolean that states whether or not the current node is a leader or not. Because we are in `establishLeadership` we use `true` rather than calling `s.IsLeader()`",
8+
"line": 233,
9+
"selection": {
10+
"start": {
11+
"line": 233,
12+
"character": 4
13+
},
14+
"end": {
15+
"line": 233,
16+
"character": 12
17+
}
18+
}
19+
},
20+
{
21+
"file": "nomad/leader.go",
22+
"description": "## Server.handlePausableWorkers()\n\nhandlePausableWorkers ranges over a slice of Workers and manipulates their paused state by calling their `SetPause` method.",
23+
"line": 443,
24+
"selection": {
25+
"start": {
26+
"line": 443,
27+
"character": 18
28+
},
29+
"end": {
30+
"line": 443,
31+
"character": 26
32+
}
33+
}
34+
},
35+
{
36+
"file": "nomad/leader.go",
37+
"description": "## Server.pausableWorkers()\n\nThe pausableWorkers function provides a consistent slice of workers that the server can pause and unpause. Since the Worker array is never mutated, the same slice is returned by pausableWorkers on every invocation.\nThis comment is interesting/potentially confusing\n\n```golang\n // Disabling 3/4 of the workers frees CPU for raft and the\n\t// plan applier which uses 1/2 the cores.\n``` \n\nHowever, the key point is that it will return a slice containg 3/4th of the workers.",
38+
"line": 1100,
39+
"selection": {
40+
"start": {
41+
"line": 1104,
42+
"character": 1
43+
},
44+
"end": {
45+
"line": 1105,
46+
"character": 43
47+
}
48+
}
49+
},
50+
{
51+
"file": "nomad/worker.go",
52+
"description": "## Worker.SetPause()\n\nThe `SetPause` function is used to signal an intention to pause the worker. Because the worker's work is happening in the `run()` goroutine, pauses happen asynchronously.",
53+
"line": 91
54+
},
55+
{
56+
"file": "nomad/worker.go",
57+
"description": "## Worker.dequeueEvaluation()\n\nCalls checkPaused, which will be the function we wait in if the scheduler is set to be paused. \n\n> **NOTE:** This is called here rather than in run() because this function loops in case of an error fetching a evaluation.",
58+
"line": 206
59+
},
60+
{
61+
"file": "nomad/worker.go",
62+
"description": "## Worker.checkPaused()\n\nWhen `w.paused` is `true`, we call the `Wait()` function on the condition. Execution of this goroutine will stop here until it receives a `Broadcast()` or a `Signal()`. At this point, the `Worker` is paused.",
63+
"line": 104
64+
}
65+
]
66+
}
+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"$schema": "https://aka.ms/codetour-schema",
3+
"title": "Scheduler Worker - Unpause",
4+
"steps": [
5+
{
6+
"file": "nomad/leader.go",
7+
"description": "## revokeLeadership()\n\nAs a server transistions from leader to non-leader, the pausableWorkers are resumed since the other leader goroutines are stopped providing extra capacity.",
8+
"line": 1040,
9+
"selection": {
10+
"start": {
11+
"line": 1038,
12+
"character": 10
13+
},
14+
"end": {
15+
"line": 1038,
16+
"character": 20
17+
}
18+
}
19+
},
20+
{
21+
"file": "nomad/leader.go",
22+
"description": "## handlePausableWorkers()\n\nThe handlePausableWorkers method is called with `false`. We fetch the pausableWorkers and call their SetPause method with `false`.\n",
23+
"line": 443,
24+
"selection": {
25+
"start": {
26+
"line": 443,
27+
"character": 18
28+
},
29+
"end": {
30+
"line": 443,
31+
"character": 27
32+
}
33+
}
34+
},
35+
{
36+
"file": "nomad/worker.go",
37+
"description": "## Worker.SetPause()\n\nDuring unpause, p is false. We update w.paused in the mutex, and then call Broadcast on the cond. This wakes the goroutine sitting in the Wait() inside of `checkPaused()`",
38+
"line": 91
39+
},
40+
{
41+
"file": "nomad/worker.go",
42+
"description": "## Worker.checkPaused()\n\nOnce the goroutine receives the `Broadcast()` message from `SetPause()`, execution continues here. Now that `w.paused == false`, we exit the loop and return to the caller (the `dequeueEvaluation()` function).",
43+
"line": 104
44+
},
45+
{
46+
"file": "nomad/worker.go",
47+
"description": "## Worker.dequeueEvaluation\n\nWe return back into dequeueEvaluation after the call to checkPaused. At this point the worker will either stop (if that signal boolean has been set) or continue looping after returning to run().",
48+
"line": 207
49+
}
50+
]
51+
}

.tours/scheduler-worker.tour

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"$schema": "https://aka.ms/codetour-schema",
3+
"title": "Scheduler Worker - Start",
4+
"steps": [
5+
{
6+
"file": "nomad/server.go",
7+
"description": "## Server.NewServer()\n\nScheduler workers are started as the agent starts the `server` go routines.",
8+
"line": 402
9+
},
10+
{
11+
"file": "nomad/server.go",
12+
"description": "## Server.setupWorkers()\n\nThe `setupWorkers()` function validates that there are enabled Schedulers by type and count. It then creates s.config.NumSchedulers by calling `NewWorker()`\n\nThe `_core` scheduler _**must**_ be enabled. **TODO: why?**\n",
13+
"line": 1443,
14+
"selection": {
15+
"start": {
16+
"line": 1442,
17+
"character": 4
18+
},
19+
"end": {
20+
"line": 1442,
21+
"character": 12
22+
}
23+
}
24+
},
25+
{
26+
"file": "nomad/worker.go",
27+
"description": "## Worker.NewWorker\n\nNewWorker creates the Worker and starts `run()` in a goroutine.",
28+
"line": 78
29+
},
30+
{
31+
"file": "nomad/worker.go",
32+
"description": "## Worker.run()\n\nThe `run()` function runs in a loop until it's paused, it's stopped, or the server indicates that it is shutting down. All of the work the `Worker` performs should be\nimplemented in or called from here.\n",
33+
"line": 152
34+
}
35+
]
36+
}

GNUmakefile

+1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ deps: ## Install build and development dependencies
124124
go install github.com/hashicorp/go-msgpack/codec/[email protected]
125125
go install github.com/bufbuild/buf/cmd/[email protected]
126126
go install github.com/hashicorp/go-changelog/cmd/changelog-build@latest
127+
go install golang.org/x/tools/cmd/[email protected]
127128

128129
.PHONY: lint-deps
129130
lint-deps: ## Install linter dependencies

api/agent.go

+75
Original file line numberDiff line numberDiff line change
@@ -494,3 +494,78 @@ type HostDataResponse struct {
494494
AgentID string
495495
HostData *HostData `json:",omitempty"`
496496
}
497+
498+
// GetSchedulerWorkerConfig returns the targeted agent's worker pool configuration
499+
func (a *Agent) GetSchedulerWorkerConfig(q *QueryOptions) (*SchedulerWorkerPoolArgs, error) {
500+
var resp AgentSchedulerWorkerConfigResponse
501+
_, err := a.client.query("/v1/agent/schedulers/config", &resp, q)
502+
if err != nil {
503+
return nil, err
504+
}
505+
506+
return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
507+
}
508+
509+
// SetSchedulerWorkerConfig attempts to update the targeted agent's worker pool configuration
510+
func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs, q *WriteOptions) (*SchedulerWorkerPoolArgs, error) {
511+
req := AgentSchedulerWorkerConfigRequest(args)
512+
var resp AgentSchedulerWorkerConfigResponse
513+
514+
_, err := a.client.write("/v1/agent/schedulers/config", &req, &resp, q)
515+
if err != nil {
516+
return nil, err
517+
}
518+
519+
return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
520+
}
521+
522+
type SchedulerWorkerPoolArgs struct {
523+
NumSchedulers int
524+
EnabledSchedulers []string
525+
}
526+
527+
// AgentSchedulerWorkerConfigRequest is used to provide new scheduler worker configuration
528+
// to a specific Nomad server. EnabledSchedulers must contain at least the `_core` scheduler
529+
// to be valid.
530+
type AgentSchedulerWorkerConfigRequest struct {
531+
NumSchedulers int `json:"num_schedulers"`
532+
EnabledSchedulers []string `json:"enabled_schedulers"`
533+
}
534+
535+
// AgentSchedulerWorkerConfigResponse contains the Nomad server's current running configuration
536+
// as well as the server's id as a convenience. This can be used to provide starting values for
537+
// creating an AgentSchedulerWorkerConfigRequest to make changes to the running configuration.
538+
type AgentSchedulerWorkerConfigResponse struct {
539+
ServerID string `json:"server_id"`
540+
NumSchedulers int `json:"num_schedulers"`
541+
EnabledSchedulers []string `json:"enabled_schedulers"`
542+
}
543+
544+
// GetSchedulerWorkersInfo returns the current status of all of the scheduler workers on
545+
// a Nomad server.
546+
func (a *Agent) GetSchedulerWorkersInfo(q *QueryOptions) (*AgentSchedulerWorkersInfo, error) {
547+
var out *AgentSchedulerWorkersInfo
548+
549+
_, err := a.client.query("/v1/agent/schedulers", &out, q)
550+
if err != nil {
551+
return nil, err
552+
}
553+
554+
return out, nil
555+
}
556+
557+
// AgentSchedulerWorkersInfo is the response from the scheduler information endpoint containing
558+
// a detailed status of each scheduler worker running on the server.
559+
type AgentSchedulerWorkersInfo struct {
560+
ServerID string `json:"server_id"`
561+
Schedulers []AgentSchedulerWorkerInfo `json:"schedulers"`
562+
}
563+
564+
// AgentSchedulerWorkerInfo holds the detailed status information for a single scheduler worker.
565+
type AgentSchedulerWorkerInfo struct {
566+
ID string `json:"id"`
567+
EnabledSchedulers []string `json:"enabled_schedulers"`
568+
Started string `json:"started"`
569+
Status string `json:"status"`
570+
WorkloadStatus string `json:"workload_status"`
571+
}

api/agent_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package api
22

33
import (
44
"fmt"
5+
"net/http"
56
"reflect"
67
"sort"
78
"strings"
@@ -456,3 +457,50 @@ func TestAgentProfile(t *testing.T) {
456457
require.Nil(t, resp)
457458
}
458459
}
460+
461+
func TestAgent_SchedulerWorkerConfig(t *testing.T) {
462+
t.Parallel()
463+
464+
c, s := makeClient(t, nil, nil)
465+
defer s.Stop()
466+
a := c.Agent()
467+
468+
config, err := a.GetSchedulerWorkerConfig(nil)
469+
require.NoError(t, err)
470+
require.NotNil(t, config)
471+
newConfig := SchedulerWorkerPoolArgs{NumSchedulers: 0, EnabledSchedulers: []string{"_core", "system"}}
472+
resp, err := a.SetSchedulerWorkerConfig(newConfig, nil)
473+
require.NoError(t, err)
474+
assert.NotEqual(t, config, resp)
475+
}
476+
477+
func TestAgent_SchedulerWorkerConfig_BadRequest(t *testing.T) {
478+
t.Parallel()
479+
480+
c, s := makeClient(t, nil, nil)
481+
defer s.Stop()
482+
a := c.Agent()
483+
484+
config, err := a.GetSchedulerWorkerConfig(nil)
485+
require.NoError(t, err)
486+
require.NotNil(t, config)
487+
newConfig := SchedulerWorkerPoolArgs{NumSchedulers: -1, EnabledSchedulers: []string{"_core", "system"}}
488+
_, err = a.SetSchedulerWorkerConfig(newConfig, nil)
489+
require.Error(t, err)
490+
require.Contains(t, err.Error(), fmt.Sprintf("%v (%s)", http.StatusBadRequest, "Invalid request"))
491+
}
492+
493+
func TestAgent_SchedulerWorkersInfo(t *testing.T) {
494+
t.Parallel()
495+
c, s := makeClient(t, nil, nil)
496+
defer s.Stop()
497+
a := c.Agent()
498+
499+
info, err := a.GetSchedulerWorkersInfo(nil)
500+
require.NoError(t, err)
501+
require.NotNil(t, info)
502+
defaultSchedulers := []string{"batch", "system", "sysbatch", "service", "_core"}
503+
for _, worker := range info.Schedulers {
504+
require.ElementsMatch(t, defaultSchedulers, worker.EnabledSchedulers)
505+
}
506+
}

0 commit comments

Comments
 (0)