Skip to content

Commit 937cb53

Browse files
committed
fix: stop scale loop for pause Scaledbject (issue #4253)
Signed-off-by: Tobo Atchou <[email protected]>
1 parent fc3dad7 commit 937cb53

File tree

4 files changed

+159
-30
lines changed

4 files changed

+159
-30
lines changed

controllers/keda/scaledobject_controller.go

+22-14
Original file line numberDiff line numberDiff line change
@@ -200,22 +200,26 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
200200
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
201201
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
202202
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
203-
_, paused := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
204-
if paused {
205-
logger.Info("ScaledObject is paused, so skipping the request.")
206-
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
207-
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
208-
msg = "failed to stop the scale loop for paused ScaledObject"
209-
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectStopScaleLoopFailed", msg)
210-
return msg, err
203+
_, pausedAnnotationFound := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
204+
if pausedAnnotationFound {
205+
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
206+
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
211207
}
212-
if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted {
213-
msg = "failed to delete HPA for paused ScaledObject"
214-
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectHPADeleteFailed", msg)
215-
return msg, err
208+
if scaledObject.Status.PausedReplicaCount != nil {
209+
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
210+
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
211+
msg = "failed to stop the scale loop for paused ScaledObject"
212+
return msg, err
213+
}
214+
if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted {
215+
msg = "failed to delete HPA for paused ScaledObject"
216+
return msg, err
217+
}
218+
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
219+
return msg, nil
216220
}
217-
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
218-
return msg, nil
221+
} else if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
222+
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject")
219223
}
220224

221225
// Check scale target Name is specified
@@ -269,6 +273,10 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
269273
}
270274
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
271275
}
276+
277+
if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
278+
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
279+
}
272280
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
273281
}
274282

controllers/keda/scaledobject_controller_test.go

+122-8
Original file line numberDiff line numberDiff line change
@@ -843,8 +843,101 @@ var _ = Describe("ScaledObjectController", func() {
843843
}, 1*time.Minute).Should(Equal(2))
844844
})
845845

