forked from patroni/patroni
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_kubernetes.py
128 lines (104 loc) · 6.38 KB
/
test_kubernetes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import time
import unittest
from mock import Mock, patch
from patroni.dcs.kubernetes import Kubernetes, KubernetesError, k8s_client, k8s_watch, RetryFailedError
def mock_list_namespaced_config_map(self, *args, **kwargs):
metadata = {'resource_version': '1', 'labels': {'f': 'b'}, 'name': 'test-config',
'annotations': {'initialize': '123', 'config': '{}'}}
items = [k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata))]
metadata.update({'name': 'test-leader', 'annotations': {'optime': '1234', 'leader': 'p-0', 'ttl': '30s'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata.update({'name': 'test-failover', 'annotations': {'leader': 'p-0'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata.update({'name': 'test-sync', 'annotations': {'leader': 'p-0'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata = k8s_client.V1ObjectMeta(resource_version='1')
return k8s_client.V1ConfigMapList(metadata=metadata, items=items)
def mock_list_namespaced_pod(self, *args, **kwargs):
metadata = k8s_client.V1ObjectMeta(resource_version='1', name='p-0', annotations={'status': '{}'})
items = [k8s_client.V1Pod(metadata=metadata)]
return k8s_client.V1PodList(items=items)
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_config_map', Mock())
class TestKubernetes(unittest.TestCase):
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map)
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod)
def setUp(self):
self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
self.k.get_cluster()
def test_get_cluster(self):
with patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map), \
patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod), \
patch('time.time', Mock(return_value=time.time() + 31)):
self.k.get_cluster()
with patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', Mock(side_effect=Exception)):
self.assertRaises(KubernetesError, self.k.get_cluster)
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
def test_update_leader(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.update_leader('123'))
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
def test_update_leader_with_restricted_access(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.update_leader('123', True))
def test_take_leader(self):
self.k.take_leader()
self.k._leader_observed_record['leader'] = 'test'
self.k.patch_or_create = Mock(return_value=False)
self.k.take_leader()
def test_manual_failover(self):
with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', Mock(side_effect=RetryFailedError(''))):
self.k.manual_failover('foo', 'bar')
def test_set_config_value(self):
self.k.set_config_value('{}')
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', Mock(return_value=True))
def test_touch_member(self):
self.k.touch_member({'role': 'replica'})
self.k._name = 'p-1'
self.k.touch_member({'state': 'running', 'role': 'replica'})
self.k.touch_member({'state': 'stopped', 'role': 'master'})
def test_initialize(self):
self.k.initialize()
def test_delete_leader(self):
self.k.delete_leader()
def test_cancel_initialization(self):
self.k.cancel_initialization()
@patch.object(k8s_client.CoreV1Api, 'delete_collection_namespaced_config_map',
Mock(side_effect=k8s_client.rest.ApiException(403, '')))
def test_delete_cluster(self):
self.k.delete_cluster()
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints',
Mock(side_effect=[k8s_client.rest.ApiException(502, ''), k8s_client.rest.ApiException(500, '')]))
def test_delete_sync_state(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertFalse(k.delete_sync_state())
def test_watch(self):
self.k.set_ttl(10)
self.k.watch(None, 0)
self.k.watch(None, 0)
with patch.object(k8s_watch.Watch, 'stream',
Mock(side_effect=[Exception, [], KeyboardInterrupt,
[{'raw_object': {'metadata': {'resourceVersion': '2'}}}]])):
self.assertFalse(self.k.watch('1', 2))
self.assertRaises(KeyboardInterrupt, self.k.watch, '1', 2)
self.assertTrue(self.k.watch('1', 2))
def test_set_history_value(self):
self.k.set_history_value('{}')
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', Mock(return_value=True))
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_service',
Mock(side_effect=[True, False, k8s_client.rest.ApiException(500, '')]))
def test__create_config_service(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
k.touch_member({'state': 'running', 'role': 'replica'})