Skip to content

Commit 388af69

Browse files
author
Erik Hollensbe
committed
*: API service abstraction
This set of interfaces (api/interfaces.go) allows us to project volplugin as a conduit for any scheduler to schedule mounts. While not here yet, this is a prerequisite to kubernetes, nomad, and native mesos support. Many changes have been made to accomodate this. Too many to split into separate commits at this point. Instead, I will itemize some of them below: * Mount management in volplugin has drastically changed (for the better). A number of panics and other anomalies related to races at boot time were fixed as a result of this change. These are in api/internals/mount and volplugin/runtime.go. * Mount reporting in NFS vastly improved. This logic is located in storage/backend/nfs/mount.go. * As a result of the rewrite, a new umbrella type for volumes (api.Volume) has been introduced to attempt to unify all the other volume types in the system. It should be used for anything that will write to DB or the API service(s). storage.Volume still exists for storage backends, however. * A number of timing issues (heisenbugs) with systemtests were resolved due to the rewrite exposing the timing issues. Signed-off-by: Erik Hollensbe <[email protected]>
1 parent 04d1402 commit 388af69

33 files changed

+1053
-1062
lines changed

api/api.go

+94-145
Original file line numberDiff line numberDiff line change
@@ -1,187 +1,136 @@
11
package api
22

33
import (
4-
"encoding/json"
4+
"bytes"
5+
"fmt"
6+
"io"
57
"io/ioutil"
68
"net/http"
7-
"os"
89
"strings"
10+
"sync"
911

10-
"github.com/contiv/errored"
12+
"github.com/contiv/volplugin/api/internals/mount"
1113
"github.com/contiv/volplugin/config"
1214
"github.com/contiv/volplugin/errors"
1315
"github.com/contiv/volplugin/lock"
14-
"github.com/contiv/volplugin/storage/control"
16+
"github.com/contiv/volplugin/storage"
17+
"github.com/contiv/volplugin/storage/backend"
1518

1619
log "github.com/Sirupsen/logrus"
1720
)
1821

