Skip to content

Commit 9f1bd2b

Browse files
authored
[Feature] [Platform] Shutdown migration to CE (#1776)
1 parent 8af5309 commit 9f1bd2b

File tree

7 files changed

+275
-4
lines changed

7 files changed

+275
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
- (Feature) (Platform) Platform Requirements support
3030
- (Improvement) Drop slash requirement from ArangoRoute
3131
- (Feature) (Networking) Pass through Server Header
32+
- (Feature) (Platform) Shutdown migration to CE
3233

3334
## [1.2.43](https://github.com/arangodb/kube-arangodb/tree/1.2.43) (2024-10-14)
3435
- (Feature) ArangoRoute CRD

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ Flags:
195195
--kubernetes.max-batch-size int Size of batch during objects read (default 256)
196196
--kubernetes.qps float32 Number of queries per second for k8s API (default 15)
197197
--log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty")
198-
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
198+
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
199199
--log.sampling If true, operator will try to minimize duplication of logging events (default true)
200200
--memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing
201201
--metrics.excluded-prefixes stringArray List of the excluded metrics prefixes

docs/cli/arangodb_operator.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Flags:
8080
--kubernetes.max-batch-size int Size of batch during objects read (default 256)
8181
--kubernetes.qps float32 Number of queries per second for k8s API (default 15)
8282
--log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty")
83-
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
83+
--log.level stringArray Set log levels in format <level> or <logger>=<level>. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info])
8484
--log.sampling If true, operator will try to minimize duplication of logging events (default true)
8585
--memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing
8686
--metrics.excluded-prefixes stringArray List of the excluded metrics prefixes
+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package shutdown
22+
23+
import (
24+
"context"
25+
"fmt"
26+
"strconv"
27+
28+
"google.golang.org/grpc"
29+
"google.golang.org/grpc/credentials/insecure"
30+
core "k8s.io/api/core/v1"
31+
apiErrors "k8s.io/apimachinery/pkg/api/errors"
32+
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/client-go/kubernetes"
34+
35+
pbSharedV1 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition"
36+
pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition"
37+
"github.com/arangodb/kube-arangodb/pkg/logging"
38+
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
39+
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
40+
"github.com/arangodb/kube-arangodb/pkg/operatorV2/operation"
41+
"github.com/arangodb/kube-arangodb/pkg/util"
42+
"github.com/arangodb/kube-arangodb/pkg/util/constants"
43+
)
44+
45+
var logger = logging.Global().RegisterAndGetLogger("platform-pod-shutdown", logging.Info)
46+
47+
type handler struct {
48+
kubeClient kubernetes.Interface
49+
50+
eventRecorder event.RecorderInstance
51+
52+
operator operator.Operator
53+
}
54+
55+
func (h *handler) Name() string {
56+
return Kind()
57+
}
58+
59+
func (h *handler) Handle(ctx context.Context, item operation.Item) error {
60+
pod, err := util.WithKubernetesContextTimeoutP2A2(ctx, h.kubeClient.CoreV1().Pods(item.Namespace).Get, item.Name, meta.GetOptions{})
61+
if err != nil {
62+
if apiErrors.IsNotFound(err) {
63+
return nil
64+
}
65+
66+
return err
67+
}
68+
69+
// If not annotated, stop execution
70+
if _, ok := pod.Annotations[constants.AnnotationShutdownManagedContainer]; !ok {
71+
return nil
72+
}
73+
74+
for _, container := range pod.Status.ContainerStatuses {
75+
v, ok := pod.Annotations[fmt.Sprintf("%s/%s", constants.AnnotationShutdownCoreContainer, container.Name)]
76+
if !ok {
77+
continue
78+
}
79+
80+
switch v {
81+
case constants.AnnotationShutdownCoreContainerModeWait:
82+
if container.State.Terminated == nil {
83+
// Container is not yet stopped, skip shutdown
84+
return nil
85+
}
86+
}
87+
}
88+
89+
// All containers, which are expected to shutdown, are down
90+
91+
for _, container := range pod.Status.ContainerStatuses {
92+
v, ok := pod.Annotations[fmt.Sprintf("%s/%s", constants.AnnotationShutdownContainer, container.Name)]
93+
if !ok {
94+
continue
95+
}
96+
97+
// We did not reach running state, nothing to do
98+
if container.State.Running == nil {
99+
continue
100+
}
101+
102+
port, ok := h.getContainerPort(pod.Spec.Containers, container.Name, v)
103+
if !ok {
104+
// We did not find port, continue
105+
continue
106+
}
107+
108+
if port.ContainerPort == 0 {
109+
continue
110+
}
111+
112+
if pod.Status.PodIP == "" {
113+
continue
114+
}
115+
116+
if err := util.WithKubernetesContextTimeoutP1A1(ctx, h.invokeShutdown, fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)); err != nil {
117+
logger.WrapObj(item).Err(err).Str("container", container.Name).Debug("Unable to send shutdown request")
118+
}
119+
120+
logger.WrapObj(item).Str("container", container.Name).Debug("Shutdown request sent")
121+
}
122+
123+
// Always return nil
124+
return nil
125+
}
126+
127+
func (h *handler) CanBeHandled(item operation.Item) bool {
128+
return item.Group == Group() &&
129+
item.Version == Version() &&
130+
item.Kind == Kind()
131+
}
132+
133+
func (h *handler) getContainerPort(containers []core.Container, container, port string) (core.ContainerPort, bool) {
134+
if v, err := strconv.Atoi(port); err == nil {
135+
return core.ContainerPort{
136+
ContainerPort: int32(v),
137+
}, true
138+
}
139+
140+
for _, c := range containers {
141+
if c.Name != container {
142+
continue
143+
}
144+
145+
for _, p := range c.Ports {
146+
if p.Name == port {
147+
return p, true
148+
}
149+
}
150+
}
151+
152+
return core.ContainerPort{}, false
153+
}
154+
155+
func (h *handler) invokeShutdown(ctx context.Context, addr string) error {
156+
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
157+
if err != nil {
158+
return err
159+
}
160+
161+
defer conn.Close()
162+
163+
client := pbShutdownV1.NewShutdownV1Client(conn)
164+
165+
if _, err := client.Shutdown(ctx, &pbSharedV1.Empty{}); err != nil {
166+
return err
167+
}
168+
169+
return nil
170+
}
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package shutdown
22+
23+
func Kind() string {
24+
return "Pod"
25+
}
26+
27+
func Group() string {
28+
return ""
29+
}
30+
31+
func Version() string {
32+
return "v1"
33+
}
+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package shutdown
22+
23+
import (
24+
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/client-go/informers"
26+
"k8s.io/client-go/kubernetes"
27+
28+
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
29+
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
30+
"github.com/arangodb/kube-arangodb/pkg/util/constants"
31+
)
32+
33+
// RegisterInformer into operator
34+
func RegisterInformer(operator operator.Operator, recorder event.Recorder, kubeClient kubernetes.Interface, informer informers.SharedInformerFactory) error {
35+
if err := operator.RegisterInformer(informer.Core().V1().Pods().Informer(),
36+
Group(),
37+
Version(),
38+
Kind(), func(obj meta.Object) bool {
39+
if anns := obj.GetAnnotations(); len(anns) != 0 {
40+
if _, ok := anns[constants.AnnotationShutdownManagedContainer]; ok {
41+
return true
42+
}
43+
}
44+
return false
45+
}); err != nil {
46+
return err
47+
}
48+
49+
h := &handler{
50+
kubeClient: kubeClient,
51+
52+
eventRecorder: recorder.NewInstance(Group(), Version(), Kind()),
53+
54+
operator: operator,
55+
}
56+
57+
if err := operator.RegisterHandler(h); err != nil {
58+
return err
59+
}
60+
61+
return nil
62+
}

