Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a14a944

Browse files
committed Sep 29, 2020
instances: implementation
1 parent 211d51d commit a14a944

File tree

4 files changed

+581
-4
lines changed

4 files changed

+581
-4
lines changed
 

‎go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ replace (
2828

2929
require (
3030
github.com/aws/aws-sdk-go v1.28.2
31+
github.com/google/uuid v1.1.1
3132
github.com/spf13/cobra v1.0.0
3233
github.com/spf13/pflag v1.0.5
3334
github.com/stretchr/testify v1.4.0
35+
k8s.io/api v0.0.0
3436
k8s.io/apimachinery v0.0.0
3537
k8s.io/apiserver v0.0.0
3638
k8s.io/cloud-provider v0.0.0

‎pkg/providers/v2/cloud.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ var _ cloudprovider.Interface = (*cloud)(nil)
4646

4747
// cloud is the AWS v2 implementation of the cloud provider interface
4848
type cloud struct {
49-
creds *credentials.Credentials
50-
region string
49+
creds *credentials.Credentials
50+
instances cloudprovider.InstancesV2
51+
region string
5152
}
5253

5354
// EC2Metadata is an abstraction over the AWS metadata service.
@@ -107,9 +108,15 @@ func newCloud() (cloudprovider.Interface, error) {
107108
return nil, err
108109
}
109110

111+
instances, err := newInstances(region, creds)
112+
if err != nil {
113+
return nil, err
114+
}
115+
110116
return &cloud{
111-
creds: creds,
112-
region: region,
117+
creds: creds,
118+
instances: instances,
119+
region: region,
113120
}, nil
114121
}
115122

‎pkg/providers/v2/instances.go

+332
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"context"
20+
"errors"
21+
"fmt"
22+
"net"
23+
"strings"
24+
25+
"github.com/aws/aws-sdk-go/aws"
26+
"github.com/aws/aws-sdk-go/aws/credentials"
27+
"github.com/aws/aws-sdk-go/aws/session"
28+
"github.com/aws/aws-sdk-go/service/ec2"
29+
30+
v1 "k8s.io/api/core/v1"
31+
cloudprovider "k8s.io/cloud-provider"
32+
)
33+
34+
// newInstances returns an implementation of cloudprovider.InstancesV2
35+
func newInstances(region string, creds *credentials.Credentials) (cloudprovider.InstancesV2, error) {
36+
awsConfig := &aws.Config{
37+
Region: aws.String(region),
38+
Credentials: creds,
39+
}
40+
awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true)
41+
42+
sess, err := session.NewSession(awsConfig)
43+
if err != nil {
44+
return nil, err
45+
}
46+
ec2Service := ec2.New(sess)
47+
48+
return &instances{
49+
ec2: ec2Service,
50+
}, nil
51+
}
52+
53+
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
// Keeping it this narrow lets the tests in this package substitute a fake.
54+
type EC2 interface {
// DescribeInstances has the same request/response signature as the call
// made on the EC2 SDK client.
55+
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
56+
}
57+
58+
// instances is an implementation of cloudprovider.InstancesV2.
// Its methods resolve a node to an EC2 instance through the ec2 client.
59+
type instances struct {
// ec2 is the client used for every DescribeInstances lookup.
60+
ec2 EC2
61+
}
62+
63+
// InstanceExists indicates whether a given node exists according to the cloud provider
64+
func (i *instances) InstanceExists(ctx context.Context, node *v1.Node) (bool, error) {
65+
var err error
66+
if node.Spec.ProviderID == "" {
67+
_, err = i.getInstanceByPrivateDNSName(ctx, node.Name)
68+
if err == cloudprovider.InstanceNotFound {
69+
return false, nil
70+
}
71+
72+
if err != nil {
73+
return false, err
74+
}
75+
} else {
76+
_, err = i.getInstanceByProviderID(ctx, node.Spec.ProviderID)
77+
if err == cloudprovider.InstanceNotFound {
78+
return false, nil
79+
}
80+
81+
if err != nil {
82+
return false, err
83+
}
84+
}
85+
86+
return true, nil
87+
}
88+
89+
// InstanceShutdown returns true if the instance is shutdown according to the cloud provider.
90+
func (i *instances) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) {
91+
var err error
92+
var ret bool
93+
if node.Spec.ProviderID == "" {
94+
ret, err = i.instanceShutdownByPrivateDNSName(ctx, node.Name)
95+
} else {
96+
ret, err = i.instanceShutdownByProviderID(ctx, node.Spec.ProviderID)
97+
}
98+
99+
return ret, err
100+
}
101+
102+
// InstanceMetadata returns the instance's metadata.
103+
func (i *instances) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) {
104+
var err error
105+
var ec2Instance *ec2.Instance
106+
if node.Spec.ProviderID == "" {
107+
// TODO: support node name policy other than private DNS names
108+
ec2Instance, err = i.getInstanceByPrivateDNSName(ctx, node.Name)
109+
if err != nil {
110+
return nil, err
111+
}
112+
} else {
113+
ec2Instance, err = i.getInstanceByProviderID(ctx, node.Spec.ProviderID)
114+
if err != nil {
115+
return nil, err
116+
}
117+
}
118+
119+
nodeAddresses, err := nodeAddressesForInstance(ec2Instance)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
providerID, err := getInstanceProviderID(ec2Instance)
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
metadata := &cloudprovider.InstanceMetadata{
130+
ProviderID: providerID,
131+
InstanceType: aws.StringValue(ec2Instance.InstanceType),
132+
NodeAddresses: nodeAddresses,
133+
}
134+
135+
return metadata, nil
136+
}
137+
138+
// InstanceExistsByProviderID returns the instance if the instance with the given provider id still exists.
139+
// If false an error will be returned, the instance will be immediately deleted by the cloud controller manager.
140+
func (i *instances) getInstanceByProviderID(ctx context.Context, providerID string) (*ec2.Instance, error) {
141+
instanceID, err := parseInstanceIDFromProviderID(providerID)
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
request := &ec2.DescribeInstancesInput{
147+
InstanceIds: []*string{aws.String(instanceID)},
148+
Filters: []*ec2.Filter{
149+
newEc2Filter("instance-state-name", aliveFilter...),
150+
},
151+
}
152+
153+
instances := []*ec2.Instance{}
154+
var nextToken *string
155+
for {
156+
response, err := i.ec2.DescribeInstances(request)
157+
if err != nil {
158+
return nil, fmt.Errorf("error describing ec2 instances: %v", err)
159+
}
160+
161+
for _, reservation := range response.Reservations {
162+
instances = append(instances, reservation.Instances...)
163+
}
164+
165+
nextToken = response.NextToken
166+
if aws.StringValue(nextToken) == "" {
167+
break
168+
}
169+
request.NextToken = nextToken
170+
}
171+
172+
if len(instances) == 0 {
173+
return nil, cloudprovider.InstanceNotFound
174+
}
175+
176+
if len(instances) > 1 {
177+
return nil, fmt.Errorf("multiple instances found with provider ID: %s", instanceID)
178+
}
179+
180+
return instances[0], nil
181+
}
182+
183+
// InstanceExistsByPrivateDNSName returns the instance if the instance with the given provider id still exists.
184+
// If false an error will be returned, the instance will be immediately deleted by the cloud controller manager.
185+
func (i *instances) getInstanceByPrivateDNSName(ctx context.Context, nodeName string) (*ec2.Instance, error) {
186+
request := &ec2.DescribeInstancesInput{
187+
Filters: []*ec2.Filter{
188+
newEc2Filter("private-dns-name", nodeName),
189+
newEc2Filter("instance-state-name", aliveFilter...),
190+
},
191+
}
192+
193+
instances := []*ec2.Instance{}
194+
var nextToken *string
195+
for {
196+
response, err := i.ec2.DescribeInstances(request)
197+
if err != nil {
198+
return nil, fmt.Errorf("error describing ec2 instances: %v", err)
199+
}
200+
201+
for _, reservation := range response.Reservations {
202+
instances = append(instances, reservation.Instances...)
203+
}
204+
205+
nextToken = response.NextToken
206+
if aws.StringValue(nextToken) == "" {
207+
break
208+
}
209+
request.NextToken = nextToken
210+
}
211+
212+
if len(instances) == 0 {
213+
return nil, cloudprovider.InstanceNotFound
214+
}
215+
216+
if len(instances) > 1 {
217+
return nil, fmt.Errorf("multiple instances found with private DNS name: %s", nodeName)
218+
}
219+
220+
return instances[0], nil
221+
}
222+
223+
// instanceShutdownByProviderID returns true if the instance is shutdown according to the cloud provider.
224+
func (i *instances) instanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
225+
ec2Instance, err := i.getInstanceByProviderID(ctx, providerID)
226+
if err != nil {
227+
return false, err
228+
}
229+
230+
if ec2Instance.State != nil {
231+
state := aws.StringValue(ec2Instance.State.Name)
232+
if state == ec2.InstanceStateNameStopping || state == ec2.InstanceStateNameStopped {
233+
return true, nil
234+
}
235+
}
236+
237+
return false, nil
238+
}
239+
240+
// instanceShutdownByPrivateDNSName returns true if the instance is shutdown according to the cloud provider.
241+
func (i *instances) instanceShutdownByPrivateDNSName(ctx context.Context, nodeName string) (bool, error) {
242+
ec2Instance, err := i.getInstanceByPrivateDNSName(ctx, nodeName)
243+
if err != nil {
244+
return false, err
245+
}
246+
247+
if ec2Instance.State != nil {
248+
state := aws.StringValue(ec2Instance.State.Name)
249+
if state == ec2.InstanceStateNameStopping || state == ec2.InstanceStateNameStopped {
250+
return true, nil
251+
}
252+
}
253+
254+
return false, nil
255+
}
256+
257+
// nodeAddresses for Instance returns a list of v1.NodeAddress for the give instance.
258+
// TODO: should we support ExternalIP by default?
259+
func nodeAddressesForInstance(instance *ec2.Instance) ([]v1.NodeAddress, error) {
260+
if instance == nil {
261+
return nil, errors.New("provided instances is nil")
262+
}
263+
264+
addresses := []v1.NodeAddress{}
265+
for _, networkInterface := range instance.NetworkInterfaces {
266+
// skip network interfaces that are not currently in use
267+
if aws.StringValue(networkInterface.Status) != ec2.NetworkInterfaceStatusInUse {
268+
continue
269+
}
270+
271+
for _, privateIP := range networkInterface.PrivateIpAddresses {
272+
if ipAddress := aws.StringValue(privateIP.PrivateIpAddress); ipAddress != "" {
273+
ip := net.ParseIP(ipAddress)
274+
if ip == nil {
275+
return nil, fmt.Errorf("invalid IP address %q from instance %q", ipAddress, aws.StringValue(instance.InstanceId))
276+
}
277+
278+
addresses = append(addresses, v1.NodeAddress{
279+
Type: v1.NodeInternalIP,
280+
Address: ip.String(),
281+
})
282+
}
283+
}
284+
}
285+
286+
return addresses, nil
287+
}
288+
289+
// getInstanceProviderID returns the provider ID of an instance which is ultimately set in the node.Spec.ProviderID field.
290+
// The well-known format for a node's providerID is:
291+
// * aws://<availability-zone>/<instance-id>
292+
func getInstanceProviderID(instance *ec2.Instance) (string, error) {
293+
if aws.StringValue(instance.Placement.AvailabilityZone) == "" {
294+
return "", errors.New("instance availability zone was not set")
295+
}
296+
297+
if aws.StringValue(instance.InstanceId) == "" {
298+
return "", errors.New("instance ID was not set")
299+
}
300+
301+
return "aws://" + aws.StringValue(instance.Placement.AvailabilityZone) + "/" + aws.StringValue(instance.InstanceId), nil
302+
}
303+
304+
// parseInstanceIDFromProviderID parses the node's instance ID based on the well-known provider ID format:
//   - aws://<availability-zone>/<instance-id>
//
// It returns an error, rather than panicking, when the provider ID has no
// "/"-separated instance ID segment or when that segment is empty.
func parseInstanceIDFromProviderID(providerID string) (string, error) {
	// trim the provider name prefix 'aws://'; the remaining providerID should contain metadata in the format:
	// <availability-zone>/<instance-id>
	metadata := strings.Split(strings.TrimPrefix(providerID, "aws://"), "/")
	if len(metadata) < 2 || metadata[1] == "" {
		return "", fmt.Errorf("invalid provider ID %q: expected format aws://<availability-zone>/<instance-id>", providerID)
	}

	return metadata[1], nil
}
313+
314+
// aliveFilter lists the EC2 instance state names passed as values of the
// "instance-state-name" filter by the instance lookups in this file.
// NOTE(review): no terminated state is listed, so terminated instances
// presumably never match a lookup — confirm against the EC2 state names.
var aliveFilter = []string{
315+
ec2.InstanceStateNamePending,
316+
ec2.InstanceStateNameRunning,
317+
ec2.InstanceStateNameShuttingDown,
318+
ec2.InstanceStateNameStopping,
319+
ec2.InstanceStateNameStopped,
320+
}
321+
322+
func newEc2Filter(name string, values ...string) *ec2.Filter {
323+
filter := &ec2.Filter{
324+
Name: aws.String(name),
325+
}
326+
327+
for _, value := range values {
328+
filter.Values = append(filter.Values, aws.String(value))
329+
}
330+
331+
return filter
332+
}

