Skip to content

Commit c2dd97d

Browse files
gulducattgross
andcommitted
HostVolumePlugin interface and two implementations (#24497)
* mkdir: HostVolumePluginMkdir: just creates a directory * example-host-volume: HostVolumePluginExternal: plugin script that does mkfs and mount loopback Co-authored-by: Tim Gross <[email protected]>
1 parent 10a5f48 commit c2dd97d

14 files changed

+526
-25
lines changed

ci/test-core.json

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"client/dynamicplugins/...",
1818
"client/fingerprint/...",
1919
"client/hoststats/...",
20+
"client/hostvolumemanager/...",
2021
"client/interfaces/...",
2122
"client/lib/...",
2223
"client/logmon/...",

client/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/hashicorp/nomad/client/dynamicplugins"
3535
"github.com/hashicorp/nomad/client/fingerprint"
3636
"github.com/hashicorp/nomad/client/hoststats"
37+
"github.com/hashicorp/nomad/client/hostvolumemanager"
3738
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
3839
"github.com/hashicorp/nomad/client/lib/cgroupslib"
3940
"github.com/hashicorp/nomad/client/lib/numalib"
@@ -289,6 +290,8 @@ type Client struct {
289290
// drivermanager is responsible for managing driver plugins
290291
drivermanager drivermanager.Manager
291292

293+
hostVolumeManager *hostvolumemanager.HostVolumeManager
294+
292295
// baseLabels are used when emitting tagged metrics. All client metrics will
293296
// have these tags, and optionally more.
294297
baseLabels []metrics.Label
@@ -532,6 +535,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
532535
c.devicemanager = devManager
533536
c.pluginManagers.RegisterAndRun(devManager)
534537

538+
c.hostVolumeManager = hostvolumemanager.NewHostVolumeManager(cfg.AllocMountsDir, logger)
539+
535540
// Set up the service registration wrapper using the Consul and Nomad
536541
// implementations. The Nomad implementation is only ever used on the
537542
// client, so we do that here rather than within the agent.

client/host_volume_endpoint.go

+26-14
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package client
55

66
import (
77
"context"
8-
"path/filepath"
98
"time"
109

1110
metrics "github.com/armon/go-metrics"
@@ -23,31 +22,44 @@ func newHostVolumesEndpoint(c *Client) *HostVolume {
2322

2423
var hostVolumeRequestTimeout = time.Minute
2524

26-
func (v *HostVolume) requestContext() (context.Context, context.CancelFunc) {
27-
return context.WithTimeout(context.Background(), hostVolumeRequestTimeout)
28-
}
25+
func (v *HostVolume) Create(
26+
req *cstructs.ClientHostVolumeCreateRequest,
27+
resp *cstructs.ClientHostVolumeCreateResponse) error {
2928

30-
func (v *HostVolume) Create(req *cstructs.ClientHostVolumeCreateRequest, resp *cstructs.ClientHostVolumeCreateResponse) error {
3129
defer metrics.MeasureSince([]string{"client", "host_volume", "create"}, time.Now())
32-
_, cancelFn := v.requestContext()
30+
ctx, cancelFn := v.requestContext()
3331
defer cancelFn()
3432

35-
// TODO(1.10.0): call into Client's host volume manager to create the work here
33+
cresp, err := v.c.hostVolumeManager.Create(ctx, req)
34+
if err != nil {
35+
v.c.logger.Error("failed to create host volume", "name", req.Name, "error", err)
36+
return err
37+
}
3638

37-
resp.CapacityBytes = req.RequestedCapacityMinBytes
38-
resp.HostPath = filepath.Join(v.c.config.AllocMountsDir, req.ID)
39+
resp.CapacityBytes = cresp.CapacityBytes
40+
resp.HostPath = cresp.HostPath
3941

40-
v.c.logger.Debug("created host volume", "id", req.ID, "path", resp.HostPath)
42+
v.c.logger.Info("created host volume", "id", req.ID, "path", resp.HostPath)
4143
return nil
4244
}
4345

44-
func (v *HostVolume) Delete(req *cstructs.ClientHostVolumeDeleteRequest, resp *cstructs.ClientHostVolumeDeleteResponse) error {
46+
func (v *HostVolume) Delete(
47+
req *cstructs.ClientHostVolumeDeleteRequest,
48+
resp *cstructs.ClientHostVolumeDeleteResponse) error {
4549
defer metrics.MeasureSince([]string{"client", "host_volume", "create"}, time.Now())
46-
_, cancelFn := v.requestContext()
50+
ctx, cancelFn := v.requestContext()
4751
defer cancelFn()
4852

49-
// TODO(1.10.0): call into Client's host volume manager to delete the volume here
53+
_, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it?
54+
if err != nil {
55+
v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err)
56+
return err
57+
}
5058

51-
v.c.logger.Debug("deleted host volume", "id", req.ID, "path", req.HostPath)
59+
v.c.logger.Info("deleted host volume", "id", req.ID, "path", req.HostPath)
5260
return nil
5361
}
62+
63+
func (v *HostVolume) requestContext() (context.Context, context.CancelFunc) {
64+
return context.WithTimeout(context.Background(), hostVolumeRequestTimeout)
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Copyright (c) HashiCorp, Inc.
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
package hostvolumemanager
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"os"
13+
"os/exec"
14+
"path/filepath"
15+
16+
"github.com/hashicorp/go-hclog"
17+
"github.com/hashicorp/go-multierror"
18+
cstructs "github.com/hashicorp/nomad/client/structs"
19+
"github.com/hashicorp/nomad/helper"
20+
)
21+
22+
type HostVolumePlugin interface {
23+
Version(ctx context.Context) (string, error)
24+
Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error)
25+
Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error
26+
// db TODO(1.10.0): update? resize? ??
27+
}
28+
29+
type HostVolumePluginCreateResponse struct {
30+
Path string `json:"path"`
31+
SizeBytes int64 `json:"bytes"`
32+
Context map[string]string `json:"context"` // metadata
33+
}
34+
35+
var _ HostVolumePlugin = &HostVolumePluginMkdir{}
36+
37+
type HostVolumePluginMkdir struct {
38+
ID string
39+
TargetPath string
40+
41+
log hclog.Logger
42+
}
43+
44+
func (p *HostVolumePluginMkdir) Version(_ context.Context) (string, error) {
45+
return "0.0.1", nil
46+
}
47+
48+
func (p *HostVolumePluginMkdir) Create(_ context.Context,
49+
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
50+
51+
path := filepath.Join(p.TargetPath, req.ID)
52+
log := p.log.With(
53+
"operation", "create",
54+
"volume_id", req.ID,
55+
"path", path)
56+
log.Debug("running plugin")
57+
58+
err := os.Mkdir(path, 0o700)
59+
if err != nil {
60+
log.Debug("error with plugin", "error", err)
61+
return nil, err
62+
}
63+
64+
log.Debug("plugin ran successfully")
65+
return &HostVolumePluginCreateResponse{
66+
Path: path,
67+
SizeBytes: 0,
68+
Context: map[string]string{},
69+
}, nil
70+
}
71+
72+
func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
73+
path := filepath.Join(p.TargetPath, req.ID)
74+
log := p.log.With(
75+
"operation", "delete",
76+
"volume_id", req.ID,
77+
"path", path)
78+
log.Debug("running plugin")
79+
80+
err := os.RemoveAll(path)
81+
if err != nil {
82+
log.Debug("error with plugin", "error", err)
83+
return err
84+
}
85+
86+
log.Debug("plugin ran successfully")
87+
return nil
88+
}
89+
90+
var _ HostVolumePlugin = &HostVolumePluginExternal{}
91+
92+
type HostVolumePluginExternal struct {
93+
ID string
94+
Executable string
95+
TargetPath string
96+
97+
log hclog.Logger
98+
}
99+
100+
func (p *HostVolumePluginExternal) Version(_ context.Context) (string, error) {
101+
return "0.0.1", nil // db TODO(1.10.0): call the plugin, use in fingerprint
102+
}
103+
104+
func (p *HostVolumePluginExternal) Create(ctx context.Context,
105+
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
106+
107+
params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null"
108+
if err != nil {
109+
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
110+
}
111+
envVars := []string{
112+
"NODE_ID=" + req.NodeID,
113+
"VOLUME_NAME=" + req.Name,
114+
fmt.Sprintf("CAPACITY_MIN_BYTES=%d", req.RequestedCapacityMinBytes),
115+
fmt.Sprintf("CAPACITY_MAX_BYTES=%d", req.RequestedCapacityMaxBytes),
116+
"PARAMETERS=" + string(params),
117+
}
118+
119+
stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
120+
if err != nil {
121+
return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, req.PluginID, err)
122+
}
123+
124+
var pluginResp HostVolumePluginCreateResponse
125+
err = json.Unmarshal(stdout, &pluginResp)
126+
if err != nil {
127+
return nil, err
128+
}
129+
return &pluginResp, nil
130+
}
131+
132+
func (p *HostVolumePluginExternal) Delete(ctx context.Context,
133+
req *cstructs.ClientHostVolumeDeleteRequest) error {
134+
135+
params, err := json.Marshal(req.Parameters)
136+
if err != nil {
137+
return fmt.Errorf("error marshaling volume pramaters: %w", err)
138+
}
139+
envVars := []string{
140+
"NODE_ID=" + req.NodeID,
141+
"PARAMETERS=" + string(params),
142+
}
143+
144+
_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
145+
if err != nil {
146+
return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, req.PluginID, err)
147+
}
148+
return nil
149+
}
150+
151+
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
152+
op, volID string, env []string) (stdout, stderr []byte, err error) {
153+
154+
path := filepath.Join(p.TargetPath, volID)
155+
log := p.log.With(
156+
"operation", op,
157+
"volume_id", volID,
158+
"path", path)
159+
log.Debug("running plugin")
160+
161+
// set up plugin execution
162+
cmd := exec.CommandContext(ctx, p.Executable, op, path)
163+
164+
cmd.Env = append([]string{
165+
"OPERATION=" + op,
166+
"HOST_PATH=" + path,
167+
}, env...)
168+
169+
var errBuf bytes.Buffer
170+
cmd.Stderr = io.Writer(&errBuf)
171+
172+
// run the command and capture output
173+
mErr := &multierror.Error{}
174+
stdout, err = cmd.Output()
175+
if err != nil {
176+
mErr = multierror.Append(mErr, err)
177+
}
178+
stderr, err = io.ReadAll(&errBuf)
179+
if err != nil {
180+
mErr = multierror.Append(mErr, err)
181+
}
182+
183+
log = log.With(
184+
"stdout", string(stdout),
185+
"stderr", string(stderr),
186+
)
187+
if mErr.ErrorOrNil() != nil {
188+
err = helper.FlattenMultierror(mErr)
189+
log.Debug("error with plugin", "error", err)
190+
return stdout, stderr, err
191+
}
192+
log.Debug("plugin ran successfully")
193+
return stdout, stderr, nil
194+
}
+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) HashiCorp, Inc.
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
package hostvolumemanager
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"sync"
10+
11+
"github.com/hashicorp/go-hclog"
12+
cstructs "github.com/hashicorp/nomad/client/structs"
13+
)
14+
15+
type HostVolumeManager struct {
16+
log hclog.Logger
17+
plugins *sync.Map
18+
}
19+
20+
func NewHostVolumeManager(sharedMountDir string, logger hclog.Logger) *HostVolumeManager {
21+
log := logger.Named("host_volumes")
22+
23+
mgr := &HostVolumeManager{
24+
log: log,
25+
plugins: &sync.Map{},
26+
}
27+
// db TODO(1.10.0): discover plugins on disk, need a new plugin dir
28+
// TODO: how do we define the external mounter plugins? plugin configs?
29+
mgr.setPlugin("mkdir", &HostVolumePluginMkdir{
30+
ID: "mkdir",
31+
TargetPath: sharedMountDir,
32+
log: log.With("plugin_id", "mkdir"),
33+
})
34+
mgr.setPlugin("example-host-volume", &HostVolumePluginExternal{
35+
ID: "example-host-volume",
36+
Executable: "/opt/nomad/hostvolumeplugins/example-host-volume",
37+
TargetPath: sharedMountDir,
38+
log: log.With("plugin_id", "example-host-volume"),
39+
})
40+
return mgr
41+
}
42+
43+
// db TODO(1.10.0): fingerprint elsewhere / on sighup, and SetPlugin from afar?
44+
func (hvm *HostVolumeManager) setPlugin(id string, plug HostVolumePlugin) {
45+
hvm.plugins.Store(id, plug)
46+
}
47+
48+
func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, bool) {
49+
obj, ok := hvm.plugins.Load(id)
50+
if !ok {
51+
return nil, false
52+
}
53+
return obj.(HostVolumePlugin), true
54+
}
55+
56+
func (hvm *HostVolumeManager) Create(ctx context.Context,
57+
req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {
58+
59+
plug, ok := hvm.getPlugin(req.PluginID)
60+
if !ok {
61+
return nil, fmt.Errorf("no such plugin %q", req.PluginID)
62+
}
63+
64+
pluginResp, err := plug.Create(ctx, req)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
resp := &cstructs.ClientHostVolumeCreateResponse{
70+
HostPath: pluginResp.Path,
71+
CapacityBytes: pluginResp.SizeBytes,
72+
}
73+
74+
// db TODO(1.10.0): now we need to add it to the node fingerprint!
75+
// db TODO(1.10.0): and save it in client state!
76+
77+
return resp, nil
78+
}
79+
80+
func (hvm *HostVolumeManager) Delete(ctx context.Context,
81+
req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {
82+
83+
plug, ok := hvm.getPlugin(req.PluginID)
84+
if !ok {
85+
return nil, fmt.Errorf("no such plugin %q", req.PluginID)
86+
}
87+
88+
err := plug.Delete(ctx, req)
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
resp := &cstructs.ClientHostVolumeDeleteResponse{}
94+
95+
// db TODO(1.10.0): save the client state!
96+
97+
return resp, nil
98+
}

0 commit comments

Comments
 (0)