Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

emit events on scaling #82

Merged
merged 5 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/elasti/templates/operator-additional-access-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "update", "patch", "delete", "create"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch", "update"]
- apiGroups: ["argoproj.io"]
resources: ["rollouts"]
verbs: ["get", "list", "watch", "update", "patch"]
Expand Down
7 changes: 1 addition & 6 deletions operator/internal/elastiserver/elastiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,11 @@ func (s *Server) scaleTargetForService(ctx context.Context, serviceName, namespa
}
}

if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {
if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas, crd.CRDName); err != nil {
prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, err.Error()).Inc()
return fmt.Errorf("scaleTargetForService - error: %w, targetRefKind: %s, targetRefName: %s", err, crd.Spec.ScaleTargetRef.Kind, crd.Spec.ScaleTargetRef.Name)
}
prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, "success").Inc()

if err := s.scaleHandler.UpdateLastScaledUpTime(ctx, crd.CRDName, namespace); err != nil {
// not returning an error as scale up has been successful
s.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err))
}

return nil
}
142 changes: 80 additions & 62 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/truefoundry/elasti/pkg/scaling/scalers"
"github.com/truefoundry/elasti/pkg/values"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -176,7 +176,7 @@ func (h *ScaleHandler) handleScaleToZero(ctx context.Context, es *v1alpha1.Elast
}
}

if err := h.ScaleTargetToZero(namespacedName, es.Spec.ScaleTargetRef.Kind); err != nil {
if err := h.ScaleTargetToZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Name); err != nil {
return fmt.Errorf("failed to scale target to zero: %w", err)
}
return nil
Expand Down Expand Up @@ -228,15 +228,10 @@ func (h *ScaleHandler) handleScaleFromZero(ctx context.Context, es *v1alpha1.Ela
}
}

if err := h.ScaleTargetFromZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Spec.MinTargetReplicas); err != nil {
if err := h.ScaleTargetFromZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Spec.MinTargetReplicas, es.Name); err != nil {
return fmt.Errorf("failed to scale target from zero: %w", err)
}

if err := h.UpdateLastScaledUpTime(ctx, es.Name, es.Namespace); err != nil {
// not returning an error as scale up has been successful
h.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err))
}

return nil
}

Expand All @@ -258,51 +253,74 @@ func (h *ScaleHandler) createScalerForTrigger(trigger *v1alpha1.ScaleTrigger) (s
}

// ScaleTargetFromZero scales the TargetRef to the provided replicas when it's at 0
func (h *ScaleHandler) ScaleTargetFromZero(namespacedName types.NamespacedName, targetKind string, replicas int32) error {
func (h *ScaleHandler) ScaleTargetFromZero(namespacedName types.NamespacedName, targetKind string, replicas int32, elastiServiceName string) error {
mutex := h.getMutexForScale(namespacedName.String())
mutex.Lock()
defer mutex.Unlock()

h.logger.Info("Scaling up from zero", zap.String("kind", targetKind), zap.String("namespacedName", namespacedName.String()), zap.Int32("replicas", replicas))

var err error
switch strings.ToLower(targetKind) {
case values.KindDeployments:
err := h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, replicas)
if err != nil {
return fmt.Errorf("ScaleTargetFromZero - Deployment: %w", err)
}
err = h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, replicas)
case values.KindRollout:
err := h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, replicas)
if err != nil {
return fmt.Errorf("ScaleTargetFromZero - Rollout: %w", err)
}
err = h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, replicas)
default:
return fmt.Errorf("unsupported target kind: %s", targetKind)
}

if err != nil {
eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Warning", "ScaleFromZeroFailed", fmt.Sprintf("Failed to scale %s from zero to %d replicas: %v", targetKind, replicas, err))
if eventErr != nil {
h.logger.Error("Failed to create failure event", zap.Error(eventErr))
}
return fmt.Errorf("ScaleTargetFromZero - %s: %w", targetKind, err)
}

eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Normal", "ScaledUpFromZero", fmt.Sprintf("Successfully scaled %s from zero to %d replicas", targetKind, replicas))
if eventErr != nil {
h.logger.Error("Failed to create success event", zap.Error(eventErr))
}

if err := h.UpdateLastScaledUpTime(context.Background(), elastiServiceName, namespacedName.Namespace); err != nil {
h.logger.Error("Failed to update LastScaledUpTime", zap.Error(err), zap.String("namespacedName", namespacedName.String()))
}

return nil
}

// ScaleTargetToZero scales the target to zero
// TODO: Emit k8s events
func (h *ScaleHandler) ScaleTargetToZero(namespacedName types.NamespacedName, targetKind string) error {
func (h *ScaleHandler) ScaleTargetToZero(namespacedName types.NamespacedName, targetKind string, elastiServiceName string) error {
mutex := h.getMutexForScale(namespacedName.String())
mutex.Lock()
defer mutex.Unlock()

h.logger.Info("Scaling down to zero", zap.String("kind", targetKind), zap.String("namespacedName", namespacedName.String()))

var err error
switch strings.ToLower(targetKind) {
case values.KindDeployments:
err := h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, 0)
if err != nil {
return fmt.Errorf("ScaleTargetToZero - Deployment: %w", err)
}
err = h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, 0)
case values.KindRollout:
err := h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, 0)
if err != nil {
return fmt.Errorf("ScaleTargetToZero - Rollout: %w", err)
}
err = h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, 0)
default:
return fmt.Errorf("unsupported target kind: %s", targetKind)
}

if err != nil {
eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Warning", "ScaleToZeroFailed", fmt.Sprintf("Failed to scale %s to zero: %v", targetKind, err))
if eventErr != nil {
h.logger.Error("Failed to create failure event", zap.Error(eventErr))
}
return fmt.Errorf("ScaleTargetToZero - %s: %w", targetKind, err)
}

eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Normal", "ScaledDownToZero", fmt.Sprintf("Successfully scaled %s to zero", targetKind))
if eventErr != nil {
h.logger.Error("Failed to create success event", zap.Error(eventErr))
}

return nil
}

Expand Down Expand Up @@ -354,53 +372,53 @@ func (h *ScaleHandler) ScaleArgoRollout(ns, targetName string, replicas int32) e
}

func (h *ScaleHandler) UpdateKedaScaledObjectPausedState(ctx context.Context, scaledObjectName, namespace string, paused bool) error {
scaledObject, err := h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Get(ctx, scaledObjectName, metav1.GetOptions{})
patchBytes := []byte(fmt.Sprintf(`{"metadata": {"annotations": {"%s": "%s"}}}`, kedaPausedAnnotation, strconv.FormatBool(paused)))
_, err := h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Patch(ctx, scaledObjectName, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("failed to get ScaledObject: %w", err)
}

annotations := scaledObject.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
return fmt.Errorf("failed to patch ScaledObject: %w", err)
}

annotations[kedaPausedAnnotation] = strconv.FormatBool(paused)
scaledObject.SetAnnotations(annotations)

_, err = h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Update(ctx, scaledObject, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update ScaledObject: %w", err)
}

return nil
}

func (h *ScaleHandler) UpdateLastScaledUpTime(ctx context.Context, crdName, namespace string) error {
elastiService, err := h.kDynamicClient.Resource(values.ElastiServiceGVR).
Namespace(namespace).
Get(ctx, crdName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get ElastiService: %w", err)
}

currentES := &v1alpha1.ElastiService{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(elastiService.Object, currentES); err != nil {
return fmt.Errorf("failed to convert unstructured to ElastiService: %w", err)
}
now := metav1.Now()
currentES.Status.LastScaledUpTime = &now
patchBytes := []byte(fmt.Sprintf(`{"status": {"lastScaledUpTime": "%s"}}`, now.Format(time.RFC3339Nano)))

obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentES)
_, err := h.kDynamicClient.Resource(values.ElastiServiceGVR).
Namespace(namespace).
Patch(ctx, crdName, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
return fmt.Errorf("failed to convert ElastiService to unstructured: %w", err)
return fmt.Errorf("failed to patch ElastiService status: %w", err)
}
return nil
}

_, err = h.kDynamicClient.Resource(values.ElastiServiceGVR).
Namespace(currentES.Namespace).
UpdateStatus(ctx, &unstructured.Unstructured{Object: obj}, metav1.UpdateOptions{})
// createEvent creates a new event on scaling up or down
func (h *ScaleHandler) createEvent(namespace, name, eventType, reason, message string) error {
h.logger.Info("createEvent", zap.String("eventType", eventType), zap.String("reason", reason), zap.String("message", message))
event := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
GenerateName: name + "-",
Namespace: namespace,
},
InvolvedObject: v1.ObjectReference{
APIVersion: "elasti.truefoundry.com/v1alpha1",
Kind: "ElastiService",
Name: name,
Namespace: namespace,
},
Type: eventType, // Normal or Warning
Reason: reason,
Message: message,
Action: "Scale",
Source: v1.EventSource{
Component: "elasti-operator",
},
}

_, err := h.kClient.CoreV1().Events(namespace).Create(context.TODO(), event, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to update ElastiService status: %w", err)
return fmt.Errorf("failed to create event: %w", err)
}

return nil
}
Loading