Skip to content

Commit 49c147b

Browse files
authored
dynamic host volumes: change env vars, fixup auto-delete (#24943)
* plugin env: DHV_HOST_PATH->DHV_VOLUMES_DIR * client config: host_volumes_dir * plugin env: add namespace+nodepool * only auto-delete after error saving client state on *initial* create
1 parent 890daba commit 49c147b

18 files changed

+244
-161
lines changed

client/client.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -539,8 +539,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
539539

540540
// set up dynamic host volume manager
541541
c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
542-
PluginDir: cfg.HostVolumePluginDir,
543-
SharedMountDir: cfg.AllocMountsDir,
542+
PluginDir: c.GetConfig().HostVolumePluginDir,
543+
VolumesDir: c.GetConfig().HostVolumesDir,
544+
NodePool: c.Node().NodePool,
544545
StateMgr: c.stateDB,
545546
UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
546547
})
@@ -702,6 +703,13 @@ func (c *Client) init() error {
702703

703704
c.stateDB = db
704705

706+
// Ensure host_volumes_dir config is not empty.
707+
if conf.HostVolumesDir == "" {
708+
conf = c.UpdateConfig(func(c *config.Config) {
709+
c.HostVolumesDir = filepath.Join(conf.StateDir, "host_volumes")
710+
})
711+
}
712+
705713
// Ensure the alloc mounts dir exists if we are configured with a custom path.
706714
if conf.AllocMountsDir != "" {
707715
if err := os.MkdirAll(conf.AllocMountsDir, 0o711); err != nil {

client/config/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,10 @@ type Config struct {
314314
// HostVolumes is a map of the configured host volumes by name.
315315
HostVolumes map[string]*structs.ClientHostVolumeConfig
316316

317+
// HostVolumesDir is the suggested directory for plugins to put volumes.
318+
// Volume plugins may ignore this suggestion, but we provide this default.
319+
HostVolumesDir string
320+
317321
// HostVolumePluginDir is the directory with dynamic host volume plugins.
318322
HostVolumePluginDir string
319323

client/fingerprint/dynamic_host_volumes.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (h *DynamicHostVolumePluginFingerprint) Fingerprint(request *FingerprintReq
4444
return nil
4545
}
4646

47-
plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir)
47+
plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir, request.Node.NodePool)
4848
if err != nil {
4949
if os.IsNotExist(err) {
5050
h.logger.Debug("plugin dir does not exist", "dir", pluginDir)
@@ -74,10 +74,10 @@ func (h *DynamicHostVolumePluginFingerprint) Periodic() (bool, time.Duration) {
7474
return false, 0
7575
}
7676

77-
// GetHostVolumePluginVersions finds all the executable files on disk
78-
// that respond to a Version call (arg $1 = 'version' / env $OPERATION = 'version')
79-
// The return map's keys are plugin IDs, and the values are version strings.
80-
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string]string, error) {
77+
// GetHostVolumePluginVersions finds all the executable files on disk that
78+
// respond to a `fingerprint` call. The return map's keys are plugin IDs,
79+
// and the values are version strings.
80+
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir, nodePool string) (map[string]string, error) {
8181
files, err := helper.FindExecutableFiles(pluginDir)
8282
if err != nil {
8383
return nil, err
@@ -97,7 +97,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
9797

9898
log := log.With("plugin_id", file)
9999

100-
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "")
100+
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "", nodePool)
101101
if err != nil {
102102
log.Warn("error getting plugin", "error", err)
103103
return

client/host_volume_endpoint_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestHostVolume(t *testing.T) {
3131
StateMgr: client.stateDB,
3232
UpdateNodeVols: client.updateNodeFromHostVol,
3333
PluginDir: "/no/ext/plugins",
34-
SharedMountDir: tmp,
34+
VolumesDir: tmp,
3535
})
3636
client.hostVolumeManager = manager
3737
hostPathCreate := filepath.Join(tmp, "test-vol-id-1")
@@ -177,7 +177,7 @@ func TestHostVolume(t *testing.T) {
177177
StateMgr: client.stateDB,
178178
UpdateNodeVols: client.updateNodeFromHostVol,
179179
PluginDir: "/no/ext/plugins",
180-
SharedMountDir: "host_volume_endpoint_test.go",
180+
VolumesDir: "host_volume_endpoint_test.go",
181181
})
182182

183183
req := &cstructs.ClientHostVolumeCreateRequest{

client/hostvolumemanager/host_volume_plugin.go

+72-58
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@ import (
2323

2424
const (
2525
// environment variables for external plugins
26+
2627
EnvOperation = "DHV_OPERATION"
27-
EnvHostPath = "DHV_HOST_PATH"
28-
EnvNodeID = "DHV_NODE_ID"
28+
EnvVolumesDir = "DHV_VOLUMES_DIR"
29+
EnvPluginDir = "DHV_PLUGIN_DIR"
30+
EnvCreatedPath = "DHV_CREATED_PATH"
31+
EnvNamespace = "DHV_NAMESPACE"
2932
EnvVolumeName = "DHV_VOLUME_NAME"
3033
EnvVolumeID = "DHV_VOLUME_ID"
34+
EnvNodeID = "DHV_NODE_ID"
35+
EnvNodePool = "DHV_NODE_POOL"
3136
EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
3237
EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
33-
EnvPluginDir = "DHV_PLUGIN_DIR"
3438
EnvParameters = "DHV_PARAMETERS"
3539
)
3640

@@ -62,10 +66,10 @@ const HostVolumePluginMkdirVersion = "0.0.1"
6266
var _ HostVolumePlugin = &HostVolumePluginMkdir{}
6367

6468
// HostVolumePluginMkdir is a plugin that creates a directory within the
65-
// specified TargetPath. It is built-in to Nomad, so is always available.
69+
// specified VolumesDir. It is built-in to Nomad, so is always available.
6670
type HostVolumePluginMkdir struct {
6771
ID string
68-
TargetPath string
72+
VolumesDir string
6973

7074
log hclog.Logger
7175
}
@@ -80,7 +84,7 @@ func (p *HostVolumePluginMkdir) Fingerprint(_ context.Context) (*PluginFingerpri
8084
func (p *HostVolumePluginMkdir) Create(_ context.Context,
8185
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
8286

83-
path := filepath.Join(p.TargetPath, req.ID)
87+
path := filepath.Join(p.VolumesDir, req.ID)
8488
log := p.log.With(
8589
"operation", "create",
8690
"volume_id", req.ID,
@@ -102,7 +106,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
102106
return nil, err
103107
}
104108

105-
err := os.Mkdir(path, 0o700)
109+
err := os.MkdirAll(path, 0o700)
106110
if err != nil {
107111
log.Debug("error with plugin", "error", err)
108112
return nil, err
@@ -113,7 +117,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
113117
}
114118

115119
func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
116-
path := filepath.Join(p.TargetPath, req.ID)
120+
path := filepath.Join(p.VolumesDir, req.ID)
117121
log := p.log.With(
118122
"operation", "delete",
119123
"volume_id", req.ID,
@@ -135,7 +139,7 @@ var _ HostVolumePlugin = &HostVolumePluginExternal{}
135139
// NewHostVolumePluginExternal returns an external host volume plugin
136140
// if the specified executable exists on disk.
137141
func NewHostVolumePluginExternal(log hclog.Logger,
138-
pluginDir, filename, targetPath string) (*HostVolumePluginExternal, error) {
142+
pluginDir, filename, volumesDir, nodePool string) (*HostVolumePluginExternal, error) {
139143
// this should only be called with already-detected executables,
140144
// but we'll double-check it anyway, so we can provide a tidy error message
141145
// if it has changed between fingerprinting and execution.
@@ -153,8 +157,9 @@ func NewHostVolumePluginExternal(log hclog.Logger,
153157
return &HostVolumePluginExternal{
154158
ID: filename,
155159
Executable: executable,
156-
TargetPath: targetPath,
160+
VolumesDir: volumesDir,
157161
PluginDir: pluginDir,
162+
NodePool: nodePool,
158163
log: log,
159164
}, nil
160165
}
@@ -166,16 +171,17 @@ func NewHostVolumePluginExternal(log hclog.Logger,
166171
type HostVolumePluginExternal struct {
167172
ID string
168173
Executable string
169-
TargetPath string
174+
VolumesDir string
170175
PluginDir string
176+
NodePool string
171177

172178
log hclog.Logger
173179
}
174180

175181
// Fingerprint calls the executable with the following parameters:
176-
// arguments: fingerprint
182+
// arguments: $1=fingerprint
177183
// environment:
178-
// DHV_OPERATION=fingerprint
184+
// - DHV_OPERATION=fingerprint
179185
//
180186
// Response should be valid JSON on stdout, with a "version" key, e.g.:
181187
// {"version": "0.0.1"}
@@ -205,21 +211,23 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
205211
}
206212

207213
// Create calls the executable with the following parameters:
208-
// arguments: create {path to create}
214+
// arguments: $1=create
209215
// environment:
210-
// DHV_OPERATION=create
211-
// DHV_HOST_PATH={path to create}
212-
// DHV_NODE_ID={Nomad node ID}
213-
// DHV_VOLUME_NAME={name from the volume specification}
214-
// DHV_VOLUME_ID={Nomad volume ID}
215-
// DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec}
216-
// DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec}
217-
// DHV_PARAMETERS={json of parameters from the volume spec}
218-
// DHV_PLUGIN_DIR={path to directory containing plugins}
216+
// - DHV_OPERATION=create
217+
// - DHV_VOLUMES_DIR={directory to put the volume in}
218+
// - DHV_PLUGIN_DIR={path to directory containing plugins}
219+
// - DHV_NAMESPACE={volume namespace}
220+
// - DHV_VOLUME_NAME={name from the volume specification}
221+
// - DHV_VOLUME_ID={volume ID generated by Nomad}
222+
// - DHV_NODE_ID={Nomad node ID}
223+
// - DHV_NODE_POOL={Nomad node pool}
224+
// - DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec, expressed in bytes}
225+
// - DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec, expressed in bytes}
226+
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
219227
//
220228
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
221-
// {"path": $HOST_PATH, "bytes": 50000000}
222-
// "path" must be provided to confirm that the requested path is what was
229+
// {"path": "/path/that/was/created", "bytes": 50000000}
230+
// "path" must be provided to confirm the requested path is what was
223231
// created by the plugin. "bytes" is the actual size of the volume created
224232
// by the plugin; if excluded, it will default to 0.
225233
//
@@ -233,14 +241,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
233241
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
234242
}
235243
envVars := []string{
236-
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
244+
fmt.Sprintf("%s=%s", EnvOperation, "create"),
245+
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
246+
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
247+
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
248+
// values from volume spec
249+
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
237250
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
251+
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
238252
fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
239253
fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
254+
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
240255
fmt.Sprintf("%s=%s", EnvParameters, params),
241256
}
242257

243-
stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
258+
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
259+
stdout, _, err := p.runPlugin(ctx, log, "create", envVars)
244260
if err != nil {
245261
return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, p.ID, err)
246262
}
@@ -253,20 +269,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
253269
// an error here after the plugin has done who-knows-what.
254270
return nil, err
255271
}
256-
// TODO: validate returned host path
257272
return &pluginResp, nil
258273
}
259274

260275
// Delete calls the executable with the following parameters:
261-
// arguments: delete {path to create}
276+
// arguments: $1=delete
262277
// environment:
263-
// DHV_OPERATION=delete
264-
// DHV_HOST_PATH={path to create}
265-
// DHV_NODE_ID={Nomad node ID}
266-
// DHV_VOLUME_NAME={name from the volume specification}
267-
// DHV_VOLUME_ID={Nomad volume ID}
268-
// DHV_PARAMETERS={json of parameters from the volume spec}
269-
// DHV_PLUGIN_DIR={path to directory containing plugins}
278+
// - DHV_OPERATION=delete
279+
// - DHV_CREATED_PATH={path that `create` returned}
280+
// - DHV_VOLUMES_DIR={directory that volumes should be put in}
281+
// - DHV_PLUGIN_DIR={path to directory containing plugins}
282+
// - DHV_NAMESPACE={volume namespace}
283+
// - DHV_VOLUME_NAME={name from the volume specification}
284+
// - DHV_VOLUME_ID={volume ID generated by Nomad}
285+
// - DHV_NODE_ID={Nomad node ID}
286+
// - DHV_NODE_POOL={Nomad node pool}
287+
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
270288
//
271289
// Response on stdout is discarded.
272290
//
@@ -280,42 +298,38 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
280298
return fmt.Errorf("error marshaling volume pramaters: %w", err)
281299
}
282300
envVars := []string{
283-
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
301+
fmt.Sprintf("%s=%s", EnvOperation, "delete"),
302+
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
303+
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
304+
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
305+
// from create response
306+
fmt.Sprintf("%s=%s", EnvCreatedPath, req.HostPath),
307+
// values from volume spec
308+
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
284309
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
310+
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
311+
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
285312
fmt.Sprintf("%s=%s", EnvParameters, params),
286313
}
287314

288-
_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
315+
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
316+
_, _, err = p.runPlugin(ctx, log, "delete", envVars)
289317
if err != nil {
290318
return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, p.ID, err)
291319
}
292320
return nil
293321
}
294322

