Skip to content

Commit 891e3e0

Browse files
committed
tags: initial implementation of tags
1 parent b390ec1 commit 891e3e0

File tree

5 files changed

+386
-22
lines changed

5 files changed

+386
-22
lines changed

Diff for: pkg/providers/v2/cloud.go

+45-3
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,19 @@ import (
2929
"github.com/aws/aws-sdk-go/aws/ec2metadata"
3030
"github.com/aws/aws-sdk-go/aws/session"
3131
"github.com/aws/aws-sdk-go/service/ec2"
32+
"gopkg.in/gcfg.v1"
3233

3334
cloudprovider "k8s.io/cloud-provider"
3435
)
3536

3637
func init() {
3738
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
38-
return newCloud()
39+
cfg, err := readAWSCloudConfig(config)
40+
if err != nil {
41+
return nil, fmt.Errorf("failed to read AWS cloud provider config file: %v", err)
42+
}
43+
44+
return newCloud(*cfg)
3945
})
4046
}
4147

@@ -53,6 +59,22 @@ type cloud struct {
5359
region string
5460
ec2 EC2
5561
metadata EC2Metadata
62+
tagging awsTagging
63+
}
64+
65+
// AWSCloudConfig wraps the settings for the AWS cloud provider.
66+
type AWSCloudConfig struct {
67+
Config struct {
68+
// ClusterName is the cluster name we'll use to identify our cluster resources
69+
ClusterName string
70+
}
71+
}
72+
73+
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
74+
type EC2 interface {
75+
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
76+
77+
CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error)
5678
}
5779

5880
// EC2Metadata is an abstraction over the AWS metadata service.
@@ -61,6 +83,20 @@ type EC2Metadata interface {
6183
GetMetadata(path string) (string, error)
6284
}
6385

86+
func readAWSCloudConfig(config io.Reader) (*AWSCloudConfig, error) {
87+
if config == nil {
88+
return nil, fmt.Errorf("no AWS cloud provider config file given")
89+
}
90+
91+
var cfg AWSCloudConfig
92+
err := gcfg.ReadInto(&cfg, config)
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
return &cfg, nil
98+
}
99+
64100
func getAvailabilityZone(metadata EC2Metadata) (string, error) {
65101
return metadata.GetMetadata("placement/availability-zone")
66102
}
@@ -82,7 +118,7 @@ func azToRegion(az string) (string, error) {
82118
}
83119

