Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b4c79df

Browse files
committedDec 8, 2020
tags: initial implementation of tags
1 parent bf1debf commit b4c79df

File tree

6 files changed

+360
-22
lines changed

6 files changed

+360
-22
lines changed
 

‎go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ require (
4646
k8s.io/kubernetes v0.0.0-20201023141757-9e8ad8ce9d8a
4747
k8s.io/legacy-cloud-providers v0.0.0
4848
k8s.io/utils v0.0.0-20201015054608-420da100c033
49+
sigs.k8s.io/yaml v1.2.0
4950
)

‎pkg/providers/v2/cloud.go

+49-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package v2
2121
import (
2222
"fmt"
2323
"io"
24+
"io/ioutil"
2425
"regexp"
2526

2627
"github.com/aws/aws-sdk-go/aws"
@@ -31,11 +32,18 @@ import (
3132
"github.com/aws/aws-sdk-go/service/ec2"
3233

3334
cloudprovider "k8s.io/cloud-provider"
35+
"k8s.io/cloud-provider-aws/pkg/apis/config/v1alpha1"
36+
"sigs.k8s.io/yaml"
3437
)
3538

3639
func init() {
3740
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
38-
return newCloud()
41+
cfg, err := readAWSCloudConfig(config)
42+
if err != nil {
43+
return nil, fmt.Errorf("failed to read AWS cloud provider config file: %v", err)
44+
}
45+
46+
return newCloud(*cfg)
3947
})
4048
}
4149

@@ -53,6 +61,14 @@ type cloud struct {
5361
region string
5462
ec2 EC2
5563
metadata EC2Metadata
64+
tagging awsTagging
65+
}
66+
67+
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
68+
type EC2 interface {
69+
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
70+
71+
CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error)
5672
}
5773

5874
// EC2Metadata is an abstraction over the AWS metadata service.
@@ -61,6 +77,30 @@ type EC2Metadata interface {
6177
GetMetadata(path string) (string, error)
6278
}
6379

80+
func readAWSCloudConfig(config io.Reader) (*v1alpha1.AWSCloudConfig, error) {
81+
if config == nil {
82+
return nil, fmt.Errorf("no AWS cloud provider config file given")
83+
}
84+
85+
// read the config file
86+
data, err := ioutil.ReadAll(config)
87+
if err != nil {
88+
return nil, fmt.Errorf("unable to read cloud configuration from %q [%v]", config, err)
89+
}
90+
91+
var cfg v1alpha1.AWSCloudConfig
92+
err = yaml.Unmarshal(data, &cfg)
93+
if err != nil {
94+
// we got an error where the decode wasn't related to a missing type
95+
return nil, err
96+
}
97+
if cfg.Kind != "AWSCloudConfig" {
98+
return nil, fmt.Errorf("invalid cloud configuration object %q", cfg.Kind)
99+
}
100+
101+
return &cfg, nil
102+
}
103+
64104
func getAvailabilityZone(metadata EC2Metadata) (string, error) {
65105
return metadata.GetMetadata("placement/availability-zone")
66106
}
@@ -82,7 +122,7 @@ func azToRegion(az string) (string, error) {
82122
}
83123