846+
// Fix issue 4253
847+
It("scaledobject paused condition status changes to true on annotation", func() {
848+
// Create the scaling target.
849+
deploymentName := "toggled-to-paused-annotation-name"
850+
soName := "so-" + deploymentName
851+
err := k8sClient.Create(context.Background(), generateDeployment(deploymentName))
852+
Expect(err).ToNot(HaveOccurred())
853+
854+
// Create the ScaledObject without specifying name.
855+
so := &kedav1alpha1.ScaledObject{
856+
ObjectMeta: metav1.ObjectMeta{
857+
Name: soName,
858+
Namespace: "default",
859+
},
860+
Spec: kedav1alpha1.ScaledObjectSpec{
861+
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
862+
Name: deploymentName,
863+
},
864+
Advanced: &kedav1alpha1.AdvancedConfig{
865+
HorizontalPodAutoscalerConfig: &kedav1alpha1.HorizontalPodAutoscalerConfig{},
866+
},
867+
Triggers: []kedav1alpha1.ScaleTriggers{
868+
{
869+
Type: "cron",
870+
Metadata: map[string]string{
871+
"timezone": "UTC",
872+
"start": "0 * * * *",
873+
"end": "1 * * * *",
874+
"desiredReplicas": "1",
875+
},
876+
},
877+
},
878+
},
879+
}
880+
pollingInterval := int32(5)
881+
so.Spec.PollingInterval = &pollingInterval
882+
err = k8sClient.Create(context.Background(), so)
883+
Expect(err).ToNot(HaveOccurred())
884+
885+
// And validate that hpa is created.
886+
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
887+
Eventually(func() error {
888+
return k8sClient.Get(context.Background(), types.NamespacedName{Name: fmt.Sprintf("keda-hpa-%s", soName), Namespace: "default"}, hpa)
889+
}).ShouldNot(HaveOccurred())
890+
891+
// wait so's ready condition Ready
892+
Eventually(func() metav1.ConditionStatus {
893+
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
894+
if err != nil {
895+
return metav1.ConditionUnknown
896+
}
897+
return so.Status.Conditions.GetReadyCondition().Status
898+
}).Should(Equal(metav1.ConditionTrue))
899+
900+
Eventually(func() metav1.ConditionStatus {
901+
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
902+
if err != nil {
903+
return metav1.ConditionTrue
904+
}
905+
return so.Status.Conditions.GetPausedCondition().Status
906+
}, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))
907+
908+
// set annotation
909+
Eventually(func() error {
910+
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
911+
Expect(err).ToNot(HaveOccurred())
912+
annotations := make(map[string]string)
913+
annotations[kedacontrollerutil.PausedReplicasAnnotation] = "1"
914+
so.SetAnnotations(annotations)
915+
pollingInterval := int32(6)
916+
so.Spec.PollingInterval = &pollingInterval
917+
return k8sClient.Update(context.Background(), so)
918+
}).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred())
919+
testLogger.Info("annotation is set")
920+
921+
// validate annotation is set correctly
922+
Eventually(func() bool {
923+
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
924+
Expect(err).ToNot(HaveOccurred())
925+
_, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
926+
return paused
927+
}).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue())
928+
929+
Eventually(func() metav1.ConditionStatus {
930+
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
931+
if err != nil {
932+
return metav1.ConditionUnknown
933+
}
934+
return so.Status.Conditions.GetPausedCondition().Status
935+
}).WithTimeout(2 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionTrue))
936+
})
937+
846938
// Fix issue 4253
847939
It("deletes hpa when scaledobject has pause annotation", func() {
940+
pausedReplicasCountForAnnotation := "1"
848941
// Create the scaling target.
849942
deploymentName := "to-be-paused-name"
850943
soName := "so-" + deploymentName
@@ -857,7 +950,7 @@ var _ = Describe("ScaledObjectController", func() {
857950
Name: soName,
858951
Namespace: "default",
859952
Annotations: map[string]string{
860-
kedacontrollerutil.PausedReplicasAnnotation: "0",
953+
kedacontrollerutil.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
861954
},
862955
},
863956
Spec: kedav1alpha1.ScaledObjectSpec{
@@ -880,6 +973,8 @@ var _ = Describe("ScaledObjectController", func() {
880973
},
881974
},
882975
}
976+
pollingInterval := int32(5)
977+
so.Spec.PollingInterval = &pollingInterval
883978
err = k8sClient.Create(context.Background(), so)
884979
Expect(err).ToNot(HaveOccurred())
885980

@@ -895,14 +990,27 @@ var _ = Describe("ScaledObjectController", func() {
895990
// validate annotation is set correctly
896991
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
897992
Expect(err).ToNot(HaveOccurred())
898-
_, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
993+
pausedReplicasCount, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
899994
Expect(paused).To(Equal(true))
995+
Expect(pausedReplicasCount).To(Equal(pausedReplicasCountForAnnotation))
900996

901-
// And validate that hpa is deleted.
902-
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
903-
Eventually(func() error {
904-
return k8sClient.Get(context.Background(), types.NamespacedName{Name: fmt.Sprintf("keda-hpa-%s", soName), Namespace: "default"}, hpa)
905-
}).Should(HaveOccurred())
997+
// wait so's ready condition Ready
998+
Eventually(func() metav1.ConditionStatus {
999+
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
1000+
if err != nil {
1001+
return metav1.ConditionUnknown
1002+
}
1003+
return so.Status.Conditions.GetReadyCondition().Status
1004+
}).Should(Equal(metav1.ConditionTrue))
1005+
1006+
// wait so's paused condition True
1007+
Eventually(func() metav1.ConditionStatus {
1008+
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
1009+
if err != nil {
1010+
return metav1.ConditionUnknown
1011+
}
1012+
return so.Status.Conditions.GetPausedCondition().Status
1013+
}).Should(Equal(metav1.ConditionTrue))
9061014

9071015
// wait so's Paused condition true
9081016
Eventually(func() metav1.ConditionStatus {
@@ -911,7 +1019,13 @@ var _ = Describe("ScaledObjectController", func() {
9111019
return metav1.ConditionUnknown
9121020
}
9131021
return so.Status.Conditions.GetPausedCondition().Status
914-
}, 1*time.Minute).Should(Equal(metav1.ConditionTrue))
1022+
}).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionTrue))
1023+
1024+
// And validate that hpa is deleted.
1025+
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
1026+
Eventually(func() error {
1027+
return k8sClient.Get(context.Background(), types.NamespacedName{Name: fmt.Sprintf("keda-hpa-%s", soName), Namespace: "default"}, hpa)
1028+
}).Should(HaveOccurred())
9151029
})
9161030
})
9171031

