-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathentities.py
More file actions
314 lines (256 loc) · 13.2 KB
/
entities.py
File metadata and controls
314 lines (256 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
import functools
from collections.abc import Callable
from json import JSONDecodeError
from typing import Any, TypeVar, TYPE_CHECKING, Type
from urllib.parse import quote, urlparse
from requests import Response
from cloudfoundry_client.errors import InvalidEntity
from cloudfoundry_client.common_objects import JsonObject, Request, Pagination
if TYPE_CHECKING:
from cloudfoundry_client.client import CloudFoundryClient, V3
def plural(name: str) -> str:
return name if name.endswith("s") else "%ss" % name
class Entity(JsonObject):
def __init__(self, target_endpoint: str, client: "CloudFoundryClient", **kwargs):
super(Entity, self).__init__(**kwargs)
default_manager = self._default_manager(target_endpoint, client)
self._create_navigable_links(client.v3, default_manager)
self._create_navigable_included_entities(client.v3, default_manager)
def _create_navigable_links(self, v3_client: "V3", default_manager: "EntityManager") -> None:
try:
default_method = self._default_method()
for link_name, link in self.get("links", {}).items():
if link_name != "self":
link_method = link.get("method", "GET").lower()
manager_method = self._manager_method(link_name, link_method)
ref = link["href"]
if manager_method is not None:
manager_name = plural(link_name)
other_manager = getattr(v3_client, manager_name, default_manager)
new_method = functools.partial(getattr(other_manager, manager_method), ref)
else:
new_method = functools.partial(default_method, link_method, ref)
new_method.__name__ = link_name
setattr(self, link_name, new_method)
except KeyError:
raise InvalidEntity(**self)
def _create_navigable_included_entities(self, v3_client: "V3", default_manager: "EntityManager") -> None:
for entity_name, entity_data in self.get("_included", {}).items():
manager_name = plural(entity_name)
other_manager = getattr(v3_client, manager_name, default_manager)
entity_type = other_manager._get_entity_type(entity_name)
entity = entity_type(other_manager.target_endpoint, other_manager.client, **entity_data)
setattr(self, entity_name, functools.partial(lambda e: e, entity))
self.pop("_included", None)
@staticmethod
def _default_manager(target_endpoint: str, client: "CloudFoundryClient") -> "EntityManager":
return EntityManager(target_endpoint, client, "")
@staticmethod
def _default_method() -> Callable:
def default_method(m, u):
raise NotImplementedError("Unknown method %s for url %s" % (m, u))
return default_method
@staticmethod
def _manager_method(link_name: str, link_method: str) -> str | None:
if link_method == "get":
if link_name.endswith("s"):
return "_attempt_to_paginate"
else:
return "_get"
elif link_method == "post":
return "_post"
elif link_method == "put":
return "_put"
elif link_method == "delete":
return "_delete"
return None
class Relationship(JsonObject):
def __init__(self, guid: str | None):
super(Relationship, self).__init__(guid=guid)
class ToOneRelationship(JsonObject):
@staticmethod
def from_json_object(to_one_relationship: JsonObject):
if to_one_relationship is None:
return ToOneRelationship(None)
data = to_one_relationship.pop("data", None)
result = ToOneRelationship(None if data is None else data["guid"])
result.update(to_one_relationship)
return result
def __init__(self, guid: str | None):
super(ToOneRelationship, self).__init__(data=Relationship(guid))
self.guid = guid
class ToManyRelationship(JsonObject):
@staticmethod
def from_json_object(to_many_relations: JsonObject):
result = ToManyRelationship(*[relation["guid"] for relation in to_many_relations.pop("data")])
result.update(to_many_relations)
return result
def __init__(self, *guids: str):
super(ToManyRelationship, self).__init__(data=[Relationship(guid) for guid in guids])
self.guids = list(guids)
ENTITY_TYPE = TypeVar("ENTITY_TYPE", bound=Entity)
class EntityManager(object):
def __init__(self, target_endpoint: str, client: "CloudFoundryClient", entity_uri: str, entity_type: ENTITY_TYPE = Entity):
self.target_endpoint = target_endpoint
self.entity_uri = entity_uri
self.client = client
self.entity_type = entity_type
def _post(self, url: str, data: dict | None = None, files: Any = None, entity_type: ENTITY_TYPE = None) -> Entity:
response = self.client.post(url, json=data, files=files)
return self._read_response(response, entity_type)
def _get(self, url: str, entity_type: ENTITY_TYPE | None = None, **kwargs) -> Entity:
url_requested = EntityManager._get_url_with_encoded_params(url, **kwargs)
response = self.client.get(url_requested)
return self._read_response(response, entity_type)
def _put(self, url: str, data: dict, entity_type: ENTITY_TYPE | None = None) -> Entity:
response = self.client.put(url, json=data)
return self._read_response(response, entity_type)
def _patch(self, url: str, data: dict, entity_type: ENTITY_TYPE | None = None) -> Entity:
response = self.client.patch(url, json=data)
return self._read_response(response, entity_type)
def _delete(self, url: str) -> str | None:
response = self.client.delete(url)
return self._location(response)
@staticmethod
def _location(response):
try:
return response.headers["Location"]
except (AttributeError, KeyError):
return None
def _remove(self, resource_id: str, asynchronous: bool = True) -> str | None:
url = "%s%s/%s" % (self.target_endpoint, self.entity_uri, resource_id)
job_location = self._delete(url)
if job_location is not None:
job_guid = self._extract_job_guid(job_location)
if not asynchronous:
self.client.v3.jobs.wait_for_job_completion(job_guid)
else:
return job_guid
return None
@staticmethod
def _extract_job_guid(job_location):
job_url = urlparse(job_location)
job_guid = job_url.path.rsplit("/", 1)[-1]
return job_guid
def _list(self, requested_path: str, entity_type: ENTITY_TYPE | None = None, **kwargs) -> Pagination[Entity]:
url_requested = EntityManager._get_url_with_encoded_params("%s%s" % (self.target_endpoint, requested_path), **kwargs)
response_json = self._read_response(self.client.get(url_requested), JsonObject)
return self._pagination(response_json, entity_type)
def _attempt_to_paginate(self, url_requested: str, entity_type: ENTITY_TYPE | None = None) \
-> Pagination[Entity] | Entity:
response_json = self._read_response(self.client.get(url_requested), JsonObject)
if "resources" in response_json:
return self._pagination(response_json, entity_type)
else:
return response_json
def _pagination(self, page: JsonObject, entity_type: ENTITY_TYPE | None = None) -> Pagination[Entity]:
def _entity(json_object: JsonObject) -> Entity:
return self._entity(json_object, entity_type)
return Pagination(page,
page.get("pagination", {}).get("total_results", 0),
self._next_page,
lambda p: p["resources"],
_entity)
def _next_page(self, current_page: JsonObject) -> JsonObject | None:
pagination = current_page.get("pagination")
if (
pagination is None
or "next" not in pagination
or pagination["next"] is None
or pagination["next"].get("href") is None
):
return None
return self._read_response(self.client.get(current_page["pagination"]["next"]["href"]), JsonObject)
def _create(self, data: dict) -> Entity:
url = "%s%s" % (self.target_endpoint, self.entity_uri)
return self._post(url, data=data)
def _upload_bits(self, resource_id: str, filename: str) -> Entity:
url = "%s%s/%s/upload" % (self.target_endpoint, self.entity_uri, resource_id)
files = {"bits": (filename, open(filename, "rb"))}
return self._post(url, files=files)
def _update(self, resource_id: str, data: dict) -> Entity:
url = "%s%s/%s" % (self.target_endpoint, self.entity_uri, resource_id)
return self._patch(url, data)
def __iter__(self) -> Pagination[Entity]:
return self.list()
def __getitem__(self, entity_guid) -> Entity:
return self.get(entity_guid)
def __len__(self):
return self.len()
def len(self, **kwargs):
url_requested = EntityManager._get_url_with_encoded_params("%s%s" % (self.target_endpoint, self.entity_uri), **kwargs)
response_json = self._read_response(self.client.get(url_requested, JsonObject))
pagination = response_json.get("pagination")
if pagination is not None:
return pagination.get("total_results", 0)
else:
return 0
def list(self, **kwargs) -> Pagination[Entity]:
return self._list(self.entity_uri, **kwargs)
def get_first(self, **kwargs) -> Entity | None:
kwargs.setdefault("per_page", 1)
for entity in self._list(self.entity_uri, **kwargs):
return entity
return None
def get(self, entity_id: str, *extra_paths, **kwargs) -> Entity:
if len(extra_paths) == 0:
requested_path = "%s%s/%s" % (self.target_endpoint, self.entity_uri, entity_id)
else:
requested_path = "%s%s/%s/%s" % (self.target_endpoint, self.entity_uri, entity_id, "/".join(extra_paths))
return self._get(requested_path, **kwargs)
def _read_response(self, response: Response, entity_type: ENTITY_TYPE | None) -> JsonObject | Entity:
try:
result = response.json(object_pairs_hook=JsonObject)
except JSONDecodeError:
# assume that response is empty
result = {"links": {}}
if "Location" in response.headers:
result["links"]["job"] = {
"href": response.headers["Location"],
"method": "GET",
}
return self._entity(self._mixin_included_resources(result), entity_type)
@staticmethod
def _request(**mandatory_parameters) -> Request:
return Request(**mandatory_parameters)
def _mixin_included_resources(self, result: JsonObject) -> JsonObject:
if "included" not in result:
return result
for resource in result.get("resources", [result]):
self._include_resources(resource, result)
del result["included"]
return result
def _include_resources(self, resource: JsonObject, result: JsonObject) -> None:
for relationship_name, relationship in resource.get("relationships", {}).items():
relationship_guid = (relationship.get("data") or {}).get("guid")
included_resources = result["included"].get(plural(relationship_name))
if relationship_guid is not None and included_resources is not None:
included_resource = next((r for r in included_resources if relationship_guid == r.get("guid")), None)
if included_resource is not None:
self._include_resources(included_resource, result)
included = resource.setdefault("_included", {})
included.update({relationship_name: included_resource})
@staticmethod
def _get_entity_type(entity_name: str) -> Type[ENTITY_TYPE]:
return Entity
def _entity(self, result: JsonObject, entity_type: ENTITY_TYPE | None) -> JsonObject | Entity:
if "guid" in result or ("links" in result and "job" in result["links"]):
return (entity_type or self.entity_type)(self.target_endpoint, self.client, **result)
else:
return result
@staticmethod
def _get_url_with_encoded_params(url: str, **kwargs) -> str:
def _append_encoded_parameter(parameters: list[str], args: tuple[str, Any]) -> list[str]:
parameter_name, parameter_value = args[0], args[1]
if isinstance(parameter_value, (list, tuple)):
parameters.append("%s=%s" % (parameter_name, quote(",".join(parameter_value))))
elif isinstance(parameter_value, (dict)) and parameter_name == "fields":
for resource, key in parameter_value.items():
parameters.append("%s[%s]=%s" % (parameter_name, resource, ",".join(key)))
else:
parameters.append("%s=%s" % (parameter_name, quote(str(parameter_value))))
return parameters
if len(kwargs) > 0:
return "%s?%s" % (url, "&".join(functools.reduce(_append_encoded_parameter, sorted(list(kwargs.items())), [])))
else:
return url