295-
// runPlugin executes the... executable with these additional env vars:
296-
// DHV_OPERATION={op}
297-
// DHV_HOST_PATH={path to create}
298-
// DHV_VOLUME_ID={Nomad volume ID}
299-
// DHV_PLUGIN_DIR={path to directory containing plugins}
300-
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
301-
op, volID string, env []string) (stdout, stderr []byte, err error) {
323+
// runPlugin executes the... executable
324+
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, log hclog.Logger,
325+
op string, env []string) (stdout, stderr []byte, err error) {
302326

303-
path := filepath.Join(p.TargetPath, volID)
304-
log := p.log.With(
305-
"operation", op,
306-
"volume_id", volID,
307-
"path", path)
327+
log = log.With("operation", op)
308328
log.Debug("running plugin")
309329

310330
// set up plugin execution
311-
cmd := exec.CommandContext(ctx, p.Executable, op, path)
312-
313-
cmd.Env = append([]string{
314-
fmt.Sprintf("%s=%s", EnvOperation, op),
315-
fmt.Sprintf("%s=%s", EnvHostPath, path),
316-
fmt.Sprintf("%s=%s", EnvVolumeID, volID),
317-
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
318-
}, env...)
331+
cmd := exec.CommandContext(ctx, p.Executable, op)
332+
cmd.Env = env
319333

320334
stdout, stderr, err = runCommand(cmd)
321335

0 commit comments

Comments
 (0)