diff --git a/pkg/config/config.go b/pkg/config/config.go index 08a8403..2cfe3eb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,6 +17,9 @@ import ( "errors" "fmt" "net/url" + "strconv" + "strings" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -32,20 +35,22 @@ import ( ) const ( - flagEnableLeaderElection = "enable-leader-election" - flagMetricAddr = "metrics-addr" - flagEnableDevLogging = "enable-development-logging" - flagAWSRegion = "aws-region" - flagAWSEndpointURL = "aws-endpoint-url" - flagAWSIdentityEndpointURL = "aws-identity-endpoint-url" - flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls" - flagLogLevel = "log-level" - flagResourceTags = "resource-tags" - flagWatchNamespace = "watch-namespace" - flagEnableWebhookServer = "enable-webhook-server" - flagWebhookServerAddr = "webhook-server-addr" - flagDeletionPolicy = "deletion-policy" - envVarAWSRegion = "AWS_REGION" + flagEnableLeaderElection = "enable-leader-election" + flagMetricAddr = "metrics-addr" + flagEnableDevLogging = "enable-development-logging" + flagAWSRegion = "aws-region" + flagAWSEndpointURL = "aws-endpoint-url" + flagAWSIdentityEndpointURL = "aws-identity-endpoint-url" + flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls" + flagLogLevel = "log-level" + flagResourceTags = "resource-tags" + flagWatchNamespace = "watch-namespace" + flagEnableWebhookServer = "enable-webhook-server" + flagWebhookServerAddr = "webhook-server-addr" + flagDeletionPolicy = "deletion-policy" + flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds" + flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds" + envVarAWSRegion = "AWS_REGION" ) var ( @@ -63,20 +68,22 @@ var ( // Config contains configuration options for ACK service controllers type Config struct { - MetricsAddr string - EnableLeaderElection bool - EnableDevelopmentLogging bool - AccountID string - Region string - IdentityEndpointURL string - 
EndpointURL string - AllowUnsafeEndpointURL bool - LogLevel string - ResourceTags []string - WatchNamespace string - EnableWebhookServer bool - WebhookServerAddr string - DeletionPolicy ackv1alpha1.DeletionPolicy + MetricsAddr string + EnableLeaderElection bool + EnableDevelopmentLogging bool + AccountID string + Region string + IdentityEndpointURL string + EndpointURL string + AllowUnsafeEndpointURL bool + LogLevel string + ResourceTags []string + WatchNamespace string + EnableWebhookServer bool + WebhookServerAddr string + DeletionPolicy ackv1alpha1.DeletionPolicy + ReconcileDefaultResyncSeconds int + ReconcileResourceResyncSeconds []string } // BindFlags defines CLI/runtime configuration options @@ -152,6 +159,19 @@ func (cfg *Config) BindFlags() { &cfg.DeletionPolicy, flagDeletionPolicy, "The default deletion policy for all resources managed by the controller", ) + flag.IntVar( + &cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds, + 0, + "The default duration, in seconds, to wait before resyncing desired state of custom resources. "+ + "This value is used if no resource-specific override has been specified. Default is 10 hours.", + ) + flag.StringArrayVar( + &cfg.ReconcileResourceResyncSeconds, flagReconcileResourceResyncSeconds, + []string{}, + "A Key/Value list of strings representing the reconcile resync configuration for each resource. This"+ + " configuration maps resource kinds to drift remediation periods in seconds. 
If provided, "+ + " resource-specific resync periods take precedence over the default period.", + ) } // SetupLogger initializes the logger used in the service controller @@ -233,6 +253,16 @@ func (cfg *Config) Validate() error { if cfg.DeletionPolicy == "" { cfg.DeletionPolicy = ackv1alpha1.DeletionPolicyDelete } + + if cfg.ReconcileDefaultResyncSeconds < 0 { + return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds) + } + + _, err := cfg.ParseReconcileResourceResyncSeconds() + if err != nil { + return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err) + } + return nil } @@ -244,3 +274,50 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error { } return nil } + +// ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds +// flag and returns a map that maps resource names to resync periods. +// The flag arguments are expected to have the format "resource=seconds", where "resource" is the +// name of the resource and "seconds" is the number of seconds that the reconciler should wait before +// reconciling the resource again. +func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Duration, error) { + resourceResyncPeriods := make(map[string]time.Duration, len(cfg.ReconcileResourceResyncSeconds)) + for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds { + // Parse the resource name and resync period from the flag argument + resourceName, resyncSeconds, err := parseReconcileFlagArgument(resourceResyncSecondsFlag) + if err != nil { + return nil, fmt.Errorf("error parsing flag argument '%v': %v. 
Expected format: resource=seconds", resourceResyncSecondsFlag, err) + } + resourceResyncPeriods[strings.ToLower(resourceName)] = time.Duration(resyncSeconds) + } + return resourceResyncPeriods, nil +} + +// parseReconcileFlagArgument parses a flag argument of the form "key=value" into +// its individual elements. The key must be a non-empty string and the value must be +// a non-empty, non-negative integer. If the flag argument is not in the expected format +// or has invalid elements, an error is returned. +// +// The function returns the parsed key and value as separate elements. +func parseReconcileFlagArgument(flagArgument string) (string, int, error) { + delimiter := "=" + elements := strings.Split(flagArgument, delimiter) + if len(elements) != 2 { + return "", 0, fmt.Errorf("invalid flag argument format: expected key=value") + } + if elements[0] == "" { + return "", 0, fmt.Errorf("missing key in flag argument") + } + if elements[1] == "" { + return "", 0, fmt.Errorf("missing value in flag argument") + } + + resyncSeconds, err := strconv.Atoi(elements[1]) + if err != nil { + return "", 0, fmt.Errorf("invalid value in flag argument: %v", err) + } + if resyncSeconds < 0 { + return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds) + } + return elements[0], resyncSeconds, nil +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go new file mode 100644 index 0000000..852f558 --- /dev/null +++ b/pkg/config/config_test.go @@ -0,0 +1,61 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. 
This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package config + +import "testing" + +func TestParseReconcileFlagArgument(t *testing.T) { + tests := []struct { + flagArgument string + expectedKey string + expectedVal int + expectedErr bool + expectedErrMsg string + }{ + // Test valid flag arguments + {"key=1", "key", 1, false, ""}, + {"key=123456", "key", 123456, false, ""}, + {"key=600", "key", 600, false, ""}, + {"k=1", "k", 1, false, ""}, + {"ke_y=123456", "ke_y", 123456, false, ""}, + + // Test invalid flag arguments + {"key", "", 0, true, "invalid flag argument format: expected key=value"}, + {"key=", "", 0, true, "missing value in flag argument"}, + {"=value", "", 0, true, "missing key in flag argument"}, + {"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"}, + {"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"}, + {"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"}, + {"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"}, + {"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"}, + } + for _, test := range tests { + key, val, err := parseReconcileFlagArgument(test.flagArgument) + if err != nil && !test.expectedErr { + t.Errorf("unexpected error for flag argument '%s': %v", test.flagArgument, err) + } + if err == nil && test.expectedErr { + t.Errorf("expected error for flag argument '%s', got nil", test.flagArgument) + } + if err != nil && err.Error() != test.expectedErrMsg { + t.Errorf("unexpected error message for flag argument '%s': expected '%s', got '%v'", test.flagArgument, test.expectedErrMsg, err) + } + if key != test.expectedKey { + 
t.Errorf("unexpected key for flag argument '%s': expected '%s', got '%s'", test.flagArgument, test.expectedKey, key) + } + if val != test.expectedVal { + t.Errorf("unexpected value for flag argument '%s': expected %d, got %d", test.flagArgument, test.expectedVal, val) + } + } +} diff --git a/pkg/runtime/reconciler.go b/pkg/runtime/reconciler.go index 1d5d916..262fc1b 100644 --- a/pkg/runtime/reconciler.go +++ b/pkg/runtime/reconciler.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" backoff "github.com/cenkalti/backoff/v4" @@ -47,7 +48,7 @@ const ( // The default duration to trigger the sync for an ACK resource after // the successful reconciliation. This behavior for a resource can be // overriden by RequeueOnSuccessSeconds configuration for that resource. - resyncPeriod = 10 * time.Hour + defaultResyncPeriod = 10 * time.Hour ) // reconciler describes a generic reconciler within ACK. @@ -70,8 +71,9 @@ type reconciler struct { // object)s and sharing watch and informer queues across those controllers. type resourceReconciler struct { reconciler - rmf acktypes.AWSResourceManagerFactory - rd acktypes.AWSResourceDescriptor + rmf acktypes.AWSResourceManagerFactory + rd acktypes.AWSResourceDescriptor + resyncPeriod time.Duration } // GroupKind returns the string containing the API group and kind reconciled by @@ -887,31 +889,8 @@ func (r *resourceReconciler) handleRequeues( } // The code below only executes for "ConditionTypeResourceSynced" if condition.Status == corev1.ConditionTrue { - if duration := r.rmf.RequeueOnSuccessSeconds(); duration > 0 { - rlog.Debug( - "requeueing resource after resource synced condition true", - ) - return latest, requeue.NeededAfter(nil, time.Duration(duration)*time.Second) - } - // Since RequeueOnSuccessSeconds <= 0, requeue the resource - // with "resyncPeriod" to perform periodic drift detection and - // sync the desired state. 
- // - // Upstream controller-runtime provides SyncPeriod functionality - // which flushes the go-client cache and triggers Sync for all - // the objects in cache every 10 hours by default. - // - // ACK controller use non-cached client to read objects - // from API Server, hence controller-runtime's SyncPeriod - // functionality does not work. - // https://github.com/aws-controllers-k8s/community/issues/1355 - // - // ACK controllers use api-reader(non-cached client) to avoid - // reading stale copies of ACK resource that can cause - // duplicate resource creation when resource identifier is - // not present in stale copy of resource. - // https://github.com/aws-controllers-k8s/community/issues/894#issuecomment-911876354 - return latest, requeue.NeededAfter(nil, resyncPeriod) + rlog.Debug("requeuing", "after", r.resyncPeriod) + return latest, requeue.NeededAfter(nil, r.resyncPeriod) } else { rlog.Debug( "requeueing resource after finding resource synced condition false", @@ -1103,6 +1082,56 @@ func (r *resourceReconciler) getEndpointURL( return r.cfg.EndpointURL } +// getResyncPeriod returns the period of the recurring reconciler process which ensures the desired +// state of custom resources is maintained. +// It attempts to retrieve the duration from the following sources, in this order: +// 1. A resource-specific reconciliation resync period specified in the reconciliation resync +// configuration map (--reconcile-resource-resync-seconds). +// 2. A resource-specific requeue on success period specified by the resource manager factory. +// The resource manager factory is controller-specific, and thus this period is to be specified +// by controller authors (using ack-generate). +// 3. The default reconciliation resync period specified in the controller binary flags. +// (--reconcile-default-resync-seconds) +// 4. The default resync period defined in the ACK runtime package. 
Defined in defaultResyncPeriod +// within the same file +// +// Each reconciler has a unique value to use. This function should only be called during the +// instantiation of an AWSResourceReconciler and should not be called during the reconciliation +// function r.Sync +func getResyncPeriod(rmf acktypes.AWSResourceManagerFactory, cfg ackcfg.Config) time.Duration { + // The reconciliation resync period configuration has already been validated as + // a clean map. Therefore, we can safely ignore any errors that may occur while + // parsing it and avoid changing the signature of NewReconcilerWithClient. + drc, _ := cfg.ParseReconcileResourceResyncSeconds() + + // First, try to use a resource-specific resync period if provided in the resource + // resync period configuration. + resourceKind := rmf.ResourceDescriptor().GroupKind().Kind + if duration, ok := drc[strings.ToLower(resourceKind)]; ok && duration > 0 { + return time.Duration(duration) * time.Second + } + + // Second, try to use a resource-specific requeue on success period specified by the + // resource manager factory. This value is set during the code generation of the + // controller and takes precedence over the default resync period because + // it allows existing controllers that rely on this value to maintain their intended + // behavior. + if duration := rmf.RequeueOnSuccessSeconds(); duration > 0 { + return time.Duration(duration) * time.Second + } + + // Third, try to use the default resync period specified during controller + // start-up. + if cfg.ReconcileDefaultResyncSeconds > 0 { + return time.Duration(cfg.ReconcileDefaultResyncSeconds) * time.Second + } + + // If none of the above values are present or valid, use the default resync period + // defined in the ACK runtime package. 
Defined in `defaultResyncPeriod` within the + // same file + return defaultResyncPeriod +} + // NewReconciler returns a new reconciler object func NewReconciler( sc acktypes.ServiceController, @@ -1126,16 +1155,23 @@ func NewReconcilerWithClient( metrics *ackmetrics.Metrics, cache ackrtcache.Caches, ) acktypes.AWSResourceReconciler { + rtLog := log.WithName("ackrt") + resyncPeriod := getResyncPeriod(rmf, cfg) + rtLog.V(1).Info("Initiating reconciler", + "reconciler kind", rmf.ResourceDescriptor().GroupKind().Kind, + "resync period seconds", resyncPeriod.Seconds(), + ) return &resourceReconciler{ reconciler: reconciler{ sc: sc, kc: kc, - log: log.WithName("ackrt"), + log: rtLog, cfg: cfg, metrics: metrics, cache: cache, }, - rmf: rmf, - rd: rmf.ResourceDescriptor(), + rmf: rmf, + rd: rmf.ResourceDescriptor(), + resyncPeriod: resyncPeriod, } } diff --git a/pkg/runtime/reconciler_test.go b/pkg/runtime/reconciler_test.go index 9eeac51..53ea0c8 100644 --- a/pkg/runtime/reconciler_test.go +++ b/pkg/runtime/reconciler_test.go @@ -136,6 +136,7 @@ func managerFactoryMocks( rmf := &ackmocks.AWSResourceManagerFactory{} rmf.On("ResourceDescriptor").Return(rd) + rmf.On("RequeueOnSuccessSeconds").Return(0) reg := ackrt.NewRegistry() reg.RegisterResourceManagerFactory(rmf) diff --git a/pkg/runtime/service_controller_test.go b/pkg/runtime/service_controller_test.go index 667d45c..e2bc672 100644 --- a/pkg/runtime/service_controller_test.go +++ b/pkg/runtime/service_controller_test.go @@ -143,6 +143,7 @@ func TestServiceController(t *testing.T) { rmf := &mocks.AWSResourceManagerFactory{} rmf.On("ResourceDescriptor").Return(rd) + rmf.On("RequeueOnSuccessSeconds").Return(0) reg := ackrt.NewRegistry() reg.RegisterResourceManagerFactory(rmf)