-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathtest_util.go
638 lines (576 loc) · 20.3 KB
/
test_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package tests
import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/arangodb/arangosync/client"
"github.com/arangodb/arangosync/tasks"
driver "github.com/arangodb/go-driver"
vst "github.com/arangodb/go-driver/vst"
vstProtocol "github.com/arangodb/go-driver/vst/protocol"
"github.com/pkg/errors"
"github.com/rs/zerolog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
)
// deploymentReadyTimeout is the default amount of time the wait helpers in
// this file keep retrying before they give up.
const deploymentReadyTimeout = 4 * time.Minute
// maskAny wraps an error using errors.WithStack.
var maskAny = errors.WithStack

// syncClientCache is the cache from which arangosync clients are obtained.
var syncClientCache client.ClientCache

// showEnterpriseImageOnce makes sure the enterprise image in use is logged
// only once.
var showEnterpriseImageOnce sync.Once
// createArangodVSTClientForDNSName creates a go-driver client that uses a VST
// connection to the server with the given DNS name.
//
// The connection configuration is built by createArangodVSTConfigForDNSNames
// from apiObject and shortTimeout. The client authenticates with basic
// authentication as user "root" with an empty password.
// Errors from building the configuration, the connection or the client are
// returned wrapped by maskAny.
func createArangodVSTClientForDNSName(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsName string, shortTimeout bool) (driver.Client, error) {
	connConfig, err := createArangodVSTConfigForDNSNames(ctx, cli, apiObject, []string{dnsName}, shortTimeout)
	if err != nil {
		return nil, maskAny(err)
	}
	// TODO deal with TLS with proper CA checking
	conn, err := vst.NewConnection(connConfig)
	if err != nil {
		return nil, maskAny(err)
	}
	// Create client.
	// driver.BasicAuthentication does not return an error, so there is
	// nothing to check between building the config and creating the client.
	config := driver.ClientConfig{
		Connection:     conn,
		Authentication: driver.BasicAuthentication("root", ""),
	}
	c, err := driver.NewClient(config)
	if err != nil {
		return nil, maskAny(err)
	}
	return c, nil
}
// createArangodVSTConfigForDNSNames builds a go-driver VST connection config
// with one endpoint per given DNS name.
//
// The idle connection timeout is 90 seconds, or 100 milliseconds when
// shortTimeout is set. When apiObject is non-nil and its spec is secure, the
// endpoints use https and certificate verification is skipped; otherwise they
// use http. The returned error is always nil.
func createArangodVSTConfigForDNSNames(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsNames []string, shortTimeout bool) (vst.ConnectionConfig, error) {
	idleTimeout := 90 * time.Second
	if shortTimeout {
		idleTimeout = 100 * time.Millisecond
	}

	secure := apiObject != nil && apiObject.Spec.IsSecure()
	urlPrefix := "http://"
	if secure {
		urlPrefix = "https://"
	}

	result := vst.ConnectionConfig{
		// Certificates are only left unverified for secure deployments.
		TLSConfig: &tls.Config{InsecureSkipVerify: secure},
		Transport: vstProtocol.TransportConfig{
			IdleConnTimeout: idleTimeout,
			Version:         vstProtocol.Version1_1,
		},
	}
	port := strconv.Itoa(k8sutil.ArangoPort)
	for _, host := range dnsNames {
		result.Endpoints = append(result.Endpoints, urlPrefix+net.JoinHostPort(host, port))
	}
	return result, nil
}
// createArangodDatabaseVSTClient creates a go-driver client for accessing the
// entire cluster (or single server) via VST.
func createArangodDatabaseVSTClient(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, shortTimeout bool) (driver.Client, error) {
	// Connect using the DNS name of the database client service.
	serviceDNSName := k8sutil.CreateDatabaseClientServiceDNSName(apiObject)
	dbClient, err := createArangodVSTClientForDNSName(ctx, cli, apiObject, serviceDNSName, shortTimeout)
	if err == nil {
		return dbClient, nil
	}
	return nil, maskAny(err)
}
// longOrSkip skips the current test when the short test flag is set
// (testing.Short reports true). Otherwise it returns without doing anything.
func longOrSkip(t *testing.T) {
	if !testing.Short() {
		return
	}
	t.Skip("Test skipped in short test")
}
// getEnterpriseImageOrSkip returns the docker image used for enterprise
// tests, taken from the ENTERPRISEIMAGE environment variable (surrounding
// whitespace is trimmed). When no image is configured, the current test is
// skipped. The image in use is logged once.
func getEnterpriseImageOrSkip(t *testing.T) string {
	image := strings.TrimSpace(os.Getenv("ENTERPRISEIMAGE"))
	if image != "" {
		showEnterpriseImageOnce.Do(func() {
			t.Logf("Using enterprise image: %s", image)
		})
		return image
	}
	t.Skip("Skipping test because ENTERPRISEIMAGE is not set")
	return image
}
// shouldCleanDeployments reports whether deployments created by tests
// should be removed, even when the test fails. This is the case when the
// CLEANDEPLOYMENTS environment variable has a non-empty value.
func shouldCleanDeployments() bool {
	value := os.Getenv("CLEANDEPLOYMENTS")
	return len(value) > 0
}
// mustNewKubeClient returns a kubernetes client created with
// k8sutil.NewKubeClient. The test fails immediately when the client
// cannot be created.
func mustNewKubeClient(t *testing.T) kubernetes.Interface {
	kubecli, err := k8sutil.NewKubeClient()
	if err == nil {
		return kubecli
	}
	t.Fatalf("Failed to create kube cli: %v", err)
	return kubecli
}
// DatabaseClientOptions holds the optional settings used when creating an
// ArangoDB database client.
type DatabaseClientOptions struct {
	// ShortTimeout, when set, makes the connection timeout very short.
	ShortTimeout bool
	// UseVST, when set, results in a VST connection instead of an HTTP
	// connection.
	UseVST bool
}
// mustNewArangodDatabaseClient returns a new database client for the given
// deployment, failing the test when the client cannot be created.
//
// options may be nil, in which case a non-VST client with the regular
// timeout is created.
func mustNewArangodDatabaseClient(ctx context.Context, kubecli kubernetes.Interface, apiObject *api.ArangoDeployment, t *testing.T, options *DatabaseClientOptions) driver.Client {
	// A nil options pointer behaves like the zero value.
	var opts DatabaseClientOptions
	if options != nil {
		opts = *options
	}

	var (
		dbClient driver.Client
		err      error
	)
	switch {
	case opts.UseVST:
		dbClient, err = createArangodDatabaseVSTClient(ctx, kubecli.CoreV1(), apiObject, opts.ShortTimeout)
	default:
		dbClient, err = arangod.CreateArangodDatabaseClient(ctx, kubecli.CoreV1(), apiObject, opts.ShortTimeout)
	}
	if err != nil {
		t.Fatalf("Failed to create arango database client: %v", err)
	}
	return dbClient
}
// mustNewArangoSyncClient returns a new arangosync client that uses the
// syncmaster client service of the given deployment as endpoint.
// The JWT token is read from the secret named in the deployment's sync
// authentication spec. The test fails when the token or the client cannot
// be obtained.
func mustNewArangoSyncClient(ctx context.Context, kubecli kubernetes.Interface, apiObject *api.ArangoDeployment, t *testing.T) client.API {
	// Load the JWT token from its secret.
	secrets := kubecli.CoreV1().Secrets(apiObject.GetNamespace())
	secretName := apiObject.Spec.Sync.Authentication.GetJWTSecretName()
	jwtToken, err := k8sutil.GetTokenSecret(secrets, secretName)
	if err != nil {
		t.Fatalf("Failed to get sync jwt secret '%s': %s", secretName, err)
	}

	// The endpoint is the https URL of the syncmaster client service.
	hostPort := net.JoinHostPort(
		k8sutil.CreateSyncMasterClientServiceDNSName(apiObject),
		strconv.Itoa(k8sutil.ArangoSyncMasterPort),
	)
	endpoint := client.Endpoint{"https://" + hostPort}

	// Build client; server certificates are not verified.
	var logger zerolog.Logger
	authentication := client.NewAuthentication(tasks.TLSAuthentication{}, jwtToken)
	const insecureSkipVerify = true
	syncClient, err := syncClientCache.GetClient(logger, endpoint, authentication, insecureSkipVerify)
	if err != nil {
		t.Fatalf("Failed to get sync client: %s", err)
	}
	return syncClient
}
// getNamespace returns the kubernetes namespace in which to run tests, as
// configured in the TEST_NAMESPACE environment variable. The test fails
// when that variable is empty or not set.
func getNamespace(t *testing.T) string {
	if ns := os.Getenv("TEST_NAMESPACE"); ns != "" {
		return ns
	}
	t.Fatal("Missing environment variable TEST_NAMESPACE")
	return ""
}
// newDeployment creates a basic ArangoDeployment with configured
// type, name (converted to lower case) and image.
//
// The image is taken from the ARANGODIMAGE environment variable. Some tests
// override it when they need a specific version (like update tests). When
// the variable is empty, no image is set and the operator default
// (arangodb/arangodb:latest) applies.
// When TESTDISABLEIPV6 is set to anything other than "0", IPv6 is disabled
// in the spec.
func newDeployment(name string) *api.ArangoDeployment {
	var depl api.ArangoDeployment
	depl.TypeMeta = metav1.TypeMeta{
		APIVersion: api.SchemeGroupVersion.String(),
		Kind:       api.ArangoDeploymentResourceKind,
	}
	depl.ObjectMeta = metav1.ObjectMeta{
		Name: strings.ToLower(name),
	}

	if image := strings.TrimSpace(os.Getenv("ARANGODIMAGE")); image != "" {
		depl.Spec.Image = util.NewString(image)
	}

	switch strings.TrimSpace(os.Getenv("TESTDISABLEIPV6")) {
	case "", "0":
		// DisableIPv6 is left unset.
	default:
		depl.Spec.DisableIPv6 = util.NewBool(true)
	}
	return &depl
}
// waitUntilDeployment waits until the deployment with given name in given
// namespace can be fetched and the given predicate (if any) returns nil
// for it.
//
// Only the first value of timeout is used; without it the wait is limited
// by deploymentReadyTimeout. On success the last fetched deployment is
// returned; when the retries fail, nil and the (wrapped) error are returned.
func waitUntilDeployment(cli versioned.Interface, deploymentName, ns string, predicate func(*api.ArangoDeployment) error, timeout ...time.Duration) (*api.ArangoDeployment, error) {
	maxWait := deploymentReadyTimeout
	if len(timeout) > 0 {
		maxWait = timeout[0]
	}

	var found *api.ArangoDeployment
	fetchAndCheck := func() error {
		depl, err := cli.DatabaseV1alpha().ArangoDeployments(ns).Get(deploymentName, metav1.GetOptions{})
		if err != nil {
			found = nil
			return maskAny(err)
		}
		found = depl
		if predicate == nil {
			return nil
		}
		if err := predicate(depl); err != nil {
			return maskAny(err)
		}
		return nil
	}

	err := retry.Retry(fetchAndCheck, maxWait)
	if err != nil {
		return nil, maskAny(err)
	}
	return found, nil
}
// waitUntilSecret waits until the secret with given name in given namespace
// can be fetched and the given predicate (if any) returns nil for it.
// On success the last fetched secret is returned; when the retries fail
// within the given timeout, nil and the (wrapped) error are returned.
func waitUntilSecret(cli kubernetes.Interface, secretName, ns string, predicate func(*v1.Secret) error, timeout time.Duration) (*v1.Secret, error) {
	var found *v1.Secret
	fetchAndCheck := func() error {
		secret, err := cli.CoreV1().Secrets(ns).Get(secretName, metav1.GetOptions{})
		if err != nil {
			found = nil
			return maskAny(err)
		}
		found = secret
		if predicate == nil {
			return nil
		}
		if err := predicate(secret); err != nil {
			return maskAny(err)
		}
		return nil
	}

	err := retry.Retry(fetchAndCheck, timeout)
	if err != nil {
		return nil, maskAny(err)
	}
	return found, nil
}
// waitUntilSecretNotFound waits until fetching the secret with given name
// in given namespace yields a "not found" error. Any other outcome (another
// error, or the secret still existing) is retried until the given timeout.
func waitUntilSecretNotFound(cli kubernetes.Interface, secretName, ns string, timeout time.Duration) error {
	checkGone := func() error {
		_, err := cli.CoreV1().Secrets(ns).Get(secretName, metav1.GetOptions{})
		switch {
		case k8sutil.IsNotFound(err):
			return nil
		case err != nil:
			return maskAny(err)
		default:
			return maskAny(fmt.Errorf("Secret %s still there", secretName))
		}
	}

	err := retry.Retry(checkGone, timeout)
	if err != nil {
		return maskAny(err)
	}
	return nil
}
// waitUntilClusterHealth waits until the health of the arango cluster can be
// fetched and the given predicate (if any) returns nil for it.
// The wait is limited by deploymentReadyTimeout.
func waitUntilClusterHealth(cli driver.Client, predicate func(driver.ClusterHealth) error) error {
	ctx := context.Background()
	checkHealth := func() error {
		c, err := cli.Cluster(ctx)
		if err != nil {
			return maskAny(err)
		}
		health, err := c.Health(ctx)
		if err != nil {
			return maskAny(err)
		}
		if predicate == nil {
			return nil
		}
		if err := predicate(health); err != nil {
			return maskAny(err)
		}
		return nil
	}

	err := retry.Retry(checkHealth, deploymentReadyTimeout)
	if err != nil {
		return maskAny(err)
	}
	return nil
}
// waitUntilVersionUp waits until the arango database responds to
// an `/_api/version` request without an error. An additional predicate
// can do a check on the VersionInfo object returned by the server.
//
// When the first value of allowNoLeaderResponse is true, a "no leader"
// response (driver.IsNoLeader) ends the wait right away, and that error is
// returned, wrapped by maskAny. The wait is limited by
// deploymentReadyTimeout.
func waitUntilVersionUp(cli driver.Client, predicate func(driver.VersionInfo) error, allowNoLeaderResponse ...bool) error {
	tolerateNoLeader := false
	if len(allowNoLeaderResponse) > 0 {
		tolerateNoLeader = allowNoLeaderResponse[0]
	}

	ctx := context.Background()
	var seenNoLeader error
	probe := func() error {
		info, err := cli.Version(ctx)
		switch {
		case tolerateNoLeader && driver.IsNoLeader(err):
			// Remember the error and report success, so the retry stops.
			seenNoLeader = err
			return nil
		case err != nil:
			return maskAny(err)
		case predicate != nil:
			return predicate(info)
		default:
			return nil
		}
	}

	if err := retry.Retry(probe, deploymentReadyTimeout); err != nil {
		return maskAny(err)
	}
	// seenNoLeader is set by probe when a no-leader response was tolerated.
	if seenNoLeader == nil {
		return nil
	}
	return maskAny(seenNoLeader)
}
// waitUntilSyncVersionUp waits until the given arangosync client gets a
// response to a version request without an error. An additional predicate
// can do a check on the VersionInfo object returned by the server.
// The wait is limited by deploymentReadyTimeout.
func waitUntilSyncVersionUp(cli client.API, predicate func(client.VersionInfo) error) error {
	ctx := context.Background()
	probe := func() error {
		info, err := cli.Version(ctx)
		if err != nil {
			return maskAny(err)
		}
		if predicate == nil {
			return nil
		}
		return predicate(info)
	}

	err := retry.Retry(probe, deploymentReadyTimeout)
	if err != nil {
		return maskAny(err)
	}
	return nil
}
// waitUntilSyncMasterCountReached waits until the number of syncmasters
// reported by the given client is equal to expectedSyncMasters.
// The wait is limited by deploymentReadyTimeout.
func waitUntilSyncMasterCountReached(cli client.API, expectedSyncMasters int) error {
	ctx := context.Background()
	countMasters := func() error {
		masters, err := cli.Master().Masters(ctx)
		if err != nil {
			return maskAny(err)
		}
		if got := len(masters); got != expectedSyncMasters {
			return maskAny(fmt.Errorf("Expected %d syncmasters, got %d", expectedSyncMasters, got))
		}
		return nil
	}

	err := retry.Retry(countMasters, deploymentReadyTimeout)
	if err != nil {
		return maskAny(err)
	}
	return nil
}
// waitUntilSyncWorkerCountReached waits until the number of registered
// syncworkers reported by the given client is equal to expectedSyncWorkers.
// The wait is limited by deploymentReadyTimeout.
func waitUntilSyncWorkerCountReached(cli client.API, expectedSyncWorkers int) error {
	ctx := context.Background()
	countWorkers := func() error {
		workers, err := cli.Master().RegisteredWorkers(ctx)
		if err != nil {
			return maskAny(err)
		}
		if got := len(workers); got != expectedSyncWorkers {
			return maskAny(fmt.Errorf("Expected %d syncworkers, got %d", expectedSyncWorkers, got))
		}
		return nil
	}

	err := retry.Retry(countWorkers, deploymentReadyTimeout)
	if err != nil {
		return maskAny(err)
	}
	return nil
}
// createEqualVersionsPredicate creates a predicate to be used in
// waitUntilVersionUp. The predicate returns an error when the version
// reported by the server differs from the given version.
func createEqualVersionsPredicate(version driver.Version) func(driver.VersionInfo) error {
	return func(serverInfo driver.VersionInfo) error {
		if version.CompareTo(serverInfo.Version) == 0 {
			return nil
		}
		return maskAny(fmt.Errorf("given version %v and version from server %v do not match", version, serverInfo.Version))
	}
}
// clusterHealthEqualsSpec returns nil when the given health matches
// with the given deployment spec, and an error describing the expected and
// actual counts otherwise.
//
// Agents are counted regardless of their status; dbservers and coordinators
// are only counted when their status is good.
func clusterHealthEqualsSpec(h driver.ClusterHealth, spec api.DeploymentSpec) error {
	var agentCount, dbServerCount, coordinatorCount int
	for _, server := range h.Health {
		switch {
		case server.Role == driver.ServerRoleAgent:
			agentCount++
		case server.Status != driver.ServerStatusGood:
			// Non-agents that are not in good status are not counted.
		case server.Role == driver.ServerRoleDBServer:
			dbServerCount++
		case server.Role == driver.ServerRoleCoordinator:
			coordinatorCount++
		}
	}

	wantAgents := spec.Agents.GetCount()
	wantDBServers := spec.DBServers.GetCount()
	wantCoordinators := spec.Coordinators.GetCount()
	if wantAgents != agentCount || wantDBServers != dbServerCount || wantCoordinators != coordinatorCount {
		return fmt.Errorf("Expected %d,%d,%d got %d,%d,%d",
			wantAgents, wantDBServers, wantCoordinators,
			agentCount, dbServerCount, coordinatorCount,
		)
	}
	return nil
}
// updateDeployment updates a deployment
func updateDeployment(cli versioned.Interface, deploymentName, ns string, update func(*api.DeploymentSpec)) (*api.ArangoDeployment, error) {
for {
// Get current version
current, err := cli.Database().ArangoDeployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return nil, maskAny(err)
}
update(¤t.Spec)
current, err = cli.Database().ArangoDeployments(ns).Update(current)
if k8sutil.IsConflict(err) {
// Retry
} else if err != nil {
return nil, maskAny(err)
}
return current, nil
}
}
// removeDeployment removes the deployment with given name from the given
// namespace. A deployment that does not exist (k8sutil.IsNotFound) is not
// treated as an error; any other delete error is returned, wrapped by
// maskAny.
func removeDeployment(cli versioned.Interface, deploymentName, ns string) error {
	err := cli.Database().ArangoDeployments(ns).Delete(deploymentName, nil)
	if err != nil && !k8sutil.IsNotFound(err) {
		return maskAny(err)
	}
	return nil
}
// deferedCleanupDeployment removes the deployment with given name, but only
// when shouldCleanDeployments returns true; otherwise it does nothing.
// This function is intended to be used in a defer statement.
func deferedCleanupDeployment(cli versioned.Interface, deploymentName, ns string) error {
	if shouldCleanDeployments() {
		if err := removeDeployment(cli, deploymentName, ns); err != nil {
			return maskAny(err)
		}
	}
	return nil
}
// removeSecret removes the secret with given name from the given namespace.
// A secret that does not exist (k8sutil.IsNotFound) is not treated as an
// error; any other delete error is returned, wrapped by maskAny.
func removeSecret(cli kubernetes.Interface, secretName, ns string) error {
	err := cli.CoreV1().Secrets(ns).Delete(secretName, nil)
	if err != nil && !k8sutil.IsNotFound(err) {
		return maskAny(err)
	}
	return nil
}
// waitUntilArangoDeploymentHealthy checks that a deployment is up and has
// reached a state where it is able to answer /_api/version requests.
//
// When versionString is not empty, the version returned by every version
// request must be equal to it. What is checked depends on the deployment
// mode:
//
//   - Cluster: the cluster health must match the deployment spec (the
//     version is not checked in this mode).
//   - Single: DBClient must answer a version request.
//   - ActiveFailover: DBClient must answer a version request, the number of
//     single servers and agents in the deployment status must match the
//     spec, and every agent and every single server must answer a version
//     request on its own connection.
//
// Any other deployment mode results in an error.
func waitUntilArangoDeploymentHealthy(deployment *api.ArangoDeployment, DBClient driver.Client, k8sClient kubernetes.Interface, versionString string) error {
	// deployment checks
	var checkVersionPredicate func(driver.VersionInfo) error
	if len(versionString) > 0 {
		checkVersionPredicate = createEqualVersionsPredicate(driver.Version(versionString))
	}
	switch mode := deployment.Spec.GetMode(); mode {
	case api.DeploymentModeCluster:
		// Wait for cluster to be completely ready
		if err := waitUntilClusterHealth(DBClient, func(h driver.ClusterHealth) error {
			return clusterHealthEqualsSpec(h, deployment.Spec)
		}); err != nil {
			return maskAny(fmt.Errorf("Cluster not running in expected health in time: %s", err))
		}
	case api.DeploymentModeSingle:
		if err := waitUntilVersionUp(DBClient, checkVersionPredicate); err != nil {
			return maskAny(fmt.Errorf("Single Server not running in time: %s", err))
		}
	case api.DeploymentModeActiveFailover:
		if err := waitUntilVersionUp(DBClient, checkVersionPredicate); err != nil {
			return maskAny(fmt.Errorf("Single Server not running in time: %s", err))
		}
		members := deployment.Status.Members
		singles := members.Single
		agents := members.Agents
		// GetCount is used instead of dereferencing Count, so an unset
		// count results in a mismatch error rather than a panic.
		expectedSingles := deployment.Spec.Single.GetCount()
		expectedAgents := deployment.Spec.Agents.GetCount()
		if len(singles) != expectedSingles || len(agents) != expectedAgents {
			return maskAny(fmt.Errorf("Wrong number of servers: single %d - agents %d", len(singles), len(agents)))
		}
		ctx := context.Background()
		// check agents
		for _, agent := range agents {
			dbclient, err := arangod.CreateArangodClient(ctx, k8sClient.CoreV1(), deployment, api.ServerGroupAgents, agent.ID)
			if err != nil {
				return maskAny(fmt.Errorf("Unable to create connection to: %s: %s", agent.ID, err))
			}
			if err := waitUntilVersionUp(dbclient, checkVersionPredicate); err != nil {
				return maskAny(fmt.Errorf("Version check failed for: %s: %s", agent.ID, err))
			}
		}
		// check single servers
		{
			var goodResults, noLeaderResults int
			for _, single := range singles {
				dbclient, err := arangod.CreateArangodClient(ctx, k8sClient.CoreV1(), deployment, api.ServerGroupSingle, single.ID)
				if err != nil {
					return maskAny(fmt.Errorf("Unable to create connection to: %s: %s", single.ID, err))
				}
				// No-leader responses are allowed here and counted separately.
				if err := waitUntilVersionUp(dbclient, checkVersionPredicate, true); err == nil {
					goodResults++
				} else if driver.IsNoLeader(err) {
					noLeaderResults++
				} else {
					return maskAny(fmt.Errorf("Version check failed for: %s: %s", single.ID, err))
				}
			}
			expectedGood := expectedSingles
			expectedNoLeader := 0
			if goodResults != expectedGood || noLeaderResults != expectedNoLeader {
				return maskAny(fmt.Errorf("Wrong number of results: good %d (expected: %d)- noleader %d (expected %d)", goodResults, expectedGood, noLeaderResults, expectedNoLeader))
			}
		}
	default:
		return maskAny(fmt.Errorf("DeploymentMode %s is not supported", mode))
	}
	return nil
}
// testServerRole synchronizes the endpoints of the given client and then
// requests the server role, which is compared with expectedRole.
//
// Failing requests are retried for up to 20 seconds. A role that differs
// from expectedRole is reported through retry.Permanent.
// An error is returned when the requests keep failing or the role is not
// equal to the expected role.
func testServerRole(ctx context.Context, client driver.Client, expectedRole driver.ServerRole) error {
	const maxWait = 20 * time.Second

	checkRole := func(ctx context.Context) error {
		err := client.SynchronizeEndpoints(ctx)
		if err != nil {
			return maskAny(err)
		}
		actualRole, err := client.ServerRole(ctx)
		switch {
		case err != nil:
			return maskAny(err)
		case actualRole != expectedRole:
			return retry.Permanent(fmt.Errorf("Unexpected server role: Expected '%s', got '%s'", expectedRole, actualRole))
		}
		return nil
	}

	err := retry.RetryWithContext(ctx, checkRole, maxWait)
	if err != nil {
		return maskAny(err)
	}
	return nil
}