19-
// API is a typed representation of API handlers.
20-
type API struct {
21-
Hostname string
22-
DockerPlugin bool
23-
Client *config.Client
24-
Global **config.Global // double pointer so we can track watch updates
25-
Lock *lock.Driver
22+
// Volume abstracts the notion of a volume as it is received from the plugins.
23+
// It is used heavily by the interfaces.
24+
type Volume struct {
25+
Mountpoint string
26+
Policy string
27+
Name string
28+
Options map[string]string
2629
}
2730

28-
// NewAPI returns an *API
29-
func NewAPI(client *config.Client, global **config.Global, dockerPlugin bool) *API {
30-
return &API{Client: client, Global: global, DockerPlugin: dockerPlugin}
31+
func (v *Volume) String() string {
32+
return fmt.Sprintf("%s/%s", v.Policy, v.Name)
3133
}
3234

33-
// Create fully creates a volume
34-
func (a *API) Create(w http.ResponseWriter, r *http.Request) {
35-
content, err := ioutil.ReadAll(r.Body)
36-
if err != nil {
37-
a.HTTPError(w, errors.ReadBody.Combine(err))
38-
return
39-
}
40-
41-
var req VolumeCreateRequest
35+
// API is a typed representation of API handlers.
36+
type API struct {
37+
Volplugin
38+
Hostname string
39+
Client *config.Client
40+
Global **config.Global // double pointer so we can track watch updates
41+
Lock *lock.Driver
42+
lockStopChanMutex sync.Mutex
43+
lockStopChans map[string]chan struct{}
44+
MountCounter *mount.Counter
45+
MountCollection *mount.Collection
46+
}
4247

43-
if err := json.Unmarshal(content, &req); err != nil {
44-
a.HTTPError(w, errors.UnmarshalRequest.Combine(err))
45-
return
48+
// NewAPI returns an *API
49+
func NewAPI(volplugin Volplugin, hostname string, client *config.Client, global **config.Global) *API {
50+
return &API{
51+
Volplugin: volplugin,
52+
Hostname: hostname,
53+
Client: client,
54+
Global: global,
55+
Lock: lock.NewDriver(client),
56+
MountCollection: mount.NewCollection(),
57+
MountCounter: mount.NewCounter(),
58+
lockStopChans: map[string]chan struct{}{},
4659
}
60+
}
4761

48-
parts := strings.SplitN(req.Name, "/", 2)
49-
if len(parts) != 2 {
50-
a.HTTPError(w, errors.UnmarshalRequest.Combine(errors.InvalidVolume))
51-
return
62+
// RESTHTTPError returns a 500 status with the error.
63+
func RESTHTTPError(w http.ResponseWriter, err error) {
64+
if err == nil {
65+
err = errors.Unknown
5266
}
5367

54-
policy, volume := parts[0], parts[1]
68+
log.Errorf("Returning HTTP error handling plugin negotiation: %s", err.Error())
69+
http.Error(w, err.Error(), http.StatusInternalServerError)
70+
}
5571

56-
if policy == "" {
57-
a.HTTPError(w, errors.GetPolicy.Combine(errored.Errorf("policy was blank for volume %q", req.Name)))
58-
return
59-
}
72+
// Action is a catchall for additional driver functions.
73+
func Action(w http.ResponseWriter, r *http.Request) {
74+
defer r.Body.Close()
75+
log.Debugf("Unknown driver action at %q", r.URL.Path)
76+
content, _ := ioutil.ReadAll(r.Body)
77+
log.Debug("Body content:", string(content))
78+
w.WriteHeader(503)
79+
}
6080

61-
if volume == "" {
62-
a.HTTPError(w, errors.GetVolume.Combine(errored.Errorf("volume was blank for volume %q", req.Name)))
63-
return
64-
}
81+
// LogHandler injects a request logging handler if debugging is active. In either event it will dispatch.
82+
func LogHandler(name string, debug bool, actionFunc func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
83+
return func(w http.ResponseWriter, r *http.Request) {
84+
if debug {
85+
buf := new(bytes.Buffer)
86+
io.Copy(buf, r.Body)
87+
log.Debugf("Dispatching %s with %v", name, strings.TrimSpace(string(buf.Bytes())))
88+
var writer *io.PipeWriter
89+
r.Body, writer = io.Pipe()
90+
go func() {
91+
io.Copy(writer, buf)
92+
writer.Close()
93+
}()
94+
}
6595

66-
if vol, err := a.Client.GetVolume(policy, volume); err == nil && vol != nil {
67-
a.HTTPError(w, errors.Exists)
68-
return
96+
actionFunc(w, r)
6997
}
98+
}
7099

71-
log.Infof("Creating volume %q", req.Name)
72-
73-
hostname, err := os.Hostname()
100+
// GetStorageParameters accepts a Volume API request and turns it into several internal structs.
101+
func (a *API) GetStorageParameters(uc *Volume) (storage.MountDriver, *config.Volume, storage.DriverOptions, error) {
102+
driverOpts := storage.DriverOptions{}
103+
volConfig, err := a.Client.GetVolume(uc.Policy, uc.Name)
74104
if err != nil {
75-
a.HTTPError(w, errors.GetHostname.Combine(err))
76-
return
105+
return nil, nil, driverOpts, err
77106
}
78107

79-
policyObj, err := a.Client.GetPolicy(policy)
108+
driver, err := backend.NewMountDriver(volConfig.Backends.Mount, (*a.Global).MountPath)
80109
if err != nil {
81-
a.HTTPError(w, errors.GetPolicy.Combine(errored.New(policy)).Combine(err))
82-
return
110+
return nil, nil, driverOpts, errors.GetDriver.Combine(err)
83111
}
84112

85-
uc := &config.UseMount{
86-
Volume: strings.Join([]string{policy, volume}, "/"),
87-
Reason: lock.ReasonCreate,
88-
Hostname: hostname,
89-
}
90-
91-
snapUC := &config.UseSnapshot{
92-
Volume: strings.Join([]string{policy, volume}, "/"),
93-
Reason: lock.ReasonCreate,
94-
}
95-
96-
global := *a.Global
97-
98-
err = lock.NewDriver(a.Client).ExecuteWithMultiUseLock([]config.UseLocker{uc, snapUC}, global.Timeout, func(ld *lock.Driver, ucs []config.UseLocker) error {
99-
volConfig, err := a.Client.CreateVolume(config.Request{Policy: policy, Volume: volume, Options: req.Opts})
100-
if err != nil {
101-
return err
102-
}
103-
104-
log.Debugf("Volume Create: %#v", *volConfig)
105-
106-
do, err := control.CreateVolume(policyObj, volConfig, global.Timeout)
107-
if err == errors.NoActionTaken {
108-
goto publish
109-
}
110-
111-
if err != nil {
112-
return errors.CreateVolume.Combine(err)
113-
}
114-
115-
if err := control.FormatVolume(volConfig, do); err != nil {
116-
if err := control.RemoveVolume(volConfig, global.Timeout); err != nil {
117-
log.Errorf("Error during cleanup of failed format: %v", err)
118-
}
119-
return errors.FormatVolume.Combine(err)
120-
}
121-
122-
publish:
123-
if err := a.Client.PublishVolume(volConfig); err != nil && err != errors.Exists {
124-
// FIXME this shouldn't leak down to the client.
125-
if _, ok := err.(*errored.Error); !ok {
126-
return errors.PublishVolume.Combine(err)
127-
}
128-
return err
129-
}
130-
131-
content, err = json.Marshal(volConfig)
132-
if err != nil {
133-
return errors.MarshalPolicy.Combine(err)
134-
}
135-
136-
w.Write(content)
137-
return nil
138-
})
139-
140-
if err != nil && err != errors.Exists {
141-
a.HTTPError(w, errors.CreateVolume.Combine(err))
142-
return
113+
driverOpts, err = volConfig.ToDriverOptions((*a.Global).Timeout)
114+
if err != nil {
115+
return nil, nil, driverOpts, errors.UnmarshalRequest.Combine(err)
143116
}
144-
}
145117

146-
// HTTPError is a generic HTTP error function that works across the plugin and
147-
// REST interfaces. It is intended to be used by handlers that exist in this
148-
// package.
149-
func (a *API) HTTPError(w http.ResponseWriter, err error) {
150-
if a.DockerPlugin {
151-
DockerHTTPError(w, err)
152-
} else {
153-
RESTHTTPError(w, err)
154-
}
118+
return driver, volConfig, driverOpts, nil
155119
}
156120

157-
// DockerHTTPError returns a 200 status to docker with an error struct. It returns
158-
// 500 if marshalling failed.
159-
func DockerHTTPError(w http.ResponseWriter, err error) {
160-
content, errc := json.Marshal(VolumeCreateResponse{Mountpoint: "", Err: err.Error()})
161-
if errc != nil {
162-
http.Error(w, errc.Error(), http.StatusInternalServerError)
163-
return
164-
}
165-
166-
log.Errorf("Returning HTTP error handling plugin negotiation: %s", err.Error())
167-
http.Error(w, string(content), http.StatusOK)
121+
// AddStopChan adds a stop channel for the purposes of controlling mount ttl refresh goroutines
122+
func (a *API) AddStopChan(name string, stopChan chan struct{}) {
123+
a.lockStopChanMutex.Lock()
124+
a.lockStopChans[name] = stopChan
125+
a.lockStopChanMutex.Unlock()
168126
}
169127

170-
// RESTHTTPError returns a 500 status with the error.
171-
func RESTHTTPError(w http.ResponseWriter, err error) {
172-
if err == nil {
173-
err = errors.Unknown
128+
// RemoveStopChan removes a stop channel for the purposes of controlling mount ttl refresh goroutines
129+
func (a *API) RemoveStopChan(name string) {
130+
a.lockStopChanMutex.Lock()
131+
if test, ok := a.lockStopChans[name]; ok && test != nil {
132+
a.lockStopChans[name] <- struct{}{}
174133
}
175-
176-
log.Errorf("Returning HTTP error handling plugin negotiation: %s", err.Error())
177-
http.Error(w, err.Error(), http.StatusInternalServerError)
178-
}
179-
180-
// Action is a catchall for additional driver functions.
181-
func Action(w http.ResponseWriter, r *http.Request) {
182-
defer r.Body.Close()
183-
log.Debugf("Unknown driver action at %q", r.URL.Path)
184-
content, _ := ioutil.ReadAll(r.Body)
185-
log.Debug("Body content:", string(content))
186-
w.WriteHeader(503)
134+
delete(a.lockStopChans, name)
135+
a.lockStopChanMutex.Unlock()
187136
}

0 commit comments

Comments
 (0)