pkg/scaling/executor/scale_scaledobjects.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
3838
logger := e.logger.WithValues("scaledobject.Name", scaledObject.Name,
3939
"scaledObject.Namespace", scaledObject.Namespace,
4040
"scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)
41-
4241
// Get the current replica count. As a special case, Deployments and StatefulSets fetch directly from the object so they can use the informer cache
4342
// to reduce API calls. Everything else uses the scale subresource.
4443
var currentScale *autoscalingv1.Scale
@@ -71,7 +70,6 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
7170
}
7271
currentReplicas = currentScale.Spec.Replicas
7372
}
74-
7573
// if the ScaledObject's triggers aren't in the error state,
7674
// but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true'
7775
readyCondition := scaledObject.Status.Conditions.GetReadyCondition()
@@ -92,7 +90,6 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
9290
logger.Error(err, "error getting the paused replica count on the current ScaledObject.")
9391
return
9492
}
95-
9693
status := scaledObject.Status.DeepCopy()
9794
if pausedCount != nil {
9895
// Scale the target to the paused replica count
@@ -106,6 +103,8 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
106103
}
107104
return
108105
}
106+
}
107+
if *pausedCount != currentReplicas || status.PausedReplicaCount == nil {
109108
status.PausedReplicaCount = pausedCount
110109
err = kedautil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
111110
if err != nil {

tests/internals/pause_scaling/pause_scaling_test.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ var (
2626
scaledObjectName = fmt.Sprintf("%s-so", testName)
2727
maxReplicaCount = 1
2828
minReplicaCount = 0
29-
testScaleOutWaitMin = 5
30-
testPauseAtNWaitMin = 5
31-
testScaleInWaitMin = 5
29+
testScaleOutWaitMin = 1
30+
testPauseAtNWaitMin = 1
31+
testScaleInWaitMin = 1
3232
)
3333

3434
type templateData struct {
@@ -37,6 +37,7 @@ type templateData struct {
3737
ScaledObjectName string
3838
MonitoredDeploymentName string
3939
PausedReplicaCount int
40+
CooldownPeriod int
4041
}
4142

4243
const (
@@ -98,7 +99,7 @@ spec:
9899
pollingInterval: 5
99100
minReplicaCount: 0
100101
maxReplicaCount: 1
101-
cooldownPeriod: 10
102+
cooldownPeriod: {{.CooldownPeriod}}
102103
triggers:
103104
- type: kubernetes-workload
104105
metadata:
@@ -120,7 +121,7 @@ spec:
120121
pollingInterval: 5
121122
minReplicaCount: 0
122123
maxReplicaCount: 1
123-
cooldownPeriod: 10
124+
cooldownPeriod: {{.CooldownPeriod}}
124125
triggers:
125126
- type: kubernetes-workload
126127
metadata:
@@ -160,6 +161,7 @@ func getTemplateData() (templateData, []Template) {
160161
ScaledObjectName: scaledObjectName,
161162
MonitoredDeploymentName: monitoredDeploymentName,
162163
PausedReplicaCount: 0,
164+
CooldownPeriod: 10,
163165
}, []Template{
164166
{Name: "deploymentTemplate", Config: deploymentTemplate},
165167
{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
@@ -178,15 +180,20 @@ func testPauseAt0(t *testing.T, kc *kubernetes.Clientset) {
178180

179181
func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) {
180182
t.Log("--- testing scale out ---")
183+
data.CooldownPeriod = 15
181184
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
182185

186+
assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, monitoredDeploymentName, testNamespace, 2, 60, testScaleOutWaitMin),
187+
"monitoredDeploymentName replica count should be 2 after %d minute(s)", testScaleOutWaitMin)
188+
183189
assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, testScaleOutWaitMin),
184190
"replica count should be 1 after %d minute(s)", testScaleOutWaitMin)
185191
}
186192

187193
func testPauseAtN(t *testing.T, kc *kubernetes.Clientset, data templateData, n int) {
188194
t.Log("--- testing pausing at N ---")
189195
data.PausedReplicaCount = n
196+
data.CooldownPeriod = 10
190197
KubectlApplyWithTemplate(t, data, "scaledObjectAnnotatedTemplate", scaledObjectAnnotatedTemplate)
191198

192199
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 0, testNamespace)
@@ -200,6 +207,7 @@ func testPauseAtN(t *testing.T, kc *kubernetes.Clientset, data templateData, n i
200207

201208
func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData) {
202209
t.Log("--- testing scale in ---")
210+
data.CooldownPeriod = 15
203211
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
204212

205213
assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, testScaleInWaitMin),

0 commit comments

Comments
 (0)