‎pkg/providers/v2/instances_test.go

+236
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"context"
20+
"fmt"
21+
"testing"
22+
23+
"github.com/aws/aws-sdk-go/aws"
24+
"github.com/aws/aws-sdk-go/service/ec2"
25+
"github.com/google/uuid"
26+
"github.com/stretchr/testify/assert"
27+
28+
v1 "k8s.io/api/core/v1"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
cloudprovider "k8s.io/cloud-provider"
31+
)
32+
33+
// TestClusterID is a fixed cluster identifier available to the test fixtures
// in this package.
const TestClusterID = "clusterid.test"
34+
35+
// fakeEC2 is a test double for the EC2 interface.
type fakeEC2 struct {
// instances is not consulted by the DescribeInstances method in this file.
36+
instances []*ec2.Instance
37+
}
38+
39+
func (f fakeEC2) DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
40+
instance := &ec2.Instance{}
41+
stateName := "running"
42+
instance.State = &ec2.InstanceState{
43+
Name: &stateName,
44+
}
45+
instance.Placement = &ec2.Placement{
46+
AvailabilityZone: aws.String("us-west-1a"),
47+
}
48+
instance.NetworkInterfaces = []*ec2.InstanceNetworkInterface{
49+
{
50+
Status: aws.String(ec2.NetworkInterfaceStatusInUse),
51+
PrivateIpAddresses: []*ec2.InstancePrivateIpAddress{
52+
{
53+
PrivateIpAddress: aws.String("192.168.0.2"),
54+
},
55+
},
56+
},
57+
}
58+
instanceID := "i-1"
59+
instance.InstanceId = &instanceID
60+
61+
return &ec2.DescribeInstancesOutput{
62+
Reservations: []*ec2.Reservation{
63+
{
64+
Instances: []*ec2.Instance{
65+
instance,
66+
},
67+
},
68+
},
69+
}, nil
70+
}
71+
72+
func newFakeInstances() (cloudprovider.InstancesV2, error) {
73+
fakeEC2 := fakeEC2{}
74+
return &instances{
75+
ec2: fakeEC2,
76+
}, nil
77+
}
78+
79+
func makeInstance(num int, privateIP, publicIP, privateDNSName, publicDNSName string, setNetInterface bool) ec2.Instance {
80+
instance := ec2.Instance{
81+
InstanceId: aws.String(fmt.Sprintf("i-%d", num)),
82+
PrivateDnsName: aws.String(privateDNSName),
83+
PrivateIpAddress: aws.String(privateIP),
84+
PublicDnsName: aws.String(publicDNSName),
85+
PublicIpAddress: aws.String(publicIP),
86+
InstanceType: aws.String("c3.large"),
87+
Placement: &ec2.Placement{AvailabilityZone: aws.String("us-west-1a")},
88+
State: &ec2.InstanceState{
89+
Name: aws.String("running"),
90+
},
91+
}
92+
if setNetInterface == true {
93+
instance.NetworkInterfaces = []*ec2.InstanceNetworkInterface{
94+
{
95+
Status: aws.String(ec2.NetworkInterfaceStatusInUse),
96+
PrivateIpAddresses: []*ec2.InstancePrivateIpAddress{
97+
{
98+
PrivateIpAddress: aws.String(privateIP),
99+
},
100+
},
101+
},
102+
}
103+
}
104+
return instance
105+
}
106+
107+
func makeNode(nodeName string, offset int) *v1.Node {
108+
id := uuid.New()
109+
instanceID := fmt.Sprintf("i-%v-%d", id.String(), offset)
110+
instance := &ec2.Instance{}
111+
instance.InstanceId = aws.String(instanceID)
112+
instance.Placement = &ec2.Placement{
113+
AvailabilityZone: aws.String("us-west-1a"),
114+
}
115+
instance.PrivateDnsName = aws.String(fmt.Sprintf("ip-172-20-0-%d.ec2.internal", 101+offset))
116+
instance.PrivateIpAddress = aws.String(fmt.Sprintf("192.168.0.%d", 1+offset))
117+
118+
var tag ec2.Tag
119+
tag.Key = aws.String("KubernetesCluster")
120+
tag.Value = aws.String(TestClusterID)
121+
instance.Tags = []*ec2.Tag{&tag}
122+
123+
providerID := "aws://us-west-1a/" + instanceID
124+
return &v1.Node{
125+
ObjectMeta: metav1.ObjectMeta{
126+
Name: nodeName,
127+
},
128+
Spec: v1.NodeSpec{
129+
ProviderID: providerID,
130+
},
131+
}
132+
}
133+
134+
func TestGetInstanceProviderID(t *testing.T) {
135+
instance0 := makeInstance(0, "192.168.0.1", "1.2.3.4", "instance-same.ec2.internal", "instance-same.ec2.external", true)
136+
instance1 := makeInstance(1, "192.168.0.2", "", "instance-same.ec2.internal", "", false)
137+
instance2 := makeInstance(2, "192.168.0.1", "1.2.3.4", "instance-other.ec2.internal", "", false)
138+
139+
testCases := []struct {
140+
instance ec2.Instance
141+
providerID string
142+
}{
143+
{instance0, "aws://us-west-1a/i-0"},
144+
{instance1, "aws://us-west-1a/i-1"},
145+
{instance2, "aws://us-west-1a/i-2"},
146+
}
147+
148+
for _, testCase := range testCases {
149+
providerID, err := getInstanceProviderID(&testCase.instance)
150+
assert.NoError(t, err)
151+
assert.Equal(t, testCase.providerID, providerID)
152+
}
153+
}
154+
155+
func TestInstanceExists(t *testing.T) {
156+
instances, err := newFakeInstances()
157+
if err != nil {
158+
t.Errorf("Error creating a new instance: %v", err)
159+
}
160+
161+
for i := 0; i < 5; i++ {
162+
nodeName := fmt.Sprintf("ip-172-21-32-%d.ec2.internal", i)
163+
164+
node := makeNode(nodeName, 2)
165+
exists, err := instances.InstanceExists(context.TODO(), node)
166+
167+
if err != nil {
168+
t.Errorf("InstanceExists failed with node %v: %v", nodeName, err)
169+
}
170+
if !exists {
171+
t.Errorf("instance not found with node %v", nodeName)
172+
}
173+
}
174+
}
175+
176+
func TestInstanceShutdown(t *testing.T) {
177+
instances, err := newFakeInstances()
178+
if err != nil {
179+
t.Errorf("Error creating a new instance: %v", err)
180+
}
181+
182+
for i := 0; i < 5; i++ {
183+
nodeName := fmt.Sprintf("ip-172-21-32-%d.ec2.internal", i)
184+
185+
node := makeNode(nodeName, 2)
186+
shutdown, err := instances.InstanceShutdown(context.TODO(), node)
187+
188+
if err != nil {
189+
t.Errorf("InstanceShutdown failed with node %v: %v", nodeName, err)
190+
}
191+
if shutdown {
192+
t.Errorf("instance is shutdown with node %v", nodeName)
193+
}
194+
}
195+
}
196+
197+
func TestInstanceMetadata(t *testing.T) {
198+
instances, err := newFakeInstances()
199+
if err != nil {
200+
t.Errorf("Error creating a new instance: %v", err)
201+
}
202+
203+
for i := 0; i < 5; i++ {
204+
nodeName := fmt.Sprintf("ip-172-21-32-%d.ec2.internal", i)
205+
206+
node := makeNode(nodeName, 2)
207+
metadata, err := instances.InstanceMetadata(context.TODO(), node)
208+
209+
if err != nil {
210+
t.Errorf("InstanceMetadata failed with node %v: %v", nodeName, err)
211+
}
212+
if metadata == nil {
213+
t.Errorf("instance's metadata is nil with node %v", nodeName)
214+
}
215+
}
216+
}
217+
218+
func TestParseInstanceIDFromProviderID(t *testing.T) {
219+
testCases := []struct {
220+
providerID string
221+
instanceID string
222+
}{
223+
{"aws://eu-central-1a/i-1238asjd8asdm123", "i-1238asjd8asdm123"},
224+
{"aws://us-west-2a/i-112as321asjd8asdm23", "i-112as321asjd8asdm23"},
225+
{"aws://us-iso-east-1a/i-123", "i-123"},
226+
{"aws://us-isob-east-1a/i-abcdef", "i-abcdef"},
227+
{"aws://us-isob-east-1a/i-abCDef", "i-abCDef"},
228+
{"aws://us-east-1a/8asdm23", "8asdm23"},
229+
}
230+
231+
for _, testCase := range testCases {
232+
ret, err := parseInstanceIDFromProviderID(testCase.providerID)
233+
assert.NoError(t, err)
234+
assert.Equal(t, testCase.instanceID, ret)
235+
}
236+
}

0 commit comments

Comments
 (0)
Please sign in to comment.