@@ -24,6 +24,7 @@ package storage
24
24
25
25
import (
26
26
"context"
27
+ "crypto/sha1"
27
28
"encoding/json"
28
29
"fmt"
29
30
"math/rand"
@@ -32,6 +33,7 @@ import (
32
33
"sort"
33
34
"strconv"
34
35
"strings"
36
+ "time"
35
37
36
38
"k8s.io/apimachinery/pkg/api/resource"
37
39
@@ -41,6 +43,8 @@ import (
41
43
42
44
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
43
45
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
46
+ "github.com/arangodb/kube-arangodb/pkg/util/constants"
47
+ "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
44
48
)
45
49
46
50
const (
@@ -72,7 +76,24 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
72
76
clients [i ], clients [j ] = clients [j ], clients [i ]
73
77
})
74
78
79
+ var nodeClientMap map [string ]provisioner.API
75
80
for i , claim := range unboundClaims {
81
+ // Find deployment name & role in the claim (if any)
82
+ deplName , role , enforceAniAffinity := getDeploymentInfo (claim )
83
+ allowedClients := clients
84
+ if enforceAniAffinity && deplName != "" {
85
+ // Select nodes to choose from such that no volume in group lands on the same node
86
+ if nodeClientMap == nil {
87
+ nodeClientMap = createNodeClientMap (ctx , clients )
88
+ }
89
+ var err error
90
+ allowedClients , err = ls .filterAllowedNodes (nodeClientMap , deplName , role )
91
+ if err != nil {
92
+ log .Warn ().Err (err ).Msg ("Failed to filter allowed nodes" )
93
+ continue // We'll try this claim again later
94
+ }
95
+ }
96
+
76
97
// Find size of PVC
77
98
volSize := defaultVolumeSize
78
99
if reqStorage := claim .Spec .Resources .Requests .StorageEphemeral (); reqStorage != nil {
@@ -81,7 +102,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
81
102
}
82
103
}
83
104
// Create PV
84
- if err := ls .createPV (ctx , apiObject , clients , i , volSize ); err != nil {
105
+ if err := ls .createPV (ctx , apiObject , allowedClients , i , volSize , claim , deplName , role ); err != nil {
85
106
log .Error ().Err (err ).Msg ("Failed to create PersistentVolume" )
86
107
}
87
108
}
@@ -90,7 +111,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
90
111
}
91
112
92
113
// createPV creates a PersistentVolume.
93
- func (ls * LocalStorage ) createPV (ctx context.Context , apiObject * api.ArangoLocalStorage , clients []provisioner.API , clientsOffset int , volSize int64 ) error {
114
+ func (ls * LocalStorage ) createPV (ctx context.Context , apiObject * api.ArangoLocalStorage , clients []provisioner.API , clientsOffset int , volSize int64 , claim v1. PersistentVolumeClaim , deploymentName , role string ) error {
94
115
log := ls .deps .Log
95
116
// Try clients
96
117
for clientIdx := 0 ; clientIdx < len (clients ); clientIdx ++ {
@@ -117,7 +138,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
117
138
continue
118
139
}
119
140
// Create a volume
120
- pvName := apiObject .GetName () + "-" + name
141
+ pvName := strings . ToLower ( apiObject .GetName () + "-" + shortHash ( info . NodeName ) + "-" + name )
121
142
volumeMode := v1 .PersistentVolumeFilesystem
122
143
nodeAff , err := createNodeAffinity (info .NodeName )
123
144
if err != nil {
@@ -131,6 +152,10 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
131
152
v1 .AlphaStorageNodeAffinityAnnotation : nodeAff ,
132
153
nodeNameAnnotation : info .NodeName ,
133
154
},
155
+ Labels : map [string ]string {
156
+ k8sutil .LabelKeyArangoDeployment : deploymentName ,
157
+ k8sutil .LabelKeyRole : role ,
158
+ },
134
159
},
135
160
Spec : v1.PersistentVolumeSpec {
136
161
Capacity : v1.ResourceList {
@@ -147,6 +172,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
147
172
},
148
173
StorageClassName : apiObject .Spec .StorageClass .Name ,
149
174
VolumeMode : & volumeMode ,
175
+ ClaimRef : & v1.ObjectReference {
176
+ Kind : "PersistentVolumeClaim" ,
177
+ APIVersion : "" ,
178
+ Name : claim .GetName (),
179
+ Namespace : claim .GetNamespace (),
180
+ UID : claim .GetUID (),
181
+ },
150
182
},
151
183
}
152
184
// Attach PV to ArangoLocalStorage
@@ -159,6 +191,16 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
159
191
Str ("name" , pvName ).
160
192
Str ("node-name" , info .NodeName ).
161
193
Msg ("Created PersistentVolume" )
194
+
195
+ // Bind claim to volume
196
+ if err := ls .bindClaimToVolume (claim , pv .GetName ()); err != nil {
197
+ // Try to delete the PV now
198
+ if err := ls .deps .KubeCli .CoreV1 ().PersistentVolumes ().Delete (pv .GetName (), & metav1.DeleteOptions {}); err != nil {
199
+ log .Error ().Err (err ).Msg ("Failed to delete PV after binding PVC failed" )
200
+ }
201
+ return maskAny (err )
202
+ }
203
+
162
204
return nil
163
205
}
164
206
}
@@ -204,3 +246,96 @@ func createNodeAffinity(nodeName string) (string, error) {
204
246
}
205
247
return string (encoded ), nil
206
248
}
249
+
250
+ // createNodeClientMap creates a map from node name to API.
251
+ // Clients that do not respond properly on a GetNodeInfo request are
252
+ // ignored.
253
+ func createNodeClientMap (ctx context.Context , clients []provisioner.API ) map [string ]provisioner.API {
254
+ result := make (map [string ]provisioner.API )
255
+ for _ , c := range clients {
256
+ if info , err := c .GetNodeInfo (ctx ); err == nil {
257
+ result [info .NodeName ] = c
258
+ }
259
+ }
260
+ return result
261
+ }
262
+
263
+ // getDeploymentInfo returns the name of the deployment that created the given claim,
264
+ // the role of the server that the claim is used for and the value for `enforceAntiAffinity`.
265
+ // If not found, empty strings are returned.
266
+ // Returns deploymentName, role, enforceAntiAffinity.
267
+ func getDeploymentInfo (pvc v1.PersistentVolumeClaim ) (string , string , bool ) {
268
+ deploymentName := pvc .GetLabels ()[k8sutil .LabelKeyArangoDeployment ]
269
+ role := pvc .GetLabels ()[k8sutil .LabelKeyRole ]
270
+ enforceAntiAffinity , _ := strconv .ParseBool (pvc .GetAnnotations ()[constants .AnnotationEnforceAntiAffinity ]) // If annotation empty, this will yield false.
271
+ return deploymentName , role , enforceAntiAffinity
272
+ }
273
+
274
+ // filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role.
275
+ func (ls * LocalStorage ) filterAllowedNodes (clients map [string ]provisioner.API , deploymentName , role string ) ([]provisioner.API , error ) {
276
+ // Find all PVs for given deployment & role
277
+ list , err := ls .deps .KubeCli .CoreV1 ().PersistentVolumes ().List (metav1.ListOptions {
278
+ LabelSelector : fmt .Sprintf ("%s=%s,%s=%s" , k8sutil .LabelKeyArangoDeployment , deploymentName , k8sutil .LabelKeyRole , role ),
279
+ })
280
+ if err != nil {
281
+ return nil , maskAny (err )
282
+ }
283
+ excludedNodes := make (map [string ]struct {})
284
+ for _ , pv := range list .Items {
285
+ nodeName := pv .GetAnnotations ()[nodeNameAnnotation ]
286
+ excludedNodes [nodeName ] = struct {}{}
287
+ }
288
+ result := make ([]provisioner.API , 0 , len (clients ))
289
+ for nodeName , c := range clients {
290
+ if _ , found := excludedNodes [nodeName ]; ! found {
291
+ result = append (result , c )
292
+ }
293
+ }
294
+ return result , nil
295
+ }
296
+
297
+ // bindClaimToVolume tries to bind the given claim to the volume with given name.
298
+ // If the claim has been updated, the function retries several times.
299
+ func (ls * LocalStorage ) bindClaimToVolume (claim v1.PersistentVolumeClaim , volumeName string ) error {
300
+ log := ls .deps .Log .With ().Str ("pvc-name" , claim .GetName ()).Str ("volume-name" , volumeName ).Logger ()
301
+ pvcs := ls .deps .KubeCli .CoreV1 ().PersistentVolumeClaims (claim .GetNamespace ())
302
+
303
+ for attempt := 0 ; attempt < 10 ; attempt ++ {
304
+ // Backoff if needed
305
+ time .Sleep (time .Millisecond * time .Duration (10 * attempt ))
306
+
307
+ // Fetch latest version of claim
308
+ updated , err := pvcs .Get (claim .GetName (), metav1.GetOptions {})
309
+ if k8sutil .IsNotFound (err ) {
310
+ return maskAny (err )
311
+ } else if err != nil {
312
+ log .Warn ().Err (err ).Msg ("Failed to load updated PersistentVolumeClaim" )
313
+ continue
314
+ }
315
+
316
+ // Check claim. If already bound, bail out
317
+ if ! pvcNeedsVolume (* updated ) {
318
+ return maskAny (fmt .Errorf ("PersistentVolumeClaim '%s' no longer needs a volume" , claim .GetName ()))
319
+ }
320
+
321
+ // Try to bind
322
+ updated .Spec .VolumeName = volumeName
323
+ if _ , err := pvcs .Update (updated ); k8sutil .IsConflict (err ) {
324
+ // Claim modified already, retry
325
+ log .Debug ().Err (err ).Msg ("PersistentVolumeClaim has been modified. Retrying." )
326
+ } else if err != nil {
327
+ log .Error ().Err (err ).Msg ("Failed to bind PVC to volume" )
328
+ return maskAny (err )
329
+ }
330
+ log .Debug ().Msg ("Bound volume to PersistentVolumeClaim" )
331
+ return nil
332
+ }
333
+ log .Error ().Msg ("All attempts to bind PVC to volume failed" )
334
+ return maskAny (fmt .Errorf ("All attempts to bind PVC to volume failed" ))
335
+ }
336
+
337
// shortHash returns the first 6 lowercase hexadecimal characters of the
// SHA-1 digest of the given name.
func shortHash(name string) string {
	digest := sha1.Sum([]byte(name))
	// 3 bytes of digest render as exactly 6 hex characters.
	return fmt.Sprintf("%x", digest[:3])
}
0 commit comments