Skip to content

Commit 93851e7

Browse files
authored
fix: stop scale loop for pause Scaledbject (issue #4253) (#4550)
Signed-off-by: Tobo Atchou <[email protected]>
1 parent bce3375 commit 93851e7

File tree

9 files changed

+334
-41
lines changed

9 files changed

+334
-41
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ New deprecation(s):
100100
- **General**: Use default metrics provider from sigs.k8s.io/custom-metrics-apiserver ([#4473](https://github.com/kedacore/keda/pull/4473))
101101
- **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906))
102102
- **General**: Bump `kubernetes-sigs/controller-runtime` to v0.15.0 and code alignment ([#4582](https://github.com/kedacore/keda/pull/4582))
103+
- **General**: Stop logging errors for paused ScaledObject (with "autoscaling.keda.sh/paused-replicas" annotation) by skipping reconciliation loop for the object (stop the scale loop and delete the HPA) ([#4253](https://github.com/kedacore/keda/pull/4253))
103104

104105
## v2.10.1
105106

apis/keda/v1alpha1/condition_types.go

+32-3
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,19 @@ const (
3232
ConditionActive ConditionType = "Active"
3333
// ConditionFallback specifies that the resource has a fallback active.
3434
ConditionFallback ConditionType = "Fallback"
35+
// ConditionPaused specifies that the resource is paused.
36+
ConditionPaused ConditionType = "Paused"
3537
)
3638

3739
const (
3840
// ScaledObjectConditionReadySucccesReason defines the default Reason for correct ScaledObject
3941
ScaledObjectConditionReadySucccesReason = "ScaledObjectReady"
4042
// ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject
4143
ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling"
44+
// ScaledObjectConditionPausedReason defines the default Reason for paused ScaledObject
45+
ScaledObjectConditionPausedReason = "ScaledObjectPaused"
46+
// ScaledObjectConditionPausedMessage defines the default Message for paused ScaledObject
47+
ScaledObjectConditionPausedMessage = "ScaledObject is paused"
4248
)
4349

4450
// Condition to store the condition state
@@ -70,6 +76,7 @@ func (c *Conditions) AreInitialized() bool {
7076
foundReady := false
7177
foundActive := false
7278
foundFallback := false
79+
foundPaused := false
7380
if *c != nil {
7481
for _, condition := range *c {
7582
if condition.Type == ConditionReady {
@@ -89,14 +96,20 @@ func (c *Conditions) AreInitialized() bool {
8996
break
9097
}
9198
}
99+
for _, condition := range *c {
100+
if condition.Type == ConditionPaused {
101+
foundPaused = true
102+
break
103+
}
104+
}
92105
}
93106

94-
return foundReady && foundActive && foundFallback
107+
return foundReady && foundActive && foundFallback && foundPaused
95108
}
96109

97110
// GetInitializedConditions returns Conditions initialized to the default -> Status: Unknown
98111
func GetInitializedConditions() *Conditions {
99-
return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}}
112+
return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}, {Type: ConditionPaused, Status: metav1.ConditionUnknown}}
100113
}
101114

102115
// IsTrue is true if the condition is True
@@ -147,6 +160,14 @@ func (c *Conditions) SetFallbackCondition(status metav1.ConditionStatus, reason
147160
c.setCondition(ConditionFallback, status, reason, message)
148161
}
149162

163+
// SetPausedCondition modifies Paused Condition according to input parameters
164+
func (c *Conditions) SetPausedCondition(status metav1.ConditionStatus, reason string, message string) {
165+
if *c == nil {
166+
c = GetInitializedConditions()
167+
}
168+
c.setCondition(ConditionPaused, status, reason, message)
169+
}
170+
150171
// GetActiveCondition returns Condition of type Active
151172
func (c *Conditions) GetActiveCondition() Condition {
152173
if *c == nil {
@@ -163,14 +184,22 @@ func (c *Conditions) GetReadyCondition() Condition {
163184
return c.getCondition(ConditionReady)
164185
}
165186

166-
// GetFallbackCondition returns Condition of type Ready
187+
// GetFallbackCondition returns Condition of type Fallback
167188
func (c *Conditions) GetFallbackCondition() Condition {
168189
if *c == nil {
169190
c = GetInitializedConditions()
170191
}
171192
return c.getCondition(ConditionFallback)
172193
}
173194

195+
// GetPausedCondition returns Condition of type Paused
196+
func (c *Conditions) GetPausedCondition() Condition {
197+
if *c == nil {
198+
c = GetInitializedConditions()
199+
}
200+
return c.getCondition(ConditionPaused)
201+
}
202+
174203
func (c Conditions) getCondition(conditionType ConditionType) Condition {
175204
for i := range c {
176205
if c[i].Type == conditionType {

apis/keda/v1alpha1/scaledobject_types.go

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status"
3535
// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status"
3636
// +kubebuilder:printcolumn:name="Fallback",type="string",JSONPath=".status.conditions[?(@.type==\"Fallback\")].status"
37+
// +kubebuilder:printcolumn:name="Paused",type="string",JSONPath=".status.conditions[?(@.type==\"Paused\")].status"
3738
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
3839

3940
// ScaledObject is a specification for a ScaledObject resource

config/crd/bases/keda.sh_scaledobjects.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ spec:
4444
- jsonPath: .status.conditions[?(@.type=="Fallback")].status
4545
name: Fallback
4646
type: string
47+
- jsonPath: .status.conditions[?(@.type=="Paused")].status
48+
name: Paused
49+
type: string
4750
- jsonPath: .metadata.creationTimestamp
4851
name: Age
4952
type: date

controllers/keda/hpa.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,21 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l
183183

184184
// deleteAndCreateHpa delete old HPA and create new one
185185
func (r *ScaledObjectReconciler) renameHPA(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2.HorizontalPodAutoscaler, gvkr *kedav1alpha1.GroupVersionKindResource) error {
186-
logger.Info("Deleting old HPA", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", foundHpa.Name)
186+
if err := r.deleteHPA(ctx, logger, scaledObject, foundHpa); err != nil {
187+
return err
188+
}
189+
return r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
190+
}
191+
192+
// deleteHpa delete existing HPA
193+
func (r *ScaledObjectReconciler) deleteHPA(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2.HorizontalPodAutoscaler) error {
194+
logger.Info("Deleting existing HPA", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", foundHpa.Name)
187195
if err := r.Client.Delete(ctx, foundHpa); err != nil {
188196
logger.Error(err, "Failed to delete old HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
189197
return err
190198
}
191199

192-
return r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
200+
return nil
193201
}
194202

195203
// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject

controllers/keda/scaledobject_controller.go

+72-22
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
146146
return ctrl.Result{}, nil
147147
}
148148
// Error reading the object - requeue the request.
149-
reqLogger.Error(err, "Failed to get ScaledObject")
149+
reqLogger.Error(err, "failed to get ScaledObject")
150150
return ctrl.Result{}, err
151151
}
152152

@@ -172,9 +172,9 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
172172
}
173173
}
174174

175-
// reconcile ScaledObject and set status appropriately
176-
msg, err := r.reconcileScaledObject(ctx, reqLogger, scaledObject)
177175
conditions := scaledObject.Status.Conditions.DeepCopy()
176+
// reconcile ScaledObject and set status appropriately
177+
msg, err := r.reconcileScaledObject(ctx, reqLogger, scaledObject, &conditions)
178178
if err != nil {
179179
reqLogger.Error(err, msg)
180180
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
@@ -197,7 +197,31 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
197197
}
198198

199199
// reconcileScaledObject implements reconciler logic for ScaledObject
200-
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) {
200+
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
201+
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
202+
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
203+
_, pausedAnnotationFound := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
204+
if pausedAnnotationFound {
205+
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
206+
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
207+
}
208+
if scaledObject.Status.PausedReplicaCount != nil {
209+
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
210+
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
211+
msg = "failed to stop the scale loop for paused ScaledObject"
212+
return msg, err
213+
}
214+
if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted {
215+
msg = "failed to delete HPA for paused ScaledObject"
216+
return msg, err
217+
}
218+
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
219+
return msg, nil
220+
}
221+
} else if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
222+
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject")
223+
}
224+
201225
// Check scale target Name is specified
202226
if scaledObject.Spec.ScaleTargetRef.Name == "" {
203227
err := fmt.Errorf("ScaledObject.spec.scaleTargetRef.name is missing")
@@ -207,7 +231,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
207231
// Check the label needed for Metrics servers is present on ScaledObject
208232
err := r.ensureScaledObjectLabel(ctx, logger, scaledObject)
209233
if err != nil {
210-
return "Failed to update ScaledObject with scaledObjectName label", err
234+
return "failed to update ScaledObject with scaledObjectName label", err
211235
}
212236

213237
// Check if resource targeted for scaling exists and exposes /scale subresource
@@ -229,7 +253,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
229253
// Create a new HPA or update existing one according to ScaledObject
230254
newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
231255
if err != nil {
232-
return "Failed to ensure HPA is correctly created for ScaledObject", err
256+
return "failed to ensure HPA is correctly created for ScaledObject", err
233257
}
234258
scaleObjectSpecChanged := false
235259
if !newHPACreated {
@@ -238,17 +262,21 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
238262
// (we can omit this check if a new HPA was created, which fires new ScaleLoop anyway)
239263
scaleObjectSpecChanged, err = r.scaledObjectGenerationChanged(logger, scaledObject)
240264
if err != nil {
241-
return "Failed to check whether ScaledObject's Generation was changed", err
265+
return "failed to check whether ScaledObject's Generation was changed", err
242266
}
243267
}
244268

245269
// Notify ScaleHandler if a new HPA was created or if ScaledObject was updated
246270
if newHPACreated || scaleObjectSpecChanged {
247271
if r.requestScaleLoop(ctx, logger, scaledObject) != nil {
248-
return "Failed to start a new scale loop with scaling logic", err
272+
return "failed to start a new scale loop with scaling logic", err
249273
}
250274
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
251275
}
276+
277+
if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
278+
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
279+
}
252280
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
253281
}
254282

@@ -273,7 +301,7 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo
273301
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
274302
gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
275303
if err != nil {
276-
logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
304+
logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
277305
return gvkr, err
278306
}
279307
gvkString := gvkr.GVKString()
@@ -299,11 +327,11 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
299327
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
300328
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
301329
// resource doesn't exist
302-
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
330+
logger.Error(err, "target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
303331
return gvkr, err
304332
}
305333
// resource exist but doesn't expose /scale subresource
306-
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
334+
logger.Error(errScale, "target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
307335
return gvkr, errScale
308336
}
309337
isScalableCache.Store(gr.String(), true)
@@ -395,12 +423,7 @@ func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *k
395423

396424
// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
397425
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
398-
var hpaName string
399-
if scaledObject.Status.HpaName != "" {
400-
hpaName = scaledObject.Status.HpaName
401-
} else {
402-
hpaName = getHPAName(scaledObject)
403-
}
426+
hpaName := getHPANameOnEnsure(scaledObject)
404427
foundHpa := &autoscalingv2.HorizontalPodAutoscaler{}
405428
// Check if HPA for this ScaledObject already exists
406429
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
@@ -414,7 +437,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
414437
// new HPA created successfully -> notify Reconcile function so it could fire a new ScaleLoop
415438
return true, nil
416439
} else if err != nil {
417-
logger.Error(err, "Failed to get HPA from cluster")
440+
logger.Error(err, "failed to get HPA from cluster")
418441
return false, err
419442
}
420443

@@ -431,13 +454,40 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
431454
// HPA was found -> let's check if we need to update it
432455
err = r.updateHPAIfNeeded(ctx, logger, scaledObject, foundHpa, gvkr)
433456
if err != nil {
434-
logger.Error(err, "Failed to check HPA for possible update")
457+
logger.Error(err, "failed to check HPA for possible update")
435458
return false, err
436459
}
437460

438461
return false, nil
439462
}
440463

