Skip to content

Commit ad6dcd6

Browse files
committed
instances: initial implementation
1 parent 211d51d commit ad6dcd6

File tree

4 files changed

+525
-4
lines changed

4 files changed

+525
-4
lines changed

Diff for: go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ replace (
2828

2929
require (
3030
github.com/aws/aws-sdk-go v1.28.2
31+
github.com/google/uuid v1.1.1
3132
github.com/spf13/cobra v1.0.0
3233
github.com/spf13/pflag v1.0.5
3334
github.com/stretchr/testify v1.4.0
35+
k8s.io/api v0.0.0
3436
k8s.io/apimachinery v0.0.0
3537
k8s.io/apiserver v0.0.0
3638
k8s.io/cloud-provider v0.0.0

Diff for: pkg/providers/v2/cloud.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ var _ cloudprovider.Interface = (*cloud)(nil)
4646

4747
// cloud is the AWS v2 implementation of the cloud provider interface
4848
type cloud struct {
49-
creds *credentials.Credentials
50-
region string
49+
creds *credentials.Credentials
50+
instances cloudprovider.InstancesV2
51+
region string
5152
}
5253

5354
// EC2Metadata is an abstraction over the AWS metadata service.
@@ -107,9 +108,15 @@ func newCloud() (cloudprovider.Interface, error) {
107108
return nil, err
108109
}
109110

111+
instances, err := newInstances(region, creds)
112+
if err != nil {
113+
return nil, err
114+
}
115+
110116
return &cloud{
111-
creds: creds,
112-
region: region,
117+
creds: creds,
118+
instances: instances,
119+
region: region,
113120
}, nil
114121
}
115122

Diff for: pkg/providers/v2/instances.go

+335
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
// Package v2 is an out-of-tree only implementation of the AWS cloud provider.
15+
// It is not compatible with v1 and should only be used on new clusters.
16+
package v2
17+
18+
import (
19+
"context"
20+
"errors"
21+
"fmt"
22+
"net"
23+
"strings"
24+
25+
"github.com/aws/aws-sdk-go/aws"
26+
"github.com/aws/aws-sdk-go/aws/credentials"
27+
"github.com/aws/aws-sdk-go/aws/session"
28+
"github.com/aws/aws-sdk-go/service/ec2"
29+
30+
v1 "k8s.io/api/core/v1"
31+
cloudprovider "k8s.io/cloud-provider"
32+
"k8s.io/klog"
33+
)
34+
35+
// newInstances returns an implementation of cloudprovider.InstancesV2
36+
func newInstances(region string, creds *credentials.Credentials) (cloudprovider.InstancesV2, error) {
37+
awsConfig := &aws.Config{
38+
Region: aws.String(region),
39+
Credentials: creds,
40+
}
41+
awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true)
42+
43+
sess, err := session.NewSession(awsConfig)
44+
if err != nil {
45+
return nil, err
46+
}
47+
ec2Service := ec2.New(sess)
48+
49+
return &instances{
50+
ec2: ec2Service,
51+
}, nil
52+
}
53+
54+
// EC2 is an interface defining only the methods we call from the AWS EC2 SDK.
55+
type EC2 interface {
56+
DescribeInstances(request *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
57+
}
58+
59+
// instances is an implementation of cloudprovider.InstancesV2
60+
type instances struct {
61+
ec2 EC2
62+
}
63+
64+
// InstanceExists indicates whether a given node exists according to the cloud provider
65+
func (i *instances) InstanceExists(ctx context.Context, node *v1.Node) (bool, error) {
66+
var err error
67+
if node.Spec.ProviderID == "" {
68+
_, err = i.getInstanceByPrivateDNSName(ctx, node.Name)
69+
if err == cloudprovider.InstanceNotFound {
70+
return false, nil
71+
}
72+
73+
if err != nil {
74+
return false, err
75+
}
76+
} else {
77+
_, err = i.getInstanceByProviderID(ctx, node.Spec.ProviderID)
78+
if err == cloudprovider.InstanceNotFound {
79+
return false, nil
80+
}
81+
82+
if err != nil {
83+
return false, err
84+
}
85+
}
86+
87+
return true, nil
88+
}
89+
90+
// InstanceShutdown returns true if the instance is shutdown according to the cloud provider.
91+
func (i *instances) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) {
92+
var err error
93+
var ret bool
94+
if node.Spec.ProviderID == "" {
95+
ret, err = i.instanceShutdownByPrivateDNSName(ctx, node.Name)
96+
} else {
97+
ret, err = i.instanceShutdownByProviderID(ctx, node.Spec.ProviderID)
98+
}
99+
100+
return ret, err
101+
}
102+
103+
func (i *instances) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) {
104+
var err error
105+
var ec2Instance *ec2.Instance
106+
if node.Spec.ProviderID == "" {
107+
// TODO: support node name policy other than private DNS names
108+
ec2Instance, err = i.getInstanceByPrivateDNSName(ctx, node.Name)
109+
if err != nil {
110+
return nil, err
111+
}
112+
} else {
113+
ec2Instance, err = i.getInstanceByProviderID(ctx, node.Spec.ProviderID)
114+
if err != nil {
115+
return nil, err
116+
}
117+
}
118+
119+
nodeAddresses, err := nodeAddressesForInstance(ec2Instance)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
providerID, err := getInstanceProviderID(ec2Instance)
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
metadata := &cloudprovider.InstanceMetadata{
130+
ProviderID: providerID,
131+
InstanceType: aws.StringValue(ec2Instance.InstanceType),
132+
NodeAddresses: nodeAddresses,
133+
}
134+
135+
return metadata, nil
136+
}
137+
138+
// InstanceExistsByProviderID returns the instance if the instance with the given provider id still exists.
139+
// If false an error will be returned, the instance will be immediately deleted by the cloud controller manager.
140+
func (i *instances) getInstanceByProviderID(ctx context.Context, providerID string) (*ec2.Instance, error) {
141+
instanceID, err := parseInstanceIDFromProviderID(providerID)
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
request := &ec2.DescribeInstancesInput{
147+
InstanceIds: []*string{aws.String(instanceID)},
148+
}
149+
150+
instances := []*ec2.Instance{}
151+
var nextToken *string
152+
for {
153+
response, err := i.ec2.DescribeInstances(request)
154+
if err != nil {
155+
return nil, fmt.Errorf("error describing ec2 instances: %v", err)
156+
}
157+
158+
for _, reservation := range response.Reservations {
159+
instances = append(instances, reservation.Instances...)
160+
}
161+
162+
nextToken = response.NextToken
163+
if aws.StringValue(nextToken) == "" {
164+
break
165+
}
166+
request.NextToken = nextToken
167+
}
168+
169+
if len(instances) == 0 {
170+
return nil, cloudprovider.InstanceNotFound
171+
}
172+
173+
if len(instances) > 1 {
174+
return nil, fmt.Errorf("multiple instances found with provider ID: %s", instanceID)
175+
}
176+
177+
state := instances[0].State.Name
178+
if *state == ec2.InstanceStateNameTerminated {
179+
klog.Warningf("the instance %s is terminated", instanceID)
180+
return nil, nil
181+
}
182+
183+
return instances[0], nil
184+
}
185+
186+
// InstanceExistsByPrivateDNSName returns the instance if the instance with the given provider id still exists.
187+
// If false an error will be returned, the instance will be immediately deleted by the cloud controller manager.
188+
func (i *instances) getInstanceByPrivateDNSName(ctx context.Context, nodeName string) (*ec2.Instance, error) {
189+
request := &ec2.DescribeInstancesInput{
190+
Filters: []*ec2.Filter{
191+
newEc2Filter("private-dns-name", nodeName),
192+
newEc2Filter("instance-state-name", aliveFilter...),
193+
},
194+
}
195+
196+
instances := []*ec2.Instance{}
197+
var nextToken *string
198+
for {
199+
response, err := i.ec2.DescribeInstances(request)
200+
if err != nil {
201+
return nil, fmt.Errorf("error describing ec2 instances: %v", err)
202+
}
203+
204+
for _, reservation := range response.Reservations {
205+
instances = append(instances, reservation.Instances...)
206+
}
207+
208+
nextToken = response.NextToken
209+
if aws.StringValue(nextToken) == "" {
210+
break
211+
}
212+
request.NextToken = nextToken
213+
}
214+
215+
if len(instances) == 0 {
216+
return nil, cloudprovider.InstanceNotFound
217+
}
218+
219+
if len(instances) > 1 {
220+
return nil, fmt.Errorf("multiple instances found with private DNS name: %s", nodeName)
221+
}
222+
223+
return instances[0], nil
224+
}
225+
226+
// instanceShutdownByProviderID returns true if the instance is shutdown according to the cloud provider.
227+
func (i *instances) instanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
228+
ec2Instance, err := i.getInstanceByProviderID(ctx, providerID)
229+
if err != nil {
230+
return false, err
231+
}
232+
233+
if ec2Instance.State != nil {
234+
state := aws.StringValue(ec2Instance.State.Name)
235+
if state == ec2.InstanceStateNameTerminated || state == ec2.InstanceStateNameStopping || state == ec2.InstanceStateNameStopped {
236+
return true, nil
237+
}
238+
}
239+
240+
return false, nil
241+
}
242+
243+
// instanceShutdownByPrivateDNSName returns true if the instance is shutdown according to the cloud provider.
244+
func (i *instances) instanceShutdownByPrivateDNSName(ctx context.Context, nodeName string) (bool, error) {
245+
ec2Instance, err := i.getInstanceByPrivateDNSName(ctx, nodeName)
246+
if err != nil {
247+
return false, err
248+
}
249+
250+
if ec2Instance.State != nil {
251+
state := aws.StringValue(ec2Instance.State.Name)
252+
if state == ec2.InstanceStateNameTerminated || state == ec2.InstanceStateNameStopping || state == ec2.InstanceStateNameStopped {
253+
return true, nil
254+
}
255+
}
256+
257+
return false, nil
258+
}
259+
260+
// nodeAddresses for Instance returns a list of v1.NodeAddress for the give instance.
261+
// TODO: should we support ExternalIP by default?
262+
func nodeAddressesForInstance(instance *ec2.Instance) ([]v1.NodeAddress, error) {
263+
if instance == nil {
264+
return nil, errors.New("provided instances is nil")
265+
}
266+
267+
addresses := []v1.NodeAddress{}
268+
for _, networkInterface := range instance.NetworkInterfaces {
269+
// skip network interfaces that are not currently in use
270+
if aws.StringValue(networkInterface.Status) != ec2.NetworkInterfaceStatusInUse {
271+
continue
272+
}
273+
274+
for _, privateIP := range networkInterface.PrivateIpAddresses {
275+
if ipAddress := aws.StringValue(privateIP.PrivateIpAddress); ipAddress != "" {
276+
ip := net.ParseIP(ipAddress)
277+
if ip == nil {
278+
return nil, fmt.Errorf("invalid IP address %q from instance %q", ipAddress, aws.StringValue(instance.InstanceId))
279+
}
280+
281+
addresses = append(addresses, v1.NodeAddress{
282+
Type: v1.NodeInternalIP,
283+
Address: ip.String(),
284+
})
285+
}
286+
}
287+
}
288+
289+
return addresses, nil
290+
}
291+
292+
// getInstanceProviderID returns the provider ID of an instance which is ultimately set in the node.Spec.ProviderID field.
293+
// The well-known format for a node's providerID is:
294+
// * aws://<availability-zone>/<instance-id>
295+
func getInstanceProviderID(instance *ec2.Instance) (string, error) {
296+
if aws.StringValue(instance.Placement.AvailabilityZone) == "" {
297+
return "", errors.New("instance availability zone was not set")
298+
}
299+
300+
if aws.StringValue(instance.InstanceId) == "" {
301+
return "", errors.New("instance ID was not set")
302+
}
303+
304+
return "aws://" + aws.StringValue(instance.Placement.AvailabilityZone) + "/" + aws.StringValue(instance.InstanceId), nil
305+
}
306+
307+
// parseInstanceIDFromProviderID parses the node's instance ID based on the well-known provider ID format:
//    * aws://<availability-zone>/<instance-id>
//
// It returns the <instance-id> part. An error is returned, instead of
// panicking or handing back the wrong segment, when the provider ID does not
// consist of exactly two "/"-separated parts after the "aws://" prefix or when
// the instance ID part is empty.
func parseInstanceIDFromProviderID(providerID string) (string, error) {
	// trim the provider name prefix 'aws://'; the remaining string should
	// contain metadata in the format <availability-zone>/<instance-id>
	metadata := strings.Split(strings.TrimPrefix(providerID, "aws://"), "/")
	if len(metadata) != 2 || metadata[1] == "" {
		return "", fmt.Errorf("invalid provider ID %q: expected format aws://<availability-zone>/<instance-id>", providerID)
	}

	return metadata[1], nil
}
316+
317+
var aliveFilter = []string{
318+
ec2.InstanceStateNamePending,
319+
ec2.InstanceStateNameRunning,
320+
ec2.InstanceStateNameShuttingDown,
321+
ec2.InstanceStateNameStopping,
322+
ec2.InstanceStateNameStopped,
323+
}
324+
325+
func newEc2Filter(name string, values ...string) *ec2.Filter {
326+
filter := &ec2.Filter{
327+
Name: aws.String(name),
328+
}
329+
330+
for _, value := range values {
331+
filter.Values = append(filter.Values, aws.String(value))
332+
}
333+
334+
return filter
335+
}

0 commit comments

Comments
 (0)