Skip to content

Commit 5b1e59d

Browse files
committed
tags: initial implementation of tags
1 parent accb0dc commit 5b1e59d

File tree

5 files changed

+388
-22
lines changed

5 files changed

+388
-22
lines changed

Diff for: pkg/providers/v2/cloud.go

+46-3
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,19 @@ import (
2929
"github.com/aws/aws-sdk-go/aws/ec2metadata"
3030
"github.com/aws/aws-sdk-go/aws/session"
3131
"github.com/aws/aws-sdk-go/service/ec2"
32+
"gopkg.in/gcfg.v1"
3233

3334
cloudprovider "k8s.io/cloud-provider"
3435
)
3536

3637
func init() {
3738
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
38-
return newCloud()
39+
cfg, err := readAWSCloudConfig(config)
40+
if err != nil {
41+
return nil, fmt.Errorf("failed to read AWS cloud provider config file: %v", err)
42+
}
43+
44+
return newCloud(*cfg)
3945
})
4046
}
4147

@@ -53,6 +59,22 @@ type cloud struct {
5359
region string
5460
ec2 EC2
5561
metadata EC2Metadata
62+
tagging awsTagging
63+
}
64+
65+
// CloudConfig holds the settings of the AWS cloud provider, as decoded from the
// provider's config file by readAWSCloudConfig.
type CloudConfig struct {
	// Global groups the provider-wide settings.
	Global struct {
		// KubernetesClusterID is the ID used to identify the AWS resources
		// that belong to this cluster.
		KubernetesClusterID string
	}
}
72+
73+
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
74+
type EC2 interface {
75+
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
76+
77+
CreateTags(*ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error)
5678
}
5779

5880
// EC2Metadata is an abstraction over the AWS metadata service.
@@ -61,6 +83,20 @@ type EC2Metadata interface {
6183
GetMetadata(path string) (string, error)
6284
}
6385

86+
func readAWSCloudConfig(config io.Reader) (*CloudConfig, error) {
87+
if config == nil {
88+
return nil, fmt.Errorf("no AWS cloud provider config file given")
89+
}
90+
91+
var cfg CloudConfig
92+
err := gcfg.ReadInto(&cfg, config)
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
return &cfg, nil
98+
}
99+
64100
func getAvailabilityZone(metadata EC2Metadata) (string, error) {
65101
return metadata.GetMetadata("placement/availability-zone")
66102
}
@@ -82,7 +118,7 @@ func azToRegion(az string) (string, error) {
82118
}
83119

