Add support for resource-specific resync periods and default drift remediation period #106

Merged
merged 4 commits into from
Feb 6, 2023
Changes from 1 commit
133 changes: 105 additions & 28 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@ import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@@ -32,20 +35,22 @@ import (
)

const (
flagEnableLeaderElection = "enable-leader-election"
flagMetricAddr = "metrics-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
envVarAWSRegion = "AWS_REGION"
flagEnableLeaderElection = "enable-leader-election"
flagMetricAddr = "metrics-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
envVarAWSRegion = "AWS_REGION"
)

var (
@@ -63,20 +68,22 @@ var (

// Config contains configuration options for ACK service controllers
type Config struct {
MetricsAddr string
EnableLeaderElection bool
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
MetricsAddr string
EnableLeaderElection bool
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
ReconcileDefaultResyncSeconds int
ReconcileResourceResyncSeconds []string
}

// BindFlags defines CLI/runtime configuration options
@@ -152,6 +159,19 @@ func (cfg *Config) BindFlags() {
&cfg.DeletionPolicy, flagDeletionPolicy,
"The default deletion policy for all resources managed by the controller",
)
flag.IntVar(
&cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds,
60,
Contributor

Woah woah wait we don't want every resource to try and reconcile itself every 60 seconds. That's way too often. I was thinking on the order of every 6 or even 10 hours.

Member

Agreed, for the following reasons:

  1. We risk causing throttling errors as the number of managed resources increases.
  2. No. 1 can have a side effect on inter-service communication. For example, for services which use Autoscaling, the SageMaker hosting service makes calls to the autoscaling service; if throttling happens, the SageMaker service will get throttled, which impacts scale in/out of the fleet serving customer traffic.
  3. Not all resources need this. For example, SageMaker training jobs are one-time jobs, and SageMaker models do not support updates, so the only way to introduce drift is by deleting and recreating the entire resource outside of ACK, which still cannot be resolved by reconciling.

My vote is to drop this flag completely and let the service teams use requeue_on_success to configure this, or let the customer override it based on their preference. I strongly suggest starting conservative here.

Contributor

I think we can still support this default resync period and also have a setting that ignores this for one-time style resources like TrainingJob. Call that a future feature request. Managing resync periods for every resource individually would require users to update their configuration every time we add a new resource - this is essentially just a shortcut for all of them.

I actually believe I don't want service teams to set requeue_on_success at all. It's ultimately more important that a customer can set their expectations for requeuing resources, since they know their drift conditions better than any ACK contributor will. Sure some resources should be shorter than 10 hours, but whether it's every 5 hours or every 10 minutes, it'll depend on everyone's specific deployment context.

"The default duration, in seconds, to wait before resyncing desired state of custom resources. "+
"This value is used if no resource-specific override has been specified. Default is 60 seconds.",
Contributor

Please update this description for the new default

)
flag.StringArrayVar(
&cfg.ReconcileResourceResyncSeconds, flagReconcileResourceResyncSeconds,
[]string{},
"A Key/Value list of strings representing the reconcile resync configuration for each resource. This"+
" configuration maps resource kinds to drift remediation periods in seconds. If provided,"+
" resource-specific resync periods take precedence over the default period.",
)
}

// SetupLogger initializes the logger used in the service controller
@@ -233,6 +253,16 @@ func (cfg *Config) Validate() error {
if cfg.DeletionPolicy == "" {
cfg.DeletionPolicy = ackv1alpha1.DeletionPolicyDelete
}

if cfg.ReconcileDefaultResyncSeconds < 0 {
return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than or equal to 0", flagReconcileDefaultResyncSeconds)
}

_, err := cfg.ParseReconcileResourceResyncSeconds()
if err != nil {
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err)
}

return nil
}

@@ -244,3 +274,50 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error {
}
return nil
}

// ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds
// flag and returns a map that maps resource names to resync periods.
// The flag arguments are expected to have the format "resource=seconds", where "resource" is the
// name of the resource and "seconds" is the number of seconds that the reconciler should wait before
// reconciling the resource again.
func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Duration, error) {
resourceResyncPeriods := make(map[string]time.Duration, len(cfg.ReconcileResourceResyncSeconds))
for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds {
// Parse the resource name and resync period from the flag argument
resourceName, resyncSeconds, err := parseReconcileFlagArgument(resourceResyncSecondsFlag)
if err != nil {
return nil, fmt.Errorf("error parsing flag argument '%v': %v. Expected format: resource=seconds", resourceResyncSecondsFlag, err)
}
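// NOTE: the value stored here is a raw count of seconds wrapped in a
// time.Duration; getResyncPeriod multiplies it by time.Second.
resourceResyncPeriods[strings.ToLower(resourceName)] = time.Duration(resyncSeconds)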
}
return resourceResyncPeriods, nil
}

// parseReconcileFlagArgument parses a flag argument of the form "key=value" into
// its individual elements. The key must be a non-empty string and the value must be
// a non-negative integer. If the flag argument is not in the expected format
// or has invalid elements, an error is returned.
//
// The function returns the parsed key and value as separate elements.
func parseReconcileFlagArgument(flagArgument string) (string, int, error) {
delimiter := "="
elements := strings.Split(flagArgument, delimiter)
if len(elements) != 2 {
return "", 0, fmt.Errorf("invalid flag argument format: expected key=value")
}
if elements[0] == "" {
return "", 0, fmt.Errorf("missing key in flag argument")
}
if elements[1] == "" {
return "", 0, fmt.Errorf("missing value in flag argument")
}

resyncSeconds, err := strconv.Atoi(elements[1])
if err != nil {
return "", 0, fmt.Errorf("invalid value in flag argument: %v", err)
}
if resyncSeconds < 0 {
Contributor

I think resyncSeconds <= 0 should be the check. If there are 0 seconds of reconciliation, it'll be constantly reconciled with no exponential backoff. Maybe we can suggest at least 1 second of wait?

Member Author

I just checked here: if a user provides 0 seconds, the controller will use the package default value, which is 10 hours.

return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds)
}
return elements[0], resyncSeconds, nil
}
61 changes: 61 additions & 0 deletions pkg/config/config_test.go
@@ -0,0 +1,61 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package config

import "testing"

func TestParseReconcileFlagArgument(t *testing.T) {
tests := []struct {
flagArgument string
expectedKey string
expectedVal int
expectedErr bool
expectedErrMsg string
}{
// Test valid flag arguments
{"key=1", "key", 1, false, ""},
{"key=123456", "key", 123456, false, ""},
{"key=600", "key", 600, false, ""},
{"k=1", "k", 1, false, ""},
{"ke_y=123456", "ke_y", 123456, false, ""},

// Test invalid flag arguments
{"key", "", 0, true, "invalid flag argument format: expected key=value"},
{"key=", "", 0, true, "missing value in flag argument"},
{"=value", "", 0, true, "missing key in flag argument"},
{"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"},
{"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"},
{"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"},
{"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"},
{"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"},
}
for _, test := range tests {
key, val, err := parseReconcileFlagArgument(test.flagArgument)
if err != nil && !test.expectedErr {
t.Errorf("unexpected error for flag argument '%s': %v", test.flagArgument, err)
}
if err == nil && test.expectedErr {
t.Errorf("expected error for flag argument '%s', got nil", test.flagArgument)
}
if err != nil && err.Error() != test.expectedErrMsg {
t.Errorf("unexpected error message for flag argument '%s': expected '%s', got '%v'", test.flagArgument, test.expectedErrMsg, err)
}
if key != test.expectedKey {
t.Errorf("unexpected key for flag argument '%s': expected '%s', got '%s'", test.flagArgument, test.expectedKey, key)
}
if val != test.expectedVal {
t.Errorf("unexpected value for flag argument '%s': expected %d, got %d", test.flagArgument, test.expectedVal, val)
}
}
}
97 changes: 66 additions & 31 deletions pkg/runtime/reconciler.go
@@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

backoff "github.com/cenkalti/backoff/v4"
@@ -47,7 +48,7 @@ const (
// The default duration to trigger the sync for an ACK resource after
// the successful reconciliation. This behavior for a resource can be
// overridden by RequeueOnSuccessSeconds configuration for that resource.
resyncPeriod = 10 * time.Hour
defaultResyncPeriod = 10 * time.Hour
)

// reconciler describes a generic reconciler within ACK.
@@ -70,8 +71,9 @@ type reconciler struct {
// object)s and sharing watch and informer queues across those controllers.
type resourceReconciler struct {
reconciler
rmf acktypes.AWSResourceManagerFactory
rd acktypes.AWSResourceDescriptor
rmf acktypes.AWSResourceManagerFactory
rd acktypes.AWSResourceDescriptor
resyncPeriod time.Duration
}

// GroupKind returns the string containing the API group and kind reconciled by
@@ -887,31 +889,8 @@ func (r *resourceReconciler) handleRequeues(
}
// The code below only executes for "ConditionTypeResourceSynced"
if condition.Status == corev1.ConditionTrue {
if duration := r.rmf.RequeueOnSuccessSeconds(); duration > 0 {
rlog.Debug(
"requeueing resource after resource synced condition true",
)
return latest, requeue.NeededAfter(nil, time.Duration(duration)*time.Second)
}
// Since RequeueOnSuccessSeconds <= 0, requeue the resource
// with "resyncPeriod" to perform periodic drift detection and
// sync the desired state.
//
// Upstream controller-runtime provides SyncPeriod functionality
// which flushes the go-client cache and triggers Sync for all
// the objects in cache every 10 hours by default.
//
// ACK controller use non-cached client to read objects
// from API Server, hence controller-runtime's SyncPeriod
// functionality does not work.
// https://github.com/aws-controllers-k8s/community/issues/1355
//
// ACK controllers use api-reader(non-cached client) to avoid
// reading stale copies of ACK resource that can cause
// duplicate resource creation when resource identifier is
// not present in stale copy of resource.
// https://github.com/aws-controllers-k8s/community/issues/894#issuecomment-911876354
return latest, requeue.NeededAfter(nil, resyncPeriod)
rlog.Debug("requeuing", "after", r.resyncPeriod)
return latest, requeue.NeededAfter(nil, r.resyncPeriod)
} else {
rlog.Debug(
"requeueing resource after finding resource synced condition false",
@@ -1103,6 +1082,55 @@ func (r *resourceReconciler) getEndpointURL(
return r.cfg.EndpointURL
}

// getResyncPeriod returns the period of the recurring reconciler process which ensures the desired
// state of custom resources is maintained.
// It attempts to retrieve the duration from the following sources, in this order:
// 1. A resource-specific reconciliation resync period specified in the reconciliation resync
Member

Member Author

I'm not a super fan of resource-level drift remediation control - but happy to hear what other folks think about it.

Collaborator

Why a resync configuration map?

Can we use an annotation on the resource, like other existing features? For example:

* https://aws-controllers-k8s.github.io/community/docs/user-docs/multi-region-resource-management/

* https://aws-controllers-k8s.github.io/community/docs/user-docs/deletion-policy/

@surajkota for both of those annotations, there is a corresponding controller CLI flag:

flag.StringVar(
&cfg.Region, flagAWSRegion,
envutil.WithDefault(envVarAWSRegion, ""),
"The AWS Region in which the service controller will create its resources",
)

flag.Var(
&cfg.DeletionPolicy, flagDeletionPolicy,
"The default deletion policy for all resources managed by the controller",
)

The CLI flags serve as defaults if the annotation is not present.
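
The annotation-then-flag fallback described here can be sketched roughly as follows. `resolveRegion` is a hypothetical helper, not a function in the ACK runtime, and the annotation key `services.k8s.aws/region` is an assumption based on the multi-region docs linked above.

```go
package main

import "fmt"

// regionAnnotation is assumed to be the ACK region annotation key.
const regionAnnotation = "services.k8s.aws/region"

// resolveRegion is a hypothetical helper: it prefers the per-resource
// annotation and falls back to the controller-wide CLI flag value.
func resolveRegion(annotations map[string]string, flagDefault string) string {
	if region, ok := annotations[regionAnnotation]; ok && region != "" {
		return region
	}
	return flagDefault
}

func main() {
	annotated := map[string]string{regionAnnotation: "eu-west-1"}
	fmt.Println(resolveRegion(annotated, "us-west-2")) // annotation wins
	fmt.Println(resolveRegion(nil, "us-west-2"))       // flag default applies
}
```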

Member

@surajkota surajkota Feb 2, 2023

Ack on the controller flag; my question is about what sits highest in the order of precedence.

Annotation or configmap, both give the user control over configuring the resync period. Since we don't have a common configmap or a CRD to define all controller configurations, my preference is to keep the experience consistent and not introduce another place.

Member

@surajkota surajkota Feb 3, 2023

Hi folks, Nick helped me with the clarification that resync configuration map != ConfigMap, so we can resolve this comment with one suggestion: reword the code comment to say "resource resync CLI flag" or something along those lines.

I do think even a CLI flag is not a great option, because changing it requires restarting the controller or reinstalling the Helm chart, but that's a discussion for another time. This is good to get started.

see my comment on No 3 here. We should drop it

// configuration map.
// 2. A resource-specific requeue on success period specified by the resource manager factory.
// The resource manager factory is controller-specific, and thus this period is to be specified
// by controller authors.
// 3. The default reconciliation resync period specified in the controller binary flags.
// 4. The default resync period defined in the ACK runtime package. Defined in defaultResyncPeriod
// within the same file.
//
// Each reconciler has a unique value to use. This function should only be called during the
// instantiation of an AWSResourceReconciler and should not be called during the reconciliation
// function r.Sync
func getResyncPeriod(rmf acktypes.AWSResourceManagerFactory, cfg ackcfg.Config) time.Duration {
// The reconciliation resync period configuration has already been validated as
// a clean map. Therefore, we can safely ignore any errors that may occur while
// parsing it and avoid changing the signature of NewReconcilerWithClient.
drc, _ := cfg.ParseReconcileResourceResyncSeconds()

// First, try to use a resource-specific resync period if provided in the resource
// resync period configuration.
resourceKind := rmf.ResourceDescriptor().GroupKind().Kind
if duration, ok := drc[strings.ToLower(resourceKind)]; ok && duration > 0 {
return time.Duration(duration) * time.Second
}

// Second, try to use a resource-specific requeue on success period specified by the
// resource manager factory. This value is set during the code generation of the
// controller and takes precedence over the default resync period because
// it allows existing controllers that rely on this value to maintain their intended
// behavior.
if duration := rmf.RequeueOnSuccessSeconds(); duration > 0 {
return time.Duration(duration) * time.Second
}

// Third, try to use the default resync period specified during controller
// start-up.
if cfg.ReconcileDefaultResyncSeconds > 0 {
return time.Duration(cfg.ReconcileDefaultResyncSeconds) * time.Second
}

// If none of the above values are present or valid, use the default resync period
// defined in the ACK runtime package. Defined in `defaultResyncPeriod` within the
// same file
return defaultResyncPeriod
}

// NewReconciler returns a new reconciler object
func NewReconciler(
sc acktypes.ServiceController,
@@ -1126,16 +1154,23 @@ func NewReconcilerWithClient(
metrics *ackmetrics.Metrics,
cache ackrtcache.Caches,
) acktypes.AWSResourceReconciler {
rtLog := log.WithName("ackrt")
resyncPeriod := getResyncPeriod(rmf, cfg)
rtLog.V(1).Info("Initiating reconciler",
"reconciler kind", rmf.ResourceDescriptor().GroupKind().Kind,
"resync period seconds", resyncPeriod.Seconds(),
)
return &resourceReconciler{
reconciler: reconciler{
sc: sc,
kc: kc,
log: log.WithName("ackrt"),
log: rtLog,
cfg: cfg,
metrics: metrics,
cache: cache,
},
rmf: rmf,
rd: rmf.ResourceDescriptor(),
rmf: rmf,
rd: rmf.ResourceDescriptor(),
resyncPeriod: resyncPeriod,
}
}
1 change: 1 addition & 0 deletions pkg/runtime/reconciler_test.go
@@ -136,6 +136,7 @@ func managerFactoryMocks(

rmf := &ackmocks.AWSResourceManagerFactory{}
rmf.On("ResourceDescriptor").Return(rd)
rmf.On("RequeueOnSuccessSeconds").Return(0)

reg := ackrt.NewRegistry()
reg.RegisterResourceManagerFactory(rmf)
1 change: 1 addition & 0 deletions pkg/runtime/service_controller_test.go
@@ -143,6 +143,7 @@ func TestServiceController(t *testing.T) {

rmf := &mocks.AWSResourceManagerFactory{}
rmf.On("ResourceDescriptor").Return(rd)
rmf.On("RequeueOnSuccessSeconds").Return(0)

reg := ackrt.NewRegistry()
reg.RegisterResourceManagerFactory(rmf)