pkg/operator/operator.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/arangodb/kube-arangodb/pkg/handlers/job"
5151
"github.com/arangodb/kube-arangodb/pkg/handlers/networking/route"
5252
platformChart "github.com/arangodb/kube-arangodb/pkg/handlers/platform/chart"
53+
platformShutdown "github.com/arangodb/kube-arangodb/pkg/handlers/platform/shutdown"
5354
platformStorage "github.com/arangodb/kube-arangodb/pkg/handlers/platform/storage"
5455
"github.com/arangodb/kube-arangodb/pkg/handlers/policy"
5556
schedulerBatchJobHandler "github.com/arangodb/kube-arangodb/pkg/handlers/scheduler/batchjob"
@@ -350,7 +351,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st
350351
o.onStartOperatorV2Networking(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
351352
o.Dependencies.NetworkingProbe.SetReady()
352353
case platformOperator:
353-
o.onStartOperatorV2Platform(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer)
354+
o.onStartOperatorV2Platform(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
354355
o.Dependencies.PlatformProbe.SetReady()
355356
case schedulerOperator:
356357
o.onStartOperatorV2Scheduler(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
@@ -398,7 +399,7 @@ func (o *Operator) onStartOperatorV2Networking(operator operatorV2.Operator, rec
398399
}
399400
}
400401

401-
func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory) {
402+
func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) {
402403
checkFn := func() error {
403404
_, err := o.Client.Arango().PlatformV1alpha1().ArangoPlatformStorages(o.Namespace).List(context.Background(), meta.ListOptions{})
404405
return err
@@ -412,6 +413,10 @@ func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recor
412413
if err := platformChart.RegisterInformer(operator, recorder, client, kubeClient, informer); err != nil {
413414
panic(err)
414415
}
416+
417+
if err := platformShutdown.RegisterInformer(operator, recorder, kubeClient, kubeInformer); err != nil {
418+
panic(err)
419+
}
415420
}
416421

417422
func (o *Operator) onStartOperatorV2Scheduler(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) {

0 commit comments

Comments
 (0)