From 731f5c3f632c60e5a00d7632c63de4bf0123e906 Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Wed, 22 Jun 2022 20:36:26 +0200 Subject: [PATCH 1/7] Gt-23 ArangoTask handler --- pkg/apis/deployment/v1/arango_task_spec.go | 14 +++- pkg/apis/deployment/v1/arango_task_status.go | 10 ++- pkg/apis/deployment/v1/deployment_status.go | 3 + pkg/apis/deployment/v1/plan.go | 11 +++ .../deployment/v1/zz_generated.deepcopy.go | 45 +++++++++- .../deployment/v2alpha1/arango_task_spec.go | 14 +++- .../deployment/v2alpha1/arango_task_status.go | 10 ++- .../deployment/v2alpha1/deployment_status.go | 3 + pkg/apis/deployment/v2alpha1/plan.go | 11 +++ .../v2alpha1/zz_generated.deepcopy.go | 45 +++++++++- pkg/deployment/context_impl.go | 75 +++++++++++------ pkg/deployment/reconcile/action_ping.go | 56 +++++++++++++ pkg/deployment/reconcile/context.go | 2 + pkg/deployment/reconcile/plan_builder.go | 6 +- pkg/deployment/reconcile/plan_builder_task.go | 69 ++++++++++++++++ pkg/deployment/reconcile/plan_builder_test.go | 5 ++ pkg/deployment/reconcile/plan_executor.go | 82 +++++++++++++++++++ pkg/deployment/reconcile/timeouts.go | 17 ++-- pkg/deployment/resources/inspector/at_v1.go | 11 +++ .../k8sutil/inspector/arangotask/v1/loader.go | 3 + 20 files changed, 440 insertions(+), 52 deletions(-) create mode 100644 pkg/deployment/reconcile/action_ping.go create mode 100644 pkg/deployment/reconcile/plan_builder_task.go diff --git a/pkg/apis/deployment/v1/arango_task_spec.go b/pkg/apis/deployment/v1/arango_task_spec.go index fbdec3d4d..43b2eb3e0 100644 --- a/pkg/apis/deployment/v1/arango_task_spec.go +++ b/pkg/apis/deployment/v1/arango_task_spec.go @@ -24,6 +24,11 @@ import "encoding/json" type ArangoTaskType string +const ( + // ArangoTaskPingType send ping to the db server + ArangoTaskPingType ArangoTaskType = "Ping" +) + type ArangoTaskDetails []byte func (a ArangoTaskDetails) MarshalJSON() ([]byte, error) { @@ -68,7 +73,12 @@ var _ json.Unmarshaler = &ArangoTaskDetails{} var _ json.Marshaler = ArangoTaskDetails{} type ArangoTaskSpec struct { - Type ArangoTaskType `json:"type,omitempty"` + Type ArangoTaskType `json:"type,required"` + DeploymentName string `json:"deploymentName,required"` + Details ArangoTaskDetails `json:"details,omitempty"` +} - Details ArangoTaskDetails `json:"details,omitempty"` +type ArangoTaskPing struct { + DurationSeconds int `json:"durationSeconds"` + Counts int `json:"counts"` } diff --git a/pkg/apis/deployment/v1/arango_task_status.go b/pkg/apis/deployment/v1/arango_task_status.go index a793b0dc0..1ca88e7cf 100644 --- a/pkg/apis/deployment/v1/arango_task_status.go +++ b/pkg/apis/deployment/v1/arango_task_status.go @@ -31,8 +31,12 @@ const ( ) type ArangoTaskStatus struct { - AcceptedSpec *ArangoTaskSpec `json:"acceptedSpec,omitempty"` + AcceptedSpec *ArangoTaskSpec `json:"acceptedSpec,omitempty"` + State ArangoTaskState `json:"state,omitempty"` + ActionsState []ArangoActionState `json:"actionsState,omitempty"` +} - State ArangoTaskState `json:"state,omitempty"` - Details ArangoTaskDetails `json:"details,omitempty"` +type ArangoActionState struct { + ActionId string `json:"actionId,omitempty"` + State ArangoTaskState `json:"state,omitempty"` } diff --git a/pkg/apis/deployment/v1/deployment_status.go b/pkg/apis/deployment/v1/deployment_status.go index 49ae26061..cb2decf32 100644 --- a/pkg/apis/deployment/v1/deployment_status.go +++ b/pkg/apis/deployment/v1/deployment_status.go @@ -67,6 +67,9 @@ type DeploymentStatus struct { // ResourcesPlan to update this deployment. Executed before plan, after highPlan ResourcesPlan Plan `json:"resourcesPlan,omitempty"` + // TaskPlan to update this deployment. Executed as the last one + TaskPlan Plan `json:"taskPlan,omitempty"` + // AcceptedSpec contains the last specification that was accepted by the operator. AcceptedSpec *DeploymentSpec `json:"accepted-spec,omitempty"` diff --git a/pkg/apis/deployment/v1/plan.go b/pkg/apis/deployment/v1/plan.go index 1d09b37b9..f9842f8d7 100644 --- a/pkg/apis/deployment/v1/plan.go +++ b/pkg/apis/deployment/v1/plan.go @@ -195,6 +195,11 @@ const ( // Resources ActionTypeResourceSync ActionType = "ResourceSync" + + // ArangoTask actions + + // ActionTypePing it a mock to check if the action flow is working + ActionTypePing ActionType = "Ping" ) const ( @@ -327,6 +332,12 @@ func (a Action) SetImage(image string) Action { return a } +// SetTaskID sets the TaskID field to the given value and returns the modified action. +func (a Action) SetTaskID(taskID types.UID) Action { + a.TaskID = taskID + return a +} + // IsStarted returns true if the action has been started already. func (a Action) IsStarted() bool { return !a.StartTime.IsZero() diff --git a/pkg/apis/deployment/v1/zz_generated.deepcopy.go b/pkg/apis/deployment/v1/zz_generated.deepcopy.go index de901a5ae..210486767 100644 --- a/pkg/apis/deployment/v1/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v1/zz_generated.deepcopy.go @@ -91,6 +91,22 @@ func (in ActionTimeouts) DeepCopy() ActionTimeouts { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArangoActionState) DeepCopyInto(out *ArangoActionState) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoActionState. +func (in *ArangoActionState) DeepCopy() *ArangoActionState { + if in == nil { + return nil + } + out := new(ArangoActionState) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ArangoClusterSynchronization) DeepCopyInto(out *ArangoClusterSynchronization) { *out = *in @@ -660,6 +676,22 @@ func (in *ArangoTaskList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArangoTaskPing) DeepCopyInto(out *ArangoTaskPing) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoTaskPing. +func (in *ArangoTaskPing) DeepCopy() *ArangoTaskPing { + if in == nil { + return nil + } + out := new(ArangoTaskPing) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ArangoTaskSpec) DeepCopyInto(out *ArangoTaskSpec) { *out = *in @@ -689,9 +721,9 @@ func (in *ArangoTaskStatus) DeepCopyInto(out *ArangoTaskStatus) { *out = new(ArangoTaskSpec) (*in).DeepCopyInto(*out) } - if in.Details != nil { - in, out := &in.Details, &out.Details - *out = make(ArangoTaskDetails, len(*in)) + if in.ActionsState != nil { + in, out := &in.ActionsState, &out.ActionsState + *out = make([]ArangoActionState, len(*in)) copy(*out, *in) } return @@ -1140,6 +1172,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TaskPlan != nil { + in, out := &in.TaskPlan, &out.TaskPlan + *out = make(Plan, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.AcceptedSpec != nil { in, out := &in.AcceptedSpec, &out.AcceptedSpec *out = new(DeploymentSpec) diff --git a/pkg/apis/deployment/v2alpha1/arango_task_spec.go b/pkg/apis/deployment/v2alpha1/arango_task_spec.go index b2f884626..40a09cfea 100644 --- a/pkg/apis/deployment/v2alpha1/arango_task_spec.go +++ b/pkg/apis/deployment/v2alpha1/arango_task_spec.go @@ -24,6 +24,11 @@ import "encoding/json" type ArangoTaskType string +const ( + // ArangoTaskPingType send ping to the db server + ArangoTaskPingType ArangoTaskType = "Ping" +) + type ArangoTaskDetails []byte func (a ArangoTaskDetails) MarshalJSON() ([]byte, error) { @@ -68,7 +73,12 @@ var _ json.Unmarshaler = &ArangoTaskDetails{} var _ json.Marshaler = ArangoTaskDetails{} type ArangoTaskSpec struct { - Type ArangoTaskType `json:"type,omitempty"` + Type ArangoTaskType `json:"type,required"` + DeploymentName string `json:"deploymentName,required"` + Details ArangoTaskDetails `json:"details,omitempty"` +} - Details ArangoTaskDetails `json:"details,omitempty"` +type ArangoTaskPing struct { + DurationSeconds int `json:"durationSeconds"` + Counts int `json:"counts"` } diff --git a/pkg/apis/deployment/v2alpha1/arango_task_status.go b/pkg/apis/deployment/v2alpha1/arango_task_status.go index c333beea7..a63816ef5 100644 --- a/pkg/apis/deployment/v2alpha1/arango_task_status.go +++ b/pkg/apis/deployment/v2alpha1/arango_task_status.go @@ -31,8 +31,12 @@ const ( ) type ArangoTaskStatus struct { - AcceptedSpec *ArangoTaskSpec `json:"acceptedSpec,omitempty"` + AcceptedSpec *ArangoTaskSpec `json:"acceptedSpec,omitempty"` + State ArangoTaskState `json:"state,omitempty"` + ActionsState []ArangoActionState `json:"actionsState,omitempty"` +} - State ArangoTaskState `json:"state,omitempty"` - Details ArangoTaskDetails `json:"details,omitempty"` +type ArangoActionState struct { + ActionId string `json:"actionId,omitempty"` + State ArangoTaskState `json:"state,omitempty"` } diff --git a/pkg/apis/deployment/v2alpha1/deployment_status.go b/pkg/apis/deployment/v2alpha1/deployment_status.go index 770c22ff9..d6115e501 100644 --- a/pkg/apis/deployment/v2alpha1/deployment_status.go +++ b/pkg/apis/deployment/v2alpha1/deployment_status.go @@ -67,6 +67,9 @@ type DeploymentStatus struct { // ResourcesPlan to update this deployment. Executed before plan, after highPlan ResourcesPlan Plan `json:"resourcesPlan,omitempty"` + // TaskPlan to update this deployment. Executed as the last one + TaskPlan Plan `json:"taskPlan,omitempty"` + // AcceptedSpec contains the last specification that was accepted by the operator. AcceptedSpec *DeploymentSpec `json:"accepted-spec,omitempty"` diff --git a/pkg/apis/deployment/v2alpha1/plan.go b/pkg/apis/deployment/v2alpha1/plan.go index 527bd8aac..c0567ada5 100644 --- a/pkg/apis/deployment/v2alpha1/plan.go +++ b/pkg/apis/deployment/v2alpha1/plan.go @@ -195,6 +195,11 @@ const ( // Resources ActionTypeResourceSync ActionType = "ResourceSync" + + // ArangoTask actions + + // ActionTypePing it a mock to check if the action flow is working + ActionTypePing ActionType = "Ping" ) const ( @@ -327,6 +332,12 @@ func (a Action) SetImage(image string) Action { return a } +// SetTaskID sets the TaskID field to the given value and returns the modified action. +func (a Action) SetTaskID(taskID types.UID) Action { + a.TaskID = taskID + return a +} + // IsStarted returns true if the action has been started already. func (a Action) IsStarted() bool { return !a.StartTime.IsZero() diff --git a/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go index e2ac79bf7..ab91a257d 100644 --- a/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go @@ -91,6 +91,22 @@ func (in ActionTimeouts) DeepCopy() ActionTimeouts { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArangoActionState) DeepCopyInto(out *ArangoActionState) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoActionState. +func (in *ArangoActionState) DeepCopy() *ArangoActionState { + if in == nil { + return nil + } + out := new(ArangoActionState) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ArangoClusterSynchronization) DeepCopyInto(out *ArangoClusterSynchronization) { *out = *in @@ -660,6 +676,22 @@ func (in *ArangoTaskList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArangoTaskPing) DeepCopyInto(out *ArangoTaskPing) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoTaskPing. +func (in *ArangoTaskPing) DeepCopy() *ArangoTaskPing { + if in == nil { + return nil + } + out := new(ArangoTaskPing) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ArangoTaskSpec) DeepCopyInto(out *ArangoTaskSpec) { *out = *in @@ -689,9 +721,9 @@ func (in *ArangoTaskStatus) DeepCopyInto(out *ArangoTaskStatus) { *out = new(ArangoTaskSpec) (*in).DeepCopyInto(*out) } - if in.Details != nil { - in, out := &in.Details, &out.Details - *out = make(ArangoTaskDetails, len(*in)) + if in.ActionsState != nil { + in, out := &in.ActionsState, &out.ActionsState + *out = make([]ArangoActionState, len(*in)) copy(*out, *in) } return @@ -1140,6 +1172,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TaskPlan != nil { + in, out := &in.TaskPlan, &out.TaskPlan + *out = make(Plan, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.AcceptedSpec != nil { in, out := &in.AcceptedSpec, &out.AcceptedSpec *out = new(DeploymentSpec) diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index b94cead4e..a29463588 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -25,47 +25,33 @@ import ( "crypto/tls" "net" nhttp "net/http" + "sort" "strconv" "time" - "github.com/arangodb/kube-arangodb/pkg/util/globals" - - "k8s.io/apimachinery/pkg/types" - - "github.com/arangodb/kube-arangodb/pkg/deployment/patch" - - "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" - - "github.com/arangodb/kube-arangodb/pkg/util/errors" - inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" - - "github.com/arangodb/kube-arangodb/pkg/operator/scope" - - "github.com/arangodb/kube-arangodb/pkg/deployment/features" - - "github.com/arangodb/go-driver/http" - "github.com/arangodb/go-driver/jwt" - "github.com/arangodb/kube-arangodb/pkg/deployment/pod" - "github.com/arangodb/kube-arangodb/pkg/util/constants" - - apiErrors "k8s.io/apimachinery/pkg/api/errors" - - core "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/arangodb/arangosync-client/client" "github.com/arangodb/arangosync-client/tasks" driver "github.com/arangodb/go-driver" "github.com/arangodb/go-driver/agency" + "github.com/arangodb/go-driver/http" + "github.com/arangodb/go-driver/jwt" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/apis/shared" "github.com/arangodb/kube-arangodb/pkg/deployment/acs/sutil" + "github.com/arangodb/kube-arangodb/pkg/deployment/features" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + "github.com/arangodb/kube-arangodb/pkg/deployment/pod" + "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" + "github.com/arangodb/kube-arangodb/pkg/operator/scope" + "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" + "github.com/arangodb/kube-arangodb/pkg/util/constants" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/globals" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" persistentvolumeclaimv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim/v1" podv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod/v1" poddisruptionbudgetv1beta1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1beta1" @@ -74,7 +60,12 @@ import ( serviceaccountv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount/v1" servicemonitorv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor/v1" "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/rs/zerolog/log" + core "k8s.io/api/core/v1" + apiErrors "k8s.io/apimachinery/pkg/api/errors" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) var _ resources.Context = &Deployment{} @@ -87,6 +78,36 @@ func (d *Deployment) GetBackup(ctx context.Context, backup string) (*backupApi.A return d.deps.Client.Arango().BackupV1().ArangoBackups(d.Namespace()).Get(ctxChild, backup, meta.GetOptions{}) } +// GetNextTask returns the next unprocessed task for the deployment. +func (d *Deployment) GetNextTask(ctx context.Context) (*api.ArangoTask, error) { + tasksCache, err := d.acs.Cache().ArangoTask().V1() + if err != nil { + d.log.Err(err).Error("Failed to get ArangoTask cache") + return nil, err + } + + nextTaskFilter := func(at *api.ArangoTask) bool { + if at.Spec.DeploymentName == d.name && at.Status.State == api.ArangoTaskUnknownState { + return true + } + return false + } + tasks := tasksCache.Filter(nextTaskFilter) + + if len(tasks) == 0 { + return nil, nil + } else if len(tasks) == 1 { + return tasks[0], nil + } else { + // find the oldest task + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].CreationTimestamp.Before(&tasks[j].CreationTimestamp) + }) + + return tasks[0], nil + } +} + // GetAPIObject returns the deployment as k8s object. func (d *Deployment) GetAPIObject() k8sutil.APIObject { return d.apiObject diff --git a/pkg/deployment/reconcile/action_ping.go b/pkg/deployment/reconcile/action_ping.go new file mode 100644 index 000000000..11952a700 --- /dev/null +++ b/pkg/deployment/reconcile/action_ping.go @@ -0,0 +1,56 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package reconcile + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" +) + +func init() { + registerAction(api.ActionTypePing, newPing, pingTimeout) +} + +func newPing(action api.Action, actionCtx ActionContext) Action { + a := &pingDbServerAction{} + + a.actionImpl = newActionImplDefRef(action, actionCtx) + + return a +} + +type pingDbServerAction struct { + actionImpl +} + +func (a *pingDbServerAction) Start(ctx context.Context) (bool, error) { + if a.action.TaskID == "" { + a.log.Error("taskName parameter is missing") + return true, nil + } + + return false, nil +} + +func (a *pingDbServerAction) CheckProgress(ctx context.Context) (bool, bool, error) { + return true, false, nil +} \ No newline at end of file diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go index 3caac6c10..f4d80bcb8 100644 --- a/pkg/deployment/reconcile/context.go +++ b/pkg/deployment/reconcile/context.go @@ -63,4 +63,6 @@ type Context interface { GetBackup(ctx context.Context, backup string) (*backupApi.ArangoBackup, error) // GetAuthentication return authentication for members GetAuthentication() conn.Auth + // GetNextTask returns the next task to be executed + GetNextTask(ctx context.Context) (*api.ArangoTask, error) } diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go index 691265380..bf8793218 100644 --- a/pkg/deployment/reconcile/plan_builder.go +++ b/pkg/deployment/reconcile/plan_builder.go @@ -40,5 +40,9 @@ const ( // get the status in line with the specification. // If a plan already exists, nothing is done. func (d *Reconciler) CreatePlan(ctx context.Context) (error, bool) { - return d.generatePlan(ctx, d.generatePlanFunc(d.createHighPlan, plannerHigh{}), d.generatePlanFunc(d.createResourcesPlan, plannerResources{}), d.generatePlanFunc(d.createNormalPlan, plannerNormal{})) + return d.generatePlan(ctx, d.generatePlanFunc(d.createHighPlan, plannerHigh{}), + d.generatePlanFunc(d.createResourcesPlan, plannerResources{}), + d.generatePlanFunc(d.createNormalPlan, plannerNormal{}), + d.generatePlanFunc(d.createTaskPlan, plannerTask{}), + ) } diff --git a/pkg/deployment/reconcile/plan_builder_task.go b/pkg/deployment/reconcile/plan_builder_task.go new file mode 100644 index 000000000..7349a90b2 --- /dev/null +++ b/pkg/deployment/reconcile/plan_builder_task.go @@ -0,0 +1,69 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package reconcile + +import ( + "context" + "github.com/arangodb/kube-arangodb/pkg/deployment/actions" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" +) + +// createTaskPlan creates a plan for an unprocessed, the oldest task +// If a plan already exists, the given plan is returned with false. +// Otherwise, the new plan is returned with a boolean true. +func (r *Reconciler) createTaskPlan(ctx context.Context, apiObject k8sutil.APIObject, + currentPlan api.Plan, spec api.DeploymentSpec, + status api.DeploymentStatus, + builderCtx PlanBuilderContext) (api.Plan, api.BackOff, bool) { + if !currentPlan.IsEmpty() { + // Plan already exists, complete that first + return currentPlan, nil, false + } + + var plan api.Plan + task, err := r.context.GetNextTask(ctx) + if err != nil { + r.log.Error("Failed to get next task: %v", err) + return plan, nil, false + } + + if task == nil { + return plan, nil, false + } + + r.log.Info("Starting processing task: %s", task.Name) + switch task.Spec.Type { + case api.ArangoTaskPingType: + plan = r.createPingPlan(task) + default: + r.log.Error("Unknown task type: %s", task.Spec.Type) + } + + return plan, status.BackOff, true +} + +func (r *Reconciler) createPingPlan(task *api.ArangoTask) api.Plan { + return api.Plan{ + actions.NewClusterAction(api.ActionTypePing, "Pinging database server").SetTaskID(task.UID), + } +} diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 3f7c1742a..19694bbb7 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -85,6 +85,11 @@ type testContext struct { Inspector inspectorInterface.Inspector } +func (c *testContext) GetNextTask(ctx context.Context) (*api.ArangoTask, error) { + //TODO implement me + panic("implement me") +} + func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) { //TODO implement me panic("implement me") diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 10acc126b..7f3c74643 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -23,6 +23,7 @@ package reconcile import ( "context" "fmt" + "sort" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -112,6 +113,26 @@ func (p plannerResources) Set(deployment *api.DeploymentStatus, plan api.Plan) b return false } +type plannerTask struct { +} + +func (p plannerTask) Type() string { + return "task" +} + +func (p plannerTask) Get(deployment *api.DeploymentStatus) api.Plan { + return deployment.TaskPlan +} + +func (p plannerTask) Set(deployment *api.DeploymentStatus, plan api.Plan) bool { + if !deployment.TaskPlan.Equal(plan) { + deployment.TaskPlan = plan + return true + } + + return false +} + // ExecutePlan tries to execute the plan as far as possible. // Returns true when it has to be called again soon. // False otherwise. @@ -137,6 +158,13 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) { callAgain = true } + if again, err := d.executePlanStatus(ctx, plannerTask{}); err != nil { + d.planLogger.Err(err).Error("Execution of task plan failed") + return false, errors.WithStack(err) + } else if again { + callAgain = true + } + return callAgain, nil } @@ -181,6 +209,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl planAction := plan[0] action, actionContext := d.createAction(planAction) + task := d.getTaskFromAction(ctx, planAction) done, abort, recall, retry, err := d.executeAction(ctx, planAction, action) if err != nil { @@ -192,6 +221,8 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl planAction.Type.String(), pg.Type()).Set(0.0) actionsFailedMetrics.WithLabelValues(d.context.GetName(), planAction.Type.String(), pg.Type()).Inc() + + d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskFailedState) return nil, false, errors.WithStack(err) } @@ -201,10 +232,14 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl planAction.Type.String(), pg.Type()).Set(0.0) actionsFailedMetrics.WithLabelValues(d.context.GetName(), planAction.Type.String(), pg.Type()).Inc() + + d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskFailedState) return nil, true, nil } if done { + d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskSuccessState) + if planAction.IsStarted() { // The below metrics was increased in the previous iteration, so it should be decreased now. // If the action hasn't been started in this iteration then the metrics have not been increased. @@ -247,6 +282,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl return nil, false, errors.WithStack(err) } } else { + d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskRunningState) if !plan[0].IsStarted() { // The action has been started in this iteration, but it is not finished yet. actionsCurrentPlan.WithLabelValues(d.context.GetName(), planAction.Group.AsRole(), planAction.MemberID, @@ -336,3 +372,49 @@ func (d *Reconciler) createAction(action api.Action) (Action, ActionContext) { return f(action, actionCtx), actionCtx } + +func (d *Reconciler) getTaskFromAction(ctx context.Context, action api.Action) *api.ArangoTask { + if action.TaskID == "" { + return nil + } + + tasks, err := d.context.ACS().Cache().ArangoTask().V1() + if err != nil { + d.log.Err(err).Error("Failed to get ArangoTask cache") + return nil + } + + task, exist := tasks.GetSimpleById(action.TaskID) + if !exist { + d.log.Error("ArangoTask not found") + return nil + } + return task +} + +func (d *Reconciler) updateTaskStatus(ctx context.Context, action api.Action, task *api.ArangoTask, state api.ArangoTaskState) { + if task == nil { + return + } + + task.Status.State = state + + ind := sort.Search(len(task.Status.ActionsState), func(i int) bool { + return task.Status.ActionsState[i].ActionId == action.ID + }) + + if ind < len(task.Status.ActionsState) && task.Status.ActionsState[ind].ActionId == action.ID { + task.Status.ActionsState[ind].State = state + } else { + task.Status.ActionsState = append(task.Status.ActionsState, api.ArangoActionState{ActionId: action.ID, State: state}) + } + + cache := d.context.ACS().CurrentClusterCache() + var err error + if task, err = cache.Client().Arango().DatabaseV1().ArangoTasks(cache.Namespace()).UpdateStatus(ctx, task, metav1.UpdateOptions{}); err != nil { + if err != nil { + d.log.Err(err).Error("Failed to update ArangoTask") + return + } + } +} diff --git a/pkg/deployment/reconcile/timeouts.go b/pkg/deployment/reconcile/timeouts.go index 084f0a9db..16207893c 100644 --- a/pkg/deployment/reconcile/timeouts.go +++ b/pkg/deployment/reconcile/timeouts.go @@ -23,20 +23,21 @@ package reconcile import "time" const ( + defaultTimeout = time.Minute * 10 + addMemberTimeout = time.Minute * 10 + backupRestoreTimeout = time.Minute * 15 cleanoutMemberTimeout = time.Hour * 12 - removeMemberTimeout = time.Minute * 15 - recreateMemberTimeout = time.Minute * 15 operationTLSCACertificateTimeout = time.Minute * 30 - rotateMemberTimeout = time.Minute * 15 + pingTimeout = time.Minute * 15 pvcResizeTimeout = time.Minute * 30 pvcResizedTimeout = time.Minute * 15 - backupRestoreTimeout = time.Minute * 15 + recreateMemberTimeout = time.Minute * 15 + removeMemberTimeout = time.Minute * 15 + rotateMemberTimeout = time.Minute * 15 shutdownMemberTimeout = time.Minute * 30 + shutdownTimeout = time.Second * 15 + tlsSNIUpdateTimeout = time.Minute * 10 upgradeMemberTimeout = time.Hour * 6 waitForMemberUpTimeout = time.Minute * 30 - tlsSNIUpdateTimeout = time.Minute * 10 - defaultTimeout = time.Minute * 10 - - shutdownTimeout = time.Second * 15 ) diff --git a/pkg/deployment/resources/inspector/at_v1.go b/pkg/deployment/resources/inspector/at_v1.go index 2e12216d1..5627d47e2 100644 --- a/pkg/deployment/resources/inspector/at_v1.go +++ b/pkg/deployment/resources/inspector/at_v1.go @@ -26,8 +26,10 @@ import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/util/errors" ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangotask/v1" + apiErrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func (p *arangoTasksInspector) V1() (ins.Inspector, error) { @@ -99,6 +101,15 @@ func (p *arangoTasksInspectorV1) GetSimple(name string) (*api.ArangoTask, bool) return arangoTask, true } +func (p *arangoTasksInspectorV1) GetSimpleById(id types.UID) (*api.ArangoTask, bool) { + for _, task := range p.arangoTasks { + if task.UID == id { + return task, true + } + } + return nil, false +} + func (p *arangoTasksInspectorV1) Iterate(action ins.Action, filters ...ins.Filter) error { for _, arangoTask := range p.arangoTasks { if err := p.iterateArangoTask(arangoTask, action, filters...); err != nil { diff --git a/pkg/util/k8sutil/inspector/arangotask/v1/loader.go b/pkg/util/k8sutil/inspector/arangotask/v1/loader.go index 05f83ddf3..00cc8747b 100644 --- a/pkg/util/k8sutil/inspector/arangotask/v1/loader.go +++ b/pkg/util/k8sutil/inspector/arangotask/v1/loader.go @@ -21,6 +21,8 @@ package v1 import ( + "k8s.io/apimachinery/pkg/types" + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/gvk" ) @@ -30,6 +32,7 @@ type Inspector interface { ListSimple() []*api.ArangoTask GetSimple(name string) (*api.ArangoTask, bool) + GetSimpleById(id types.UID) (*api.ArangoTask, bool) Filter(filters ...Filter) []*api.ArangoTask Iterate(action Action, filters ...Filter) error Read() ReadInterface From c568e774a03b4b1cb347c42695272e9ba0738d92 Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Mon, 27 Jun 2022 13:14:53 +0200 Subject: [PATCH 2/7] refresh arangotask cache on every update --- pkg/deployment/reconcile/plan_executor.go | 34 ++++++++++------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 7f3c74643..694034b44 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -207,9 +207,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl // Take first action planAction := plan[0] - action, actionContext := d.createAction(planAction) - task := d.getTaskFromAction(ctx, planAction) done, abort, recall, retry, err := d.executeAction(ctx, planAction, action) if err != nil { @@ -222,7 +220,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl actionsFailedMetrics.WithLabelValues(d.context.GetName(), planAction.Type.String(), pg.Type()).Inc() - d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskFailedState) + d.updateTaskStatus(ctx, planAction, api.ArangoTaskFailedState) return nil, false, errors.WithStack(err) } @@ -233,12 +231,12 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl actionsFailedMetrics.WithLabelValues(d.context.GetName(), planAction.Type.String(), pg.Type()).Inc() - d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskFailedState) + d.updateTaskStatus(ctx, planAction, api.ArangoTaskFailedState) return nil, true, nil } if done { - d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskSuccessState) + d.updateTaskStatus(ctx, planAction, api.ArangoTaskSuccessState) if planAction.IsStarted() { // The below metrics was increased in the previous iteration, so it should be decreased now. @@ -282,7 +280,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl return nil, false, errors.WithStack(err) } } else { - d.updateTaskStatus(ctx, planAction, task, api.ArangoTaskRunningState) + d.updateTaskStatus(ctx, planAction, api.ArangoTaskRunningState) if !plan[0].IsStarted() { // The action has been started in this iteration, but it is not finished yet. actionsCurrentPlan.WithLabelValues(d.context.GetName(), planAction.Group.AsRole(), planAction.MemberID, @@ -373,27 +371,26 @@ func (d *Reconciler) createAction(action api.Action) (Action, ActionContext) { return f(action, actionCtx), actionCtx } -func (d *Reconciler) getTaskFromAction(ctx context.Context, action api.Action) *api.ArangoTask { +func (d *Reconciler) updateTaskStatus(ctx context.Context, action api.Action, state api.ArangoTaskState) { if action.TaskID == "" { - return nil + return + } + + err := d.context.ACS().Cache().ArangoTask().Refresh(ctx) + if err != nil { + d.log.Err(err).Error("Failed to refresh ArangoTask") + return } - tasks, err := d.context.ACS().Cache().ArangoTask().V1() + tasksCache, err := d.context.ACS().Cache().ArangoTask().V1() if err != nil { d.log.Err(err).Error("Failed to get ArangoTask cache") - return nil + return } - task, exist := tasks.GetSimpleById(action.TaskID) + task, exist := tasksCache.GetSimpleById(action.TaskID) if !exist { d.log.Error("ArangoTask not found") - return nil - } - return task -} - -func (d *Reconciler) updateTaskStatus(ctx context.Context, action api.Action, task *api.ArangoTask, state api.ArangoTaskState) { - if task == nil { return } @@ -410,7 +407,6 @@ func (d *Reconciler) updateTaskStatus(ctx context.Context, action api.Action, ta } cache := d.context.ACS().CurrentClusterCache() - var err error if task, err = cache.Client().Arango().DatabaseV1().ArangoTasks(cache.Namespace()).UpdateStatus(ctx, task, metav1.UpdateOptions{}); err != nil { if err != nil { d.log.Err(err).Error("Failed to update ArangoTask") From 6b40333b7daec947f7bd0c03614659d0733ad03d Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Mon, 27 Jun 2022 13:18:55 +0200 Subject: [PATCH 3/7] fix typo --- pkg/apis/deployment/v1/plan.go | 2 +- pkg/apis/deployment/v2alpha1/plan.go | 2 +- pkg/deployment/reconcile/action_ping.go | 10 +++++----- pkg/deployment/reconcile/plan_executor.go | 2 +- pkg/deployment/resources/inspector/at_v1.go | 2 +- pkg/util/k8sutil/inspector/arangotask/v1/loader.go | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/apis/deployment/v1/plan.go b/pkg/apis/deployment/v1/plan.go index f9842f8d7..59c7409bd 100644 --- a/pkg/apis/deployment/v1/plan.go +++ b/pkg/apis/deployment/v1/plan.go @@ -198,7 +198,7 @@ const ( // ArangoTask actions - // ActionTypePing it a mock to check if the action flow is working + // ActionTypePing is a mock to check if the action flow is working ActionTypePing ActionType = "Ping" ) diff --git a/pkg/apis/deployment/v2alpha1/plan.go b/pkg/apis/deployment/v2alpha1/plan.go index c0567ada5..bac3f5c2d 100644 --- a/pkg/apis/deployment/v2alpha1/plan.go +++ b/pkg/apis/deployment/v2alpha1/plan.go @@ -198,7 +198,7 @@ const ( // ArangoTask actions - // ActionTypePing it a mock to check if the action flow is working + // ActionTypePing is a mock to check if the action flow is working ActionTypePing ActionType = "Ping" ) diff --git a/pkg/deployment/reconcile/action_ping.go b/pkg/deployment/reconcile/action_ping.go index 11952a700..1be3462b6 100644 --- a/pkg/deployment/reconcile/action_ping.go +++ b/pkg/deployment/reconcile/action_ping.go @@ -31,18 +31,18 @@ func init() { } func newPing(action api.Action, actionCtx ActionContext) Action { - a := &pingDbServerAction{} + a := &pingAction{} a.actionImpl = newActionImplDefRef(action, actionCtx) return a } -type pingDbServerAction struct { +type pingAction struct { actionImpl } -func (a *pingDbServerAction) Start(ctx context.Context) (bool, error) { +func (a *pingAction) Start(ctx context.Context) (bool, error) { if a.action.TaskID == "" { a.log.Error("taskName parameter is missing") return true, nil @@ -51,6 +51,6 @@ func (a *pingDbServerAction) Start(ctx context.Context) (bool, error) { return false, nil } -func (a *pingDbServerAction) CheckProgress(ctx context.Context) (bool, bool, error) { +func (a *pingAction) CheckProgress(ctx context.Context) (bool, bool, error) { return true, false, nil -} \ No newline at end of file +} diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 694034b44..007be20ed 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -388,7 +388,7 @@ func (d *Reconciler) updateTaskStatus(ctx context.Context, action api.Action, st return } - task, exist := tasksCache.GetSimpleById(action.TaskID) + task, exist := tasksCache.GetSimpleByID(action.TaskID) if !exist { d.log.Error("ArangoTask not found") return diff --git a/pkg/deployment/resources/inspector/at_v1.go b/pkg/deployment/resources/inspector/at_v1.go index 5627d47e2..a91024066 100644 --- a/pkg/deployment/resources/inspector/at_v1.go +++ b/pkg/deployment/resources/inspector/at_v1.go @@ -101,7 +101,7 @@ func (p *arangoTasksInspectorV1) GetSimple(name string) (*api.ArangoTask, bool) return arangoTask, true } -func (p *arangoTasksInspectorV1) GetSimpleById(id types.UID) (*api.ArangoTask, bool) { +func (p *arangoTasksInspectorV1) GetSimpleByID(id types.UID) (*api.ArangoTask, bool) { for _, task := range p.arangoTasks { if task.UID == id { return task, true diff --git a/pkg/util/k8sutil/inspector/arangotask/v1/loader.go b/pkg/util/k8sutil/inspector/arangotask/v1/loader.go index 00cc8747b..fa19d6154 100644 --- a/pkg/util/k8sutil/inspector/arangotask/v1/loader.go +++ b/pkg/util/k8sutil/inspector/arangotask/v1/loader.go @@ -32,7 +32,7 @@ type Inspector interface { ListSimple() []*api.ArangoTask GetSimple(name string) (*api.ArangoTask, bool) - GetSimpleById(id types.UID) (*api.ArangoTask, bool) + GetSimpleByID(id types.UID) (*api.ArangoTask, bool) Filter(filters ...Filter) []*api.ArangoTask Iterate(action Action, filters ...Filter) error Read() ReadInterface From 2d156fbd4cabd952fd9d2e4eb7f810d606b813cc Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Mon, 27 Jun 2022 13:41:14 +0200 Subject: [PATCH 4/7] add UT --- pkg/deployment/reconcile/plan_builder_test.go | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 19694bbb7..09b10b16b 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -23,6 +23,7 @@ package reconcile import ( "context" "fmt" + "github.com/arangodb/kube-arangodb/pkg/deployment/resources" "io/ioutil" "testing" "time" @@ -49,7 +50,6 @@ import ( "github.com/arangodb/kube-arangodb/pkg/deployment/patch" pod2 "github.com/arangodb/kube-arangodb/pkg/deployment/pod" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" - "github.com/arangodb/kube-arangodb/pkg/deployment/resources" "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/util" @@ -81,13 +81,13 @@ type testContext struct { PVC *core.PersistentVolumeClaim PVCErr error RecordedEvent *k8sutil.Event + ArangoTask *api.ArangoTask Inspector inspectorInterface.Inspector } func (c *testContext) GetNextTask(ctx context.Context) (*api.ArangoTask, error) { - //TODO implement me - panic("implement me") + return c.ArangoTask, nil } func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) { @@ -696,6 +696,7 @@ type testCase struct { ExpectedError error ExpectedPlan api.Plan ExpectedHighPlan api.Plan + ExpectedTaskPlan api.Plan ExpectedLog string ExpectedEvent *k8sutil.Event @@ -761,6 +762,17 @@ func TestCreatePlan(t *testing.T) { addAgentsToStatus(t, &deploymentTemplate.Status, 3) deploymentTemplate.Spec.SetDefaults("createPlanTest") + arangoTaskTemplate := &api.ArangoTask{ + ObjectMeta: meta.ObjectMeta{ + Name: "test_task", + Namespace: tests.FakeNamespace, + }, + Spec: api.ArangoTaskSpec{ + Type: api.ArangoTaskPingType, + DeploymentName: "test_depl", + }, + } + testCases := []testCase{ { Name: "Can not create plan for single deployment", @@ -1173,6 +1185,18 @@ func TestCreatePlan(t *testing.T) { }, ExpectedLog: "Creating scale-down plan", }, + { + Name: "ArangoTask - ping type", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + ArangoTask: arangoTaskTemplate, + }, + + ExpectedTaskPlan: []api.Action{ + actions.NewClusterAction(api.ActionTypePing, "Pinging database server"), + }, + ExpectedLog: "Pinging database server", + }, } for _, testCase := range testCases { @@ -1242,6 +1266,17 @@ func TestCreatePlan(t *testing.T) { assert.Equal(t, v.Reason, status.Plan[i].Reason) } } + + if len(testCase.ExpectedTaskPlan) > 0 { + require.Len(t, status.TaskPlan, len(testCase.ExpectedTaskPlan)) + for i, v := range testCase.ExpectedTaskPlan { + assert.Equal(t, v.Type, status.TaskPlan[i].Type) + assert.Equal(t, v.Group, status.TaskPlan[i].Group) + if v.Reason != "*" { + assert.Equal(t, v.Reason, status.TaskPlan[i].Reason) + } + } + } }) } } From dc968d67da0812398a25489decceb992882def11 Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Mon, 27 Jun 2022 13:57:34 +0200 Subject: [PATCH 5/7] fix fmt errors --- pkg/deployment/reconcile/plan_builder_task.go | 2 +- pkg/deployment/reconcile/plan_builder_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/deployment/reconcile/plan_builder_task.go b/pkg/deployment/reconcile/plan_builder_task.go index 7349a90b2..ef693e7b7 100644 --- a/pkg/deployment/reconcile/plan_builder_task.go +++ b/pkg/deployment/reconcile/plan_builder_task.go @@ -22,9 +22,9 @@ package reconcile import ( "context" - "github.com/arangodb/kube-arangodb/pkg/deployment/actions" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/deployment/actions" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 09b10b16b..92d831707 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -23,7 +23,6 @@ package reconcile import ( "context" "fmt" - "github.com/arangodb/kube-arangodb/pkg/deployment/resources" "io/ioutil" "testing" "time" @@ -37,9 +36,9 @@ import ( "k8s.io/client-go/kubernetes" "github.com/arangodb/arangosync-client/client" + "github.com/arangodb/go-driver" "github.com/arangodb/go-driver/agency" - "github.com/arangodb/go-driver" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/acs" @@ -50,6 +49,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/deployment/patch" pod2 "github.com/arangodb/kube-arangodb/pkg/deployment/pod" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" + "github.com/arangodb/kube-arangodb/pkg/deployment/resources" "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/util" From b9fc321082be340f53f4ba1613fa6d97e74003d0 Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Mon, 27 Jun 2022 15:18:49 +0200 Subject: [PATCH 6/7] extend ping action flow --- pkg/apis/deployment/v1/arango_task_spec.go | 4 +-- .../deployment/v2alpha1/arango_task_spec.go | 4 +-- pkg/deployment/reconcile/action_ping.go | 34 +++++++++++++++++++ pkg/deployment/reconcile/plan_executor.go | 4 +++ 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/pkg/apis/deployment/v1/arango_task_spec.go b/pkg/apis/deployment/v1/arango_task_spec.go index 43b2eb3e0..c564ed765 100644 --- a/pkg/apis/deployment/v1/arango_task_spec.go +++ b/pkg/apis/deployment/v1/arango_task_spec.go @@ -73,8 +73,8 @@ var _ json.Unmarshaler = &ArangoTaskDetails{} var _ json.Marshaler = ArangoTaskDetails{} type ArangoTaskSpec struct { - Type ArangoTaskType `json:"type,required"` - DeploymentName string `json:"deploymentName,required"` + Type ArangoTaskType `json:"type"` + DeploymentName string `json:"deploymentName"` Details ArangoTaskDetails `json:"details,omitempty"` } diff --git a/pkg/apis/deployment/v2alpha1/arango_task_spec.go b/pkg/apis/deployment/v2alpha1/arango_task_spec.go index 40a09cfea..4f5cab5b4 100644 --- a/pkg/apis/deployment/v2alpha1/arango_task_spec.go +++ b/pkg/apis/deployment/v2alpha1/arango_task_spec.go @@ -73,8 +73,8 @@ var _ json.Unmarshaler = &ArangoTaskDetails{} var _ json.Marshaler = ArangoTaskDetails{} type ArangoTaskSpec struct { - Type ArangoTaskType `json:"type,required"` - DeploymentName string `json:"deploymentName,required"` + Type ArangoTaskType `json:"type"` + DeploymentName string `json:"deploymentName"` Details ArangoTaskDetails `json:"details,omitempty"` } diff --git a/pkg/deployment/reconcile/action_ping.go b/pkg/deployment/reconcile/action_ping.go index 1be3462b6..bacb06c58 100644 --- a/pkg/deployment/reconcile/action_ping.go +++ b/pkg/deployment/reconcile/action_ping.go @@ -22,6 +22,7 @@ package reconcile import ( "context" + "time" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" ) @@ -52,5 +53,38 @@ func (a *pingAction) Start(ctx context.Context) (bool, error) { } func (a *pingAction) CheckProgress(ctx context.Context) (bool, bool, error) { + if a.action.TaskID == "" { + a.log.Error("taskName parameter is missing") + return false, true, nil + } + + tasksCache, err := a.actionCtx.ACS().Cache().ArangoTask().V1() + if err != nil { + a.log.Err(err).Error("Failed to get ArangoTask cache") + return false, false, err + } + + task, exist := tasksCache.GetSimpleByID(a.action.TaskID) + if !exist { + a.log.Error("ArangoTask not found") + return false, false, err + } + + if task.Spec.Details != nil { + pingBody := api.ArangoTaskPing{} + if err := task.Spec.Details.Get(&pingBody); err != nil { + a.log.Err(err).Error("Failed to parse ArangoTaskPing content") + return false, false, err + } + + if pingBody.DurationSeconds != 0 { + a.log.Info("Checking ArangoTaskPing duration limits") + upTo := a.action.CreationTime.Add(time.Duration(pingBody.DurationSeconds) * time.Second) + if time.Now().Before(upTo) { + return false, false, nil + } + } + } + return true, false, nil } diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 007be20ed..00acfe9f4 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -396,6 +396,10 @@ func (d *Reconciler) updateTaskStatus(ctx context.Context, action api.Action, st task.Status.State = state + if task.Status.AcceptedSpec == nil { + task.Status.AcceptedSpec = &task.Spec + } + ind := sort.Search(len(task.Status.ActionsState), func(i int) bool { return task.Status.ActionsState[i].ActionId == action.ID }) From bae9ef0527ed7fb1fa9687df54546472581bd67d Mon Sep 17 00:00:00 2001 From: jwierzbo Date: Mon, 27 Jun 2022 15:39:03 +0200 Subject: [PATCH 7/7] rebase --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b81b45cf..3a6928a30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ - (Bugfix) Ensure Wait actions to be present after AddMember - (Documentation) Refactor metrics (Part 1) - (Bugfix) Extend Agency HealthCheck for replace +- (Feature) ArangoTask handler (Ping type) ## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07) - (Bugfix) Fix arangosync members state inspection