84124
// newCloud creates a new instance of AWSCloud.
85-
func newCloud() (cloudprovider.Interface, error) {
125+
func newCloud(cfg v1alpha1.AWSCloudConfig) (cloudprovider.Interface, error) {
86126
sess, err := session.NewSession(&aws.Config{})
87127
if err != nil {
88128
return nil, fmt.Errorf("unable to initialize AWS session: %v", err)
@@ -138,6 +178,12 @@ func newCloud() (cloudprovider.Interface, error) {
138178
ec2: ec2Service,
139179
}
140180

181+
if cfg.Config.ClusterName != "" {
182+
if err := awsCloud.tagging.init(cfg.Config.ClusterName); err != nil {
183+
return nil, err
184+
}
185+
}
186+
141187
return awsCloud, nil
142188
}
143189

@@ -177,7 +223,7 @@ func (c *cloud) Routes() (cloudprovider.Routes, bool) {
177223

178224
// HasClusterID returns true if the cluster has a clusterID
179225
func (c *cloud) HasClusterID() bool {
180-
return false
226+
return len(c.tagging.clusterName()) > 0
181227
}
182228

183229
// InstancesV2 is an implementation for instances and should only be implemented by external cloud providers.

‎pkg/providers/v2/instances.go

-5
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ func newInstances(az string, creds *credentials.Credentials) (cloudprovider.Inst
5959
}, nil
6060
}
6161

62-
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
63-
type EC2 interface {
64-
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
65-
}
66-
6762
// instances is an implementation of cloudprovider.InstancesV2
6863
type instances struct {
6964
availabilityZone string

‎pkg/providers/v2/mocks/mock_ec2.go

+54-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/providers/v2/tags.go

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"fmt"
20+
"strings"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/util/wait"
24+
"k8s.io/klog/v2"
25+
26+
"github.com/aws/aws-sdk-go/aws"
27+
"github.com/aws/aws-sdk-go/service/ec2"
28+
)
29+
30+
const (
31+
// TagNameKubernetesClusterPrefix is the tag name we use to differentiate multiple
32+
// logically independent clusters running in the same AZ.
33+
// tag format: kubernetes.io/cluster/=<clusterName>
34+
TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/"
35+
36+
// createTag* is configuration of exponential backoff for CreateTag call. We
37+
// retry mainly because if we create an object, we cannot tag it until it is
38+
// "fully created" (eventual consistency). Starting with 1 second, doubling
39+
// it every step and taking 9 steps results in 255 second total waiting
40+
// time.
41+
createTagInitialDelay = 1 * time.Second
42+
createTagFactor = 2.0
43+
createTagSteps = 9
44+
)
45+
46+
type awsTagging struct {
47+
// ClusterName is our cluster identifier: we tag AWS resources with this value,
48+
// and thus we can run two independent clusters in the same VPC or subnets.
49+
ClusterName string
50+
}
51+
52+
// Extracts the cluster name from the given tags, if they are present
53+
// If duplicate tags are found, returns an error
54+
func findClusterName(tags []*ec2.Tag) (string, error) {
55+
clusterName := ""
56+
57+
for _, tag := range tags {
58+
tagKey := aws.StringValue(tag.Key)
59+
if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) {
60+
name := aws.StringValue(tag.Value)
61+
if clusterName != "" {
62+
return "", fmt.Errorf("Found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, clusterName, name)
63+
}
64+
clusterName = name
65+
}
66+
}
67+
68+
return clusterName, nil
69+
}
70+
71+
func (t *awsTagging) init(clusterName string) error {
72+
t.ClusterName = clusterName
73+
74+
if clusterName != "" {
75+
klog.Infof("AWS cloud filtering on ClusterName: %v", clusterName)
76+
} else {
77+
return fmt.Errorf("AWS cloud failed to find ClusterName")
78+
}
79+
80+
return nil
81+
}
82+
83+
// Extracts a cluster name from the given tags, if one is present
84+
// If no clusterName is found, returns "", nil
85+
// If multiple (different) clusterNames are found, returns an error
86+
func (t *awsTagging) initFromTags(tags []*ec2.Tag) error {
87+
clusterName, err := findClusterName(tags)
88+
if err != nil {
89+
return err
90+
}
91+
92+
if clusterName == "" {
93+
klog.Errorf("Tag %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterPrefix)
94+
}
95+
96+
return t.init(clusterName)
97+
}
98+
99+
func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool {
100+
// if the clusterName is not configured -- we consider all instances.
101+
if len(t.ClusterName) == 0 {
102+
return true
103+
}
104+
105+
for _, tag := range tags {
106+
tagKey := aws.StringValue(tag.Key)
107+
if (tagKey == TagNameKubernetesClusterPrefix) && (aws.StringValue(tag.Value) == t.ClusterName) {
108+
return true
109+
}
110+
}
111+
112+
return false
113+
}
114+
115+
func (t *awsTagging) buildTags(additionalTags map[string]string, lifecycle string) map[string]string {
116+
tags := make(map[string]string)
117+
for k, v := range additionalTags {
118+
tags[k] = v
119+
}
120+
121+
// no clusterName is a sign of misconfigured cluster, but we can't be tagging the resources with empty
122+
// strings
123+
if len(t.ClusterName) == 0 {
124+
return tags
125+
}
126+
127+
// tag format: kubernetes.io/cluster/=<clusterName>
128+
tags[TagNameKubernetesClusterPrefix] = t.ClusterName
129+
130+
return tags
131+
}
132+
133+
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
134+
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
135+
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
136+
func (t *awsTagging) createTags(ec2Client EC2, resourceID string, lifecycle string, additionalTags map[string]string) error {
137+
tags := t.buildTags(additionalTags, lifecycle)
138+
139+
if tags == nil || len(tags) == 0 {
140+
return nil
141+
}
142+
143+
var awsTags []*ec2.Tag
144+
for k, v := range tags {
145+
tag := &ec2.Tag{
146+
Key: aws.String(k),
147+
Value: aws.String(v),
148+
}
149+
awsTags = append(awsTags, tag)
150+
}
151+
152+
backoff := wait.Backoff{
153+
Duration: createTagInitialDelay,
154+
Factor: createTagFactor,
155+
Steps: createTagSteps,
156+
}
157+
158+
request := &ec2.CreateTagsInput{
159+
Resources: []*string{&resourceID},
160+
Tags: awsTags,
161+
}
162+
163+
var lastErr error
164+
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
165+
_, err := ec2Client.CreateTags(request)
166+
if err == nil {
167+
return true, nil
168+
}
169+
170+
// We could check that the error is retryable, but the error code changes based on what we are tagging
171+
// SecurityGroup: InvalidGroup.NotFound
172+
klog.V(2).Infof("Failed to create tags; will retry. Error was %q", err)
173+
lastErr = err
174+
return false, nil
175+
})
176+
if err == wait.ErrWaitTimeout {
177+
// return real CreateTags error instead of timeout
178+
err = lastErr
179+
}
180+
181+
return err
182+
}
183+
184+
func (t *awsTagging) clusterName() string {
185+
return t.ClusterName
186+
}

‎pkg/providers/v2/tags_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"testing"
20+
21+
"github.com/aws/aws-sdk-go/aws"
22+
"github.com/aws/aws-sdk-go/service/ec2"
23+
)
24+
25+
func TestFindClusterName(t *testing.T) {
26+
grid := []struct {
27+
Tags map[string]string
28+
ExpectedClusterName string
29+
ExpectError bool
30+
}{
31+
{
32+
Tags: map[string]string{},
33+
},
34+
{
35+
Tags: map[string]string{
36+
TagNameKubernetesClusterPrefix: TestClusterID,
37+
},
38+
ExpectedClusterName: TestClusterID,
39+
},
40+
{
41+
Tags: map[string]string{
42+
TagNameKubernetesClusterPrefix: "",
43+
},
44+
ExpectedClusterName: "",
45+
},
46+
}
47+
for _, g := range grid {
48+
var ec2Tags []*ec2.Tag
49+
for k, v := range g.Tags {
50+
ec2Tags = append(ec2Tags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)})
51+
}
52+
clusterName, err := findClusterName(ec2Tags)
53+
if g.ExpectError {
54+
if err == nil {
55+
t.Errorf("expected error for tags %v", g.Tags)
56+
continue
57+
}
58+
} else {
59+
if err != nil {
60+
t.Errorf("unexpected error for tags %v: %v", g.Tags, err)
61+
continue
62+
}
63+
64+
if g.ExpectedClusterName != clusterName {
65+
t.Errorf("unexpected new clusterName for tags %v: %s vs %s", g.Tags, g.ExpectedClusterName, clusterName)
66+
continue
67+
}
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)
Please sign in to comment.