84120
// newCloud creates a new instance of AWSCloud.
85-
func newCloud() (cloudprovider.Interface, error) {
121+
func newCloud(cfg AWSCloudConfig) (cloudprovider.Interface, error) {
86122
sess, err := session.NewSession(&aws.Config{})
87123
if err != nil {
88124
return nil, fmt.Errorf("unable to initialize AWS session: %v", err)
@@ -138,6 +174,12 @@ func newCloud() (cloudprovider.Interface, error) {
138174
ec2: ec2Service,
139175
}
140176

177+
if cfg.Config.ClusterName != "" {
178+
if err := awsCloud.tagging.init(cfg.Config.ClusterName); err != nil {
179+
return nil, err
180+
}
181+
}
182+
141183
return awsCloud, nil
142184
}
143185

@@ -177,7 +219,7 @@ func (c *cloud) Routes() (cloudprovider.Routes, bool) {
177219

178220
// HasClusterID returns true if the cluster has a clusterID
179221
func (c *cloud) HasClusterID() bool {
180-
return false
222+
return len(c.tagging.clusterName()) > 0
181223
}
182224

183225
// InstancesV2 is an implementation for instances and should only be implemented by external cloud providers.

Diff for: pkg/providers/v2/instances.go

-5
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ func newInstances(az string, creds *credentials.Credentials) (cloudprovider.Inst
5959
}, nil
6060
}
6161

62-
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
63-
type EC2 interface {
64-
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
65-
}
66-
6762
// instances is an implementation of cloudprovider.InstancesV2
6863
type instances struct {
6964
availabilityZone string

Diff for: pkg/providers/v2/mocks/mock_ec2.go

+54-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: pkg/providers/v2/tags.go

+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"fmt"
20+
"strings"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/util/wait"
24+
"k8s.io/klog/v2"
25+
26+
"github.com/aws/aws-sdk-go/aws"
27+
"github.com/aws/aws-sdk-go/service/ec2"
28+
)
29+
30+
const (
31+
// TagNameKubernetesClusterPrefix is the tag name we use to differentiate multiple
32+
// logically independent clusters running in the same AZ.
33+
// we support two tags:
34+
// 1) kubernetes.io/cluster/=<clusterName>
35+
// 2) for shared resoueces between clusters, kubernetes.io/cluster/<clusterName>=""
36+
TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/"
37+
38+
// ResourceLifecycleShared is the value we use when tagging resources to indicate
39+
// that the resource is shared between multiple clusters, and should not be destroyed
40+
// if the cluster is destroyed.
41+
ResourceLifecycleShared = "shared"
42+
43+
// createTag* is configuration of exponential backoff for CreateTag call. We
44+
// retry mainly because if we create an object, we cannot tag it until it is
45+
// "fully created" (eventual consistency). Starting with 1 second, doubling
46+
// it every step and taking 9 steps results in 255 second total waiting
47+
// time.
48+
createTagInitialDelay = 1 * time.Second
49+
createTagFactor = 2.0
50+
createTagSteps = 9
51+
)
52+
53+
type awsTagging struct {
54+
// ClusterName is our cluster identifier: we tag AWS resources with this value,
55+
// and thus we can run two independent clusters in the same VPC or subnets.
56+
ClusterName string
57+
58+
// resourceShared is true if the resource is shared between multiple clusters
59+
resourceShared bool
60+
}
61+
62+
// Extracts the cluster name from the given tags, if they are present
63+
// If duplicate tags are found, returns an error
64+
func findClusterName(tags []*ec2.Tag) (string, error) {
65+
clusterName := ""
66+
67+
for _, tag := range tags {
68+
tagKey := aws.StringValue(tag.Key)
69+
if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) {
70+
name := aws.StringValue(tag.Value)
71+
if name == "" {
72+
name = strings.TrimPrefix(tagKey, TagNameKubernetesClusterPrefix)
73+
}
74+
75+
if clusterName != "" {
76+
return "", fmt.Errorf("Found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, clusterName, name)
77+
}
78+
clusterName = name
79+
}
80+
}
81+
82+
return clusterName, nil
83+
}
84+
85+
func (t *awsTagging) init(clusterName string) error {
86+
t.ClusterName = clusterName
87+
88+
if clusterName != "" {
89+
klog.Infof("AWS cloud filtering on ClusterName: %v", clusterName)
90+
} else {
91+
return fmt.Errorf("AWS cloud failed to find ClusterName")
92+
}
93+
94+
return nil
95+
}
96+
97+
// Extracts a cluster name from the given tags, if one is present
98+
// If no clusterName is found, returns "", nil
99+
// If multiple (different) clusterNames are found, returns an error
100+
func (t *awsTagging) initFromTags(tags []*ec2.Tag) error {
101+
clusterName, err := findClusterName(tags)
102+
if err != nil {
103+
return err
104+
}
105+
106+
if clusterName == "" {
107+
klog.Errorf("Tag %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterPrefix)
108+
}
109+
110+
return t.init(clusterName)
111+
}
112+
113+
func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool {
114+
// if the clusterName is not configured -- we consider all instances.
115+
if len(t.ClusterName) == 0 {
116+
return true
117+
}
118+
119+
for _, tag := range tags {
120+
tagKey := aws.StringValue(tag.Key)
121+
if (tagKey == TagNameKubernetesClusterPrefix) && (aws.StringValue(tag.Value) == t.ClusterName) {
122+
return true
123+
}
124+
125+
// for shared resources
126+
if tagKey == (TagNameKubernetesClusterPrefix + t.ClusterName) {
127+
return true
128+
}
129+
}
130+
131+
return false
132+
}
133+
134+
func (t *awsTagging) buildTags(additionalTags map[string]string, lifecycle string) map[string]string {
135+
tags := make(map[string]string)
136+
for k, v := range additionalTags {
137+
tags[k] = v
138+
}
139+
140+
// no clusterName is a sign of misconfigured cluster, but we can't be tagging the resources with empty
141+
// strings
142+
if len(t.ClusterName) == 0 {
143+
return tags
144+
}
145+
146+
// we support two tags:
147+
// 1) kubernetes.io/cluster/=<clusterName>
148+
// 2) for shared resoueces between clusters, kubernetes.io/cluster/<clusterName>=""
149+
if lifecycle == ResourceLifecycleShared {
150+
tags[TagNameKubernetesClusterPrefix+t.ClusterName] = ""
151+
} else {
152+
tags[TagNameKubernetesClusterPrefix] = t.ClusterName
153+
}
154+
155+
return tags
156+
}
157+
158+
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
159+
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
160+
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
161+
func (t *awsTagging) createTags(ec2Client EC2, resourceID string, lifecycle string, additionalTags map[string]string) error {
162+
tags := t.buildTags(additionalTags, lifecycle)
163+
164+
if tags == nil || len(tags) == 0 {
165+
return nil
166+
}
167+
168+
var awsTags []*ec2.Tag
169+
for k, v := range tags {
170+
tag := &ec2.Tag{
171+
Key: aws.String(k),
172+
Value: aws.String(v),
173+
}
174+
awsTags = append(awsTags, tag)
175+
}
176+
177+
backoff := wait.Backoff{
178+
Duration: createTagInitialDelay,
179+
Factor: createTagFactor,
180+
Steps: createTagSteps,
181+
}
182+
183+
request := &ec2.CreateTagsInput{
184+
Resources: []*string{&resourceID},
185+
Tags: awsTags,
186+
}
187+
188+
var lastErr error
189+
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
190+
_, err := ec2Client.CreateTags(request)
191+
if err == nil {
192+
return true, nil
193+
}
194+
195+
// We could check that the error is retryable, but the error code changes based on what we are tagging
196+
// SecurityGroup: InvalidGroup.NotFound
197+
klog.V(2).Infof("Failed to create tags; will retry. Error was %q", err)
198+
lastErr = err
199+
return false, nil
200+
})
201+
if err == wait.ErrWaitTimeout {
202+
// return real CreateTags error instead of timeout
203+
err = lastErr
204+
}
205+
206+
return err
207+
}
208+
209+
func (t *awsTagging) clusterName() string {
210+
return t.ClusterName
211+
}

0 commit comments

Comments
 (0)