464+
// ensureHPAForScaledObjectIsDeleted ensures that in cluster any HPA for specified ScaledObject is deleted, returns true if no HPA exists
465+
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectIsDeleted(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
466+
hpaName := getHPANameOnEnsure(scaledObject)
467+
foundHpa := &autoscalingv2.HorizontalPodAutoscaler{}
468+
// Check if HPA for this ScaledObject already exists
469+
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
470+
if err != nil && errors.IsNotFound(err) {
471+
return true, nil
472+
} else if err != nil {
473+
logger.Error(err, "failed to get HPA from cluster")
474+
return false, err
475+
}
476+
477+
if err := r.deleteHPA(ctx, logger, scaledObject, foundHpa); err != nil {
478+
logger.Error(err, "failed to delete HPA from cluster")
479+
return false, err
480+
}
481+
return true, nil
482+
}
483+
484+
func getHPANameOnEnsure(scaledObject *kedav1alpha1.ScaledObject) string {
485+
if scaledObject.Status.HpaName != "" {
486+
return scaledObject.Status.HpaName
487+
}
488+
return getHPAName(scaledObject)
489+
}
490+
441491
func isHpaRenamed(scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2.HorizontalPodAutoscaler) bool {
442492
// if HPA name defined in SO -> check if equals to the found HPA
443493
if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig.Name != "" {
@@ -453,7 +503,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo
453503

454504
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
455505
if err != nil {
456-
logger.Error(err, "Error getting key for scaledObject")
506+
logger.Error(err, "error getting key for scaledObject")
457507
return err
458508
}
459509

@@ -471,7 +521,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo
471521
func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
472522
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
473523
if err != nil {
474-
logger.Error(err, "Error getting key for scaledObject")
524+
logger.Error(err, "error getting key for scaledObject")
475525
return err
476526
}
477527

@@ -487,7 +537,7 @@ func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.
487537
func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
488538
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
489539
if err != nil {
490-
logger.Error(err, "Error getting key for scaledObject")
540+
logger.Error(err, "error getting key for scaledObject")
491541
return true, err
492542
}
493543

0 commit comments

Comments
 (0)