-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsecurity_resources_app.py
More file actions
272 lines (231 loc) · 11.5 KB
/
security_resources_app.py
File metadata and controls
272 lines (231 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
from logging import Logger
from typing import Optional
from videoipath_automation_tool.apps.security.model.domain_membership_model import LocalMemberships, ResourceType
from videoipath_automation_tool.apps.security.security_api import SecurityAPI
from videoipath_automation_tool.apps.security.security_exceptions import MembershipsNotFoundError
from videoipath_automation_tool.validators.device_id_including_virtual import validate_device_id_including_virtual
class SecurityResources:
    """Convenience layer on top of ``SecurityAPI`` for resource domain memberships.

    Provides lookup of device/profile memberships, membership updates and
    conversion between domain IDs and domain names.
    """

    def __init__(self, security_api: SecurityAPI, logger: Logger):
        """
        Args:
            security_api (SecurityAPI): API wrapper used for all security requests.
            logger (Logger): Logger used for diagnostic output.
        """
        self._security_api = security_api
        self._logger = logger

    def _get_memberships_or_empty(
        self, resource_type: ResourceType, resource_id: str, label: str
    ) -> LocalMemberships:
        """Fetch memberships for a resource, falling back to an empty local object.

        Args:
            resource_type (ResourceType): Resource type passed to the API lookup.
            resource_id (str): The ID of the resource.
            label (str): Lower-case resource label ("device" or "profile") used in
                log messages and as prefix of the generated membership ID.

        Returns:
            LocalMemberships: The memberships from the API, or a new object without
            domains if the API reports that none exist.
        """
        try:
            return self._security_api.get_memberships_by_type_and_id(resource_type, resource_id)
        except MembershipsNotFoundError:
            self._logger.info(
                f"No memberships found for {label} ID: {resource_id}, creating new Memberships object (without domains)."
            )
            return LocalMemberships.model_validate(
                {
                    "_id": f"{label}:{resource_id}",
                    "_vid": f"{label}:{resource_id}",
                    "_rev": "",
                    "domains": [],
                }
            )
        except Exception as e:
            # Log for context, then let the caller decide how to handle the failure.
            self._logger.error(f"Error retrieving memberships for {label} ID '{resource_id}': {e}")
            raise

    def _require_domain_id(self, domain_name: str) -> str:
        """Return the ID of the domain called ``domain_name`` or raise ``ValueError``."""
        domain = self._security_api.get_domain_by_name(domain_name)
        if not domain or not domain.id:
            raise ValueError(f"Domain with name '{domain_name}' not found.")
        return domain.id

    def get_device_memberships(self, device_id: str) -> LocalMemberships:
        """
        Returns memberships for the given device ID.
        Creates an empty membership object if none found.

        Note:
            This method does NOT verify whether the device actually exists in the system.
            Make sure to perform that check separately if required by your use case.

        Args:
            device_id (str): The ID of the device to retrieve memberships for.

        Returns:
            LocalMemberships: The LocalMemberships object containing the device's memberships.
        """
        # Validation happens before the lookup, so validator errors are not logged here.
        validate_device_id_including_virtual(device_id)
        return self._get_memberships_or_empty(ResourceType.DEVICE, device_id, "device")

    def get_profile_memberships(self, profile_id: str) -> LocalMemberships:
        """
        Returns memberships for the given profile ID.
        Creates an empty membership object if none found.

        Note:
            This method does NOT verify whether the profile actually exists in the system.
            Make sure to perform that check separately if required by your use case.

        Args:
            profile_id (str): The ID of the profile to retrieve memberships for.

        Returns:
            LocalMemberships: The LocalMemberships object containing the profile's memberships.
        """
        return self._get_memberships_or_empty(ResourceType.PROFILE, profile_id, "profile")

    def update_memberships(self, memberships: LocalMemberships) -> Optional[LocalMemberships]:
        """
        Updates the memberships for a resource.

        If the API has no membership entry for the resource yet, the memberships are
        added. Otherwise the entry is updated, or removed when the domain list is empty.

        Args:
            memberships (LocalMemberships): The LocalMemberships object to update.

        Returns:
            Optional[LocalMemberships]: The result of the add/update/remove API call;
            presumably None if all domains were removed (depends on ``remove_memberships``).

        Raises:
            ValueError: If the revision does not match the revision of the existing membership.
        """
        try:
            existing_membership = self._security_api.get_memberships_by_type_and_id(
                memberships.resource_type, memberships.resource_id
            )
        except MembershipsNotFoundError:
            # NOTE(review): an empty domain list is forwarded to add_memberships unchanged;
            # confirm whether the API accepts that or whether this should be a no-op.
            return self._security_api.add_memberships(memberships)

        # Guard against concurrent modification: only write on top of the revision we read.
        if existing_membership is not None and existing_membership.rev != memberships.rev:
            raise ValueError(
                f"Membership with ID '{memberships.id}' has a different revision. "
                f"Expected: {memberships.rev}, Found: {existing_membership.rev}"
            )

        if memberships.domains == []:
            # Empty Domains will result in VALIDATION_ERROR: "Cannot update to empty domain set. Remove the key instead."
            self._logger.info(
                f"Membership for resource ID '{memberships.resource_id}' contains no domains. Removing membership via API instead of updating."
            )
            return self._security_api.remove_memberships(memberships)

        return self._security_api.update_memberships(memberships)

    def convert_domain_ids_to_names(self, domain_ids: list[str]) -> list[str]:
        """
        Converts a list of domain IDs to their corresponding names.

        A single ID is resolved with a targeted lookup; for several IDs all domains
        are fetched once and resolved locally.

        Args:
            domain_ids (list[str]): A list of domain IDs to convert.

        Returns:
            list[str]: A list of domain names corresponding to the provided IDs (same order).

        Raises:
            ValueError: If any of the IDs cannot be resolved.
        """
        if not domain_ids:
            self._logger.warning("No domain IDs provided for conversion. Returning an empty list.")
            return []

        if len(domain_ids) == 1:
            self._logger.debug(
                f"Converting single domain ID '{domain_ids[0]}' to name by fetching this domain from the API."
            )
            domain = self._security_api.get_domain_by_id(domain_ids[0])
            if not domain:
                raise ValueError(f"Domain ID '{domain_ids[0]}' not found in the system, cannot convert to name.")
            return [domain.name]

        self._logger.debug(
            f"Converting multiple domain IDs {domain_ids} to names by fetching all domains from the API."
        )
        name_by_id = {domain.id: domain.name for domain in self._security_api.get_all_domains()}
        domain_names = []
        for domain_id in domain_ids:
            if domain_id not in name_by_id:
                raise ValueError(
                    f"Domain ID '{domain_id}' not found in the system, cannot convert all IDs to names."
                )
            domain_names.append(name_by_id[domain_id])

        if len(set(domain_names)) != len(domain_names):
            self._logger.warning(
                "Duplicate domain names found in the conversion result! This may indicate that multiple IDs map to the same name. Be cautious when using these names."
            )
        return domain_names

    def convert_domain_names_to_ids(self, domain_names: list[str]) -> list[str]:
        """
        Converts a list of domain names to their corresponding IDs.

        A single name is resolved with a targeted lookup; for several names all
        domains are fetched once and resolved locally.

        Args:
            domain_names (list[str]): A list of domain names to convert.

        Returns:
            list[str]: A list of domain IDs corresponding to the provided names (same order).

        Raises:
            ValueError: If a name cannot be resolved to an ID, or (multi-name path) if a
                requested name is used by more than one domain in the system.
        """
        if not domain_names:
            self._logger.warning("No domain names provided for conversion. Returning an empty list.")
            return []

        if len(domain_names) == 1:
            self._logger.debug(
                f"Converting single domain name '{domain_names[0]}' to ID by fetching this domain from the API."
            )
            domain = self._security_api.get_domain_by_name(domain_names[0])
            if not domain or not domain.id:
                raise ValueError(f"Domain name '{domain_names[0]}' not found in the system, cannot convert to ID.")
            self._logger.debug(f"Domain name '{domain_names[0]}' converted to ID '{domain.id}'.")
            return [domain.id]

        self._logger.debug(
            f"Converting multiple domain names {domain_names} to IDs by fetching all domains from the API."
        )
        # Group all IDs by name in one pass so ambiguous names can be detected.
        ids_by_name: dict = {}
        for domain in self._security_api.get_all_domains():
            ids_by_name.setdefault(domain.name, []).append(domain.id)

        requested = set(domain_names)
        ambiguous = {name: ids for name, ids in ids_by_name.items() if len(ids) > 1 and name in requested}
        if ambiguous:
            # key=str keeps the report sortable even if one of the IDs is missing (None).
            dup_str = ", ".join(f"'{name}': {sorted(set(ids), key=str)}" for name, ids in ambiguous.items())
            raise ValueError(
                f"Ambiguous domain names - multiple domains in the system share the same name, "
                f"cannot convert to IDs: {dup_str}"
            )

        domain_ids = []
        for domain_name in domain_names:
            candidates = ids_by_name.get(domain_name)
            # A domain without an ID is treated as not found, matching the single-name path.
            if not candidates or not candidates[0]:
                raise ValueError(
                    f"Domain name '{domain_name}' not found in the system, cannot convert all names to IDs."
                )
            domain_ids.append(candidates[0])
        return domain_ids

    def add_domain_by_name_to_membership_object(
        self, domain_name: str, membership: LocalMemberships
    ) -> LocalMemberships:
        """
        Adds a domain to a membership object by its name.

        The membership object is modified in place; nothing is sent to the API.

        Args:
            domain_name (str): The name of the domain to add.
            membership (LocalMemberships): The LocalMemberships object to which the domain will be added.

        Returns:
            LocalMemberships: The same LocalMemberships object with the domain added.

        Raises:
            ValueError: If no domain with the given name (and an ID) is found.
        """
        domain_id = self._require_domain_id(domain_name)
        if domain_id in membership.domains:
            self._logger.info(
                f"Domain '{domain_name}' is already in the membership for resource ID '{membership.resource_id}'. No changes made."
            )
            return membership
        membership.domains.append(domain_id)
        return membership

    def remove_domain_by_name_from_membership_object(
        self, domain_name: str, membership: LocalMemberships
    ) -> LocalMemberships:
        """
        Removes a domain from a membership object by its name.

        The membership object is modified in place; nothing is sent to the API.

        Args:
            domain_name (str): The name of the domain to remove.
            membership (LocalMemberships): The LocalMemberships object from which the domain will be removed.

        Returns:
            LocalMemberships: The same LocalMemberships object with the domain removed.

        Raises:
            ValueError: If no domain with the given name (and an ID) is found.
        """
        domain_id = self._require_domain_id(domain_name)
        if domain_id not in membership.domains:
            self._logger.info(
                f"Domain '{domain_name}' is not in the membership for resource ID '{membership.resource_id}'. No changes made."
            )
            return membership
        membership.domains.remove(domain_id)
        return membership