84120
// newCloud creates a new instance of AWSCloud.
85-
func newCloud() (cloudprovider.Interface, error) {
121+
func newCloud(cfg CloudConfig) (cloudprovider.Interface, error) {
86122
sess, err := session.NewSession(&aws.Config{})
87123
if err != nil {
88124
return nil, fmt.Errorf("unable to initialize AWS session: %v", err)
@@ -138,6 +174,13 @@ func newCloud() (cloudprovider.Interface, error) {
138174
ec2: ec2Service,
139175
}
140176

177+
if cfg.Global.KubernetesClusterID != "" {
178+
if err := awsCloud.tagging.init(cfg.Global.KubernetesClusterID); err != nil {
179+
return nil, err
180+
}
181+
}
182+
// TODO: initialize cloud tagging from tags
183+
141184
return awsCloud, nil
142185
}
143186

@@ -177,7 +220,7 @@ func (c *cloud) Routes() (cloudprovider.Routes, bool) {
177220

178221
// HasClusterID returns true if the cluster has a clusterID
179222
func (c *cloud) HasClusterID() bool {
180-
return false
223+
return len(c.tagging.clusterID()) > 0
181224
}
182225

183226
// InstancesV2 is an implementation for instances and should only be implemented by external cloud providers.

Diff for: pkg/providers/v2/instances.go

-5
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ func newInstances(az string, creds *credentials.Credentials) (cloudprovider.Inst
5959
}, nil
6060
}
6161

62-
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
63-
type EC2 interface {
64-
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
65-
}
66-
6762
// instances is an implementation of cloudprovider.InstancesV2
6863
type instances struct {
6964
availabilityZone string

Diff for: pkg/providers/v2/mocks/mock_ec2.go

+54-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: pkg/providers/v2/tags.go

+212
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"fmt"
20+
"strings"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/util/wait"
24+
"k8s.io/klog/v2"
25+
26+
"github.com/aws/aws-sdk-go/aws"
27+
"github.com/aws/aws-sdk-go/service/ec2"
28+
)
29+
30+
const (
	// TagNameKubernetesClusterPrefix is the tag-key prefix used to tell apart
	// multiple logically independent clusters running in the same AZ.
	// Two tag forms are supported:
	//  1) key "kubernetes.io/cluster/" with the cluster ID as the value
	//  2) for resources shared between clusters, key
	//     "kubernetes.io/cluster/<clusterID>" with an empty value
	TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/"

	// ResourceLifecycleShared is the lifecycle value that marks a resource as
	// shared between multiple clusters, meaning it should not be destroyed
	// when one cluster is destroyed.
	ResourceLifecycleShared = "shared"

	// createTag* configure the exponential backoff around the CreateTags
	// call. Retrying is needed mainly because a newly created object cannot
	// be tagged until it is "fully created" (eventual consistency). Starting
	// at 1 second, doubling each step and taking 9 steps gives 255 seconds of
	// total waiting time.
	createTagInitialDelay = 1 * time.Second
	createTagFactor       = 2.0
	createTagSteps        = 9
)
52+
53+
// awsTagging carries the cluster identity used both when tagging AWS resources
// and when checking whether tagged resources belong to this cluster.
type awsTagging struct {
	// ClusterID identifies this cluster. AWS resources are tagged with it,
	// which lets two independent clusters run in the same VPC or subnets
	// (similar in effect to GCE projects).
	ClusterID string

	// resourceShared is true if the resource is shared between multiple
	// clusters.
	// NOTE(review): nothing in tags.go reads or writes this field; confirm
	// it is used elsewhere.
	resourceShared bool
}
62+
63+
// Extracts the cluster id from the given tags, if they are present
64+
// If duplicate tags are found, returns an error
65+
func findClusterID(tags []*ec2.Tag) (string, error) {
66+
clusterID := ""
67+
68+
for _, tag := range tags {
69+
tagKey := aws.StringValue(tag.Key)
70+
if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) {
71+
id := aws.StringValue(tag.Value)
72+
if id == "" {
73+
id = strings.TrimPrefix(tagKey, TagNameKubernetesClusterPrefix)
74+
}
75+
76+
if clusterID != "" {
77+
return "", fmt.Errorf("Found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, clusterID, id)
78+
}
79+
clusterID = id
80+
}
81+
}
82+
83+
return clusterID, nil
84+
}
85+
86+
func (t *awsTagging) init(clusterID string) error {
87+
t.ClusterID = clusterID
88+
89+
if clusterID != "" {
90+
klog.Infof("AWS cloud filtering on ClusterID: %v", clusterID)
91+
} else {
92+
return fmt.Errorf("AWS cloud failed to find ClusterID")
93+
}
94+
95+
return nil
96+
}
97+
98+
// Extracts a clusterID from the given tags, if one is present
99+
// If no clusterID is found, returns "", nil
100+
// If multiple (different) clusterIDs are found, returns an error
101+
func (t *awsTagging) initFromTags(tags []*ec2.Tag) error {
102+
clusterID, err := findClusterID(tags)
103+
if err != nil {
104+
return err
105+
}
106+
107+
if clusterID == "" {
108+
klog.Errorf("Tag %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterPrefix)
109+
}
110+
111+
return t.init(clusterID)
112+
}
113+
114+
func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool {
115+
// if the clusterID is not configured -- we consider all instances.
116+
if len(t.ClusterID) == 0 {
117+
return true
118+
}
119+
120+
for _, tag := range tags {
121+
tagKey := aws.StringValue(tag.Key)
122+
if (tagKey == TagNameKubernetesClusterPrefix) && (aws.StringValue(tag.Value) == t.ClusterID) {
123+
return true
124+
}
125+
126+
// for shared resources
127+
if tagKey == (TagNameKubernetesClusterPrefix + t.ClusterID) {
128+
return true
129+
}
130+
}
131+
132+
return false
133+
}
134+
135+
func (t *awsTagging) buildTags(additionalTags map[string]string, lifecycle string) map[string]string {
136+
tags := make(map[string]string)
137+
for k, v := range additionalTags {
138+
tags[k] = v
139+
}
140+
141+
// no clusterID is a sign of misconfigured cluster, but we can't be tagging the resources with empty
142+
// strings
143+
if len(t.ClusterID) == 0 {
144+
return tags
145+
}
146+
147+
// we support two tags:
148+
// 1) kubernetes.io/cluster/=<clusterID>
149+
// 2) for shared resoueces between clusters, kubernetes.io/cluster/<clusterID>=""
150+
if lifecycle == ResourceLifecycleShared {
151+
tags[TagNameKubernetesClusterPrefix+t.ClusterID] = ""
152+
} else {
153+
tags[TagNameKubernetesClusterPrefix] = t.ClusterID
154+
}
155+
156+
return tags
157+
}
158+
159+
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
160+
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
161+
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
162+
func (t *awsTagging) createTags(ec2Client EC2, resourceID string, lifecycle string, additionalTags map[string]string) error {
163+
tags := t.buildTags(additionalTags, lifecycle)
164+
165+
if tags == nil || len(tags) == 0 {
166+
return nil
167+
}
168+
169+
var awsTags []*ec2.Tag
170+
for k, v := range tags {
171+
tag := &ec2.Tag{
172+
Key: aws.String(k),
173+
Value: aws.String(v),
174+
}
175+
awsTags = append(awsTags, tag)
176+
}
177+
178+
backoff := wait.Backoff{
179+
Duration: createTagInitialDelay,
180+
Factor: createTagFactor,
181+
Steps: createTagSteps,
182+
}
183+
184+
request := &ec2.CreateTagsInput{
185+
Resources: []*string{&resourceID},
186+
Tags: awsTags,
187+
}
188+
189+
var lastErr error
190+
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
191+
_, err := ec2Client.CreateTags(request)
192+
if err == nil {
193+
return true, nil
194+
}
195+
196+
// We could check that the error is retryable, but the error code changes based on what we are tagging
197+
// SecurityGroup: InvalidGroup.NotFound
198+
klog.V(2).Infof("Failed to create tags; will retry. Error was %q", err)
199+
lastErr = err
200+
return false, nil
201+
})
202+
if err == wait.ErrWaitTimeout {
203+
// return real CreateTags error instead of timeout
204+
err = lastErr
205+
}
206+
207+
return err
208+
}
209+
210+
func (t *awsTagging) clusterID() string {
211+
return t.ClusterID
212+
}

0 commit comments

Comments
 (0)