-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_servers.py
More file actions
209 lines (190 loc) · 11.4 KB
/
test_servers.py
File metadata and controls
209 lines (190 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# pylint: disable=protected-access
from unittest import mock
import munch
from cterasdk.common import Object
from cterasdk.core import servers
from cterasdk.core.types import AmazonS3
from cterasdk import exceptions
from tests.ut.core.admin import base_admin
class TestCoreServers(base_admin.BaseCoreTest):
    """Unit tests for ``cterasdk.core.servers.Servers`` using the ``self._global_admin`` test double."""

    def setUp(self):
        """Define the server name and the background-task attributes reused across tests."""
        super().setUp()
        self._server = 'ctera'
        self._task_id = 12345
        self._task_name = 'Zones Synchronizer'
        self._task_start_time = '2022-05-06T10:42:00'
        # The sample task starts and ends at the same instant.
        self._task_end_time = self._task_start_time
        self._task_elapsed_time = 12345
        self._task_status = 'success'
        self._task_message = 'Updated 12345 templates successfully'
        self._task_ref = f'servers/{self._server}/bgTasks/{self._task_id}'

    def _assert_default_include_requested(self):
        """Check the single ``get_multi`` call targets this server and includes exactly the default attributes."""
        get_multi = self._global_admin.api.get_multi
        get_multi.assert_called_once_with(f'/servers/{self._server}', mock.ANY)
        expected_include = ['/' + attr for attr in servers.Servers.default]
        actual_include = get_multi.call_args[0][1]
        self.assertEqual(len(expected_include), len(actual_include))
        for attr in expected_include:
            self.assertIn(attr, actual_include)

    def test_list_servers_default_attrs(self):
        """``list_servers()`` hands ``query.iterator`` the '/servers' path and the default query parameters."""
        with mock.patch("cterasdk.core.servers.query.iterator") as iterator_mock:
            servers.Servers(self._global_admin).list_servers()
            iterator_mock.assert_called_once_with(self._global_admin, '/servers', mock.ANY)
            expected = base_admin.BaseCoreTest._create_query_params(
                include=servers.Servers.default,
                start_from=0,
                count_limit=50,
            )
            positional_args = iterator_mock.call_args[0]
            self._assert_equal_objects(positional_args[2], expected)

    def test_get_server_background_tasks(self):
        """``tasks.background()`` GETs the server's 'bgTasks' and the first result carries the expected ref."""
        task = TestCoreServers._create_task_object(
            id=self._task_id,
            name=self._task_name,
            startTime=self._task_start_time,
            endTime=self._task_end_time,
            elapsedTime=self._task_elapsed_time,
            status=self._task_status,
            progstring=self._task_message,
        )
        self._init_global_admin(get_response=[task])
        background_tasks = servers.Servers(self._global_admin).tasks.background(self._server)
        self._global_admin.api.get.assert_called_once_with(f'/servers/{self._server}/bgTasks')
        self.assertEqual(background_tasks[0].ref, self._task_ref)

    def test_get_server_scheduled_tasks(self):
        """``tasks.scheduled()`` GETs the server's 'schedTasks'."""
        task = TestCoreServers._create_task_object(
            id=self._task_id,
            name=self._task_name,
            startTime=self._task_start_time,
        )
        self._init_global_admin(get_response=[task])
        servers.Servers(self._global_admin).tasks.scheduled(self._server)
        self._global_admin.api.get.assert_called_once_with(f'/servers/{self._server}/schedTasks')

    def test_get_servers_success(self):
        """``get()`` requests the default attributes and returns the object whose name matches."""
        self._init_global_admin(get_multi_response=munch.Munch({'name': self._server}))
        found = servers.Servers(self._global_admin).get(self._server)
        self._assert_default_include_requested()
        self.assertEqual(found.name, self._server)

    def test_get_servers_failure(self):
        """``get()`` raises ``ObjectNotFoundException`` when the response has ``name`` set to None."""
        self._init_global_admin(get_multi_response=munch.Munch({'name': None}))
        with self.assertRaises(exceptions.ObjectNotFoundException) as error:
            servers.Servers(self._global_admin).get(self._server)
        self._assert_default_include_requested()
        self.assertEqual(f'Object not found: /servers/{self._server}', str(error.exception))

    def test_modify_server_not_found(self):
        """``modify()`` reports 'Server not found' when the initial GET raises ``CTERAException``."""
        self._init_global_admin()
        self._global_admin.api.get = mock.MagicMock(side_effect=exceptions.CTERAException())
        with self.assertRaises(exceptions.CTERAException) as error:
            servers.Servers(self._global_admin).modify(self._server)
        self._global_admin.api.get.assert_called_once_with(f'/servers/{self._server}')
        self.assertEqual(f'Server not found: /servers/{self._server}', str(error.exception))

    def test_modify_server_update_failure(self):
        """``modify()`` reports 'Server modification failed' when the PUT raises ``CTERAException``."""
        ref = f'/servers/{self._server}'
        self._init_global_admin(get_response=self._server)
        self._global_admin.api.put = mock.MagicMock(side_effect=exceptions.CTERAException())
        with self.assertRaises(exceptions.CTERAException) as error:
            servers.Servers(self._global_admin).modify(self._server)
        self._global_admin.api.get.assert_called_once_with(ref)
        self._global_admin.api.put.assert_called_once_with(ref, self._server)
        self.assertEqual(f'Server modification failed: {ref}', str(error.exception))

    def test_modify_success(self):
        """``modify()`` PUTs the expected server object and returns the PUT response."""
        renamed_to = 'server1'
        public_ip = '192.168.90.1'
        replica_ref = 'objs/server'
        put_response = 'Success'
        self._init_global_admin(
            get_response=munch.Munch({'name': self._server}),
            get_multi_response=munch.Munch({'name': self._server, 'baseObjectRef': replica_ref}),
            put_response=put_response,
        )
        ret = servers.Servers(self._global_admin).modify(
            self._server, renamed_to, True, True, True, public_ip, False, self._server
        )
        expected = TestCoreServers._create_server_object(
            renamed_to, True, True, True, public_ip, False, True, replica_ref
        )
        submitted = self._global_admin.api.put.call_args[0][1]
        self._assert_equal_objects(submitted, expected)
        self.assertEqual(ret, put_response)

    def test_system_database(self):
        """``system_database`` runs a query filtered on ``mainDB == True`` and returns the matching server."""
        with mock.patch("cterasdk.core.servers.query.run") as run_mock:
            run_mock.return_value = munch.Munch({'objects': [munch.Munch({'mainDB': True})]})
            server = servers.Servers(self._global_admin).system_database
            main_db_filter = munch.Munch({
                'field': 'mainDB',
                'restriction': 'eq',
                '_classname': 'BooleanFilter',
                'value': True
            })
            expected = base_admin.BaseCoreTest._create_query_params(
                start_from=0, count_limit=50, filters=[main_db_filter]
            )
            submitted = run_mock.call_args[0][2]
            self._assert_equal_objects(submitted, expected)
            self.assertEqual(server.mainDB, True)

    def test_enable_server_backup(self):
        """``backup.enable()`` PUTs an enabled ``backupToBucket`` with the schedule period and bucket details."""
        self._init_global_admin()
        server_name = 'server'
        with mock.patch("cterasdk.core.servers.Servers.system_database", new_callable=mock.PropertyMock) as database_mock:
            database_mock.return_value = munch.Munch({'name': server_name, 'backupToBucket': None})
            bucket = AmazonS3('bucket-name', 'access', 'secret', 'www.endpoint.com', True, verify_ssl=False)
            servers.Servers(self._global_admin).backup.enable(bucket, 60)
            self._global_admin.api.put.assert_called_once_with(f'/servers/{server_name}', mock.ANY)
            submitted = self._global_admin.api.put.call_args[0][1]
            expected = munch.Munch({
                'enabled': True,
                'exportSchedulePeriod': 60,
                'details': TestCoreServers._create_database_backup_server_object(bucket)
            })
            self._assert_equal_objects(submitted.backupToBucket, expected)

    @staticmethod
    def _create_database_backup_server_object(bucket):
        """Build the expected backup 'details' object from the attributes of *bucket*."""
        details = {
            'storage': bucket.driver,
            'bucket': bucket.bucket,
            'accessKey': bucket.access_key,
            'secretKey': bucket.secret_key,
            'endPoint': bucket.endpoint,
            'useHttps': bucket.https,
            'trustAllCertificates': bucket.trust_all_certificates,
            'masterHost': None,
            'usePathStyleAddressing': False
        }
        return munch.Munch(details)

    def test_disable_server_backup(self):
        """``backup.disable()`` PUTs the server with ``backupToBucket.enabled`` set to False."""
        self._init_global_admin()
        server_name = 'server'
        with mock.patch("cterasdk.core.servers.Servers.system_database", new_callable=mock.PropertyMock) as database_mock:
            database_mock.return_value = munch.Munch({
                'name': server_name,
                'backupToBucket': munch.Munch({'enabled': True})
            })
            servers.Servers(self._global_admin).backup.disable()
            self._global_admin.api.put.assert_called_once_with(f'/servers/{server_name}', mock.ANY)
            submitted = self._global_admin.api.put.call_args[0][1]
            self._assert_equal_objects(submitted.backupToBucket.enabled, False)

    def test_server_backup_status(self):
        """``backup.connected()`` returns True when the backup status is 'Connected'."""
        self._init_global_admin()
        with mock.patch("cterasdk.core.servers.Servers.system_database", new_callable=mock.PropertyMock) as database_mock:
            database_mock.return_value = munch.Munch({'backupToBucket': munch.Munch({'status': 'Connected'})})
            connected = servers.Servers(self._global_admin).backup.connected()
            self.assertEqual(connected, True)

    @staticmethod
    def _create_server_object(name=None, app=None, preview=None, enable_public_ip=None,
                              public_ip=None, allow_user_login=None, enable_replication=None, replica_of=None):
        """Build the server object expected in a PUT, setting only the attributes implied by the arguments."""
        expected = Object()

        # Replication: populated when enabled with a target, cleared when explicitly disabled.
        if enable_replication is True and replica_of is not None:
            settings = Object()
            settings._classname = 'ServerReplicationSettings'
            settings.replicationOf = replica_of
            expected.replicationSettings = settings
        elif enable_replication is False:
            expected.replicationSettings = None

        if name is not None:
            expected.name = name
        if app is not None:
            expected.isApplicationServer = app
        if preview is not None:
            expected.renderingServer = preview

        # Public IP: set when enabled with an address, cleared when explicitly disabled.
        if enable_public_ip is True and public_ip is not None:
            expected.publicIpaddr = public_ip
        elif enable_public_ip is False:
            expected.publicIpaddr = None

        if allow_user_login is not None:
            expected.allowUserLogin = allow_user_login
        return expected

    @staticmethod
    def _create_task_object(**kwargs):
        """Return an ``Object`` carrying every keyword argument as an attribute."""
        task = Object()
        for attr_name, attr_value in kwargs.items():
            setattr(task, attr_name, attr_value)
        return task