-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathentities.py
More file actions
171 lines (139 loc) · 7.42 KB
/
entities.py
File metadata and controls
171 lines (139 loc) · 7.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from collections.abc import Callable
from functools import partial, reduce
from typing import Any, TYPE_CHECKING
from urllib.parse import quote
from requests import Response
from cloudfoundry_client.errors import InvalidEntity
from cloudfoundry_client.common_objects import JsonObject, Request, Pagination
if TYPE_CHECKING:
from cloudfoundry_client.client import CloudFoundryClient
class Entity(JsonObject):
def __init__(self, target_endpoint: str, client: "CloudFoundryClient", *args, **kwargs):
super().__init__(*args, **kwargs)
self.target_endpoint = target_endpoint
self.client = client
try:
if not (isinstance(self.get("entity"), dict)):
raise InvalidEntity(**self)
for attribute, value in list(self["entity"].items()):
domain_name, suffix = attribute.rpartition("_")[::2]
if suffix == "url":
manager_name = domain_name if domain_name.endswith("s") else "%ss" % domain_name
try:
other_manager = getattr(client.v2, manager_name)
except AttributeError:
# generic manager
other_manager = EntityManager(target_endpoint, client, "")
if domain_name.endswith("s"):
new_method = partial(other_manager._list, value)
else:
new_method = partial(other_manager._get, value)
new_method.__name__ = domain_name
setattr(self, domain_name, new_method)
except KeyError:
raise InvalidEntity(**self)
EntityBuilder = Callable[[list[tuple[str, Any]]], Entity]
class EntityManager(object):
list_query_parameters = ["page", "results-per-page", "order-direction"]
list_multi_parameters = ["order-by"]
timestamp_parameters = ["timestamp"]
def __init__(
self, target_endpoint: str, client: "CloudFoundryClient", entity_uri: str, entity_builder: EntityBuilder | None = None
):
self.target_endpoint = target_endpoint
self.entity_uri = entity_uri
self.client = client
self.entity_builder = (
entity_builder if entity_builder is not None else lambda pairs: Entity(target_endpoint, client, pairs)
)
def _list(self, requested_path: str, entity_builder: EntityBuilder | None = None, **kwargs) -> Pagination[Entity]:
url_requested = self._get_url_filtered("%s%s" % (self.target_endpoint, requested_path), **kwargs)
current_builder = self._get_entity_builder(entity_builder)
response_json = self._read_response(self.client.get(url_requested), JsonObject)
return Pagination(response_json, response_json.get("total_results", 0),
self._next_page,
lambda page: page["resources"],
lambda json_object: current_builder(list(json_object.items())))
def _next_page(self, current_page: JsonObject) -> JsonObject | None:
next_url = current_page.get("next_url")
if next_url is None:
return None
url_requested = "%s%s" % (self.target_endpoint, next_url)
return self._read_response(self.client.get(url_requested), JsonObject)
def _create(self, data: dict, **kwargs) -> Entity:
url = "%s%s" % (self.target_endpoint, self.entity_uri)
return self._post(url, data, **kwargs)
def _update(self, resource_id: str, data: dict, **kwargs):
url = "%s%s/%s" % (self.target_endpoint, self.entity_uri, resource_id)
return self._put(url, data, **kwargs)
def _remove(self, resource_id: str, **kwargs):
url = "%s%s/%s" % (self.target_endpoint, self.entity_uri, resource_id)
self._delete(url, **kwargs)
def _get(self, requested_path: str, entity_builder: EntityBuilder | None = None) -> Entity:
url = "%s%s" % (self.target_endpoint, requested_path)
response = self.client.get(url)
return self._read_response(response, entity_builder)
def _post(self, url: str, data: dict | None = None, **kwargs):
response = self.client.post(url, json=data, **kwargs)
return self._read_response(response)
def _put(self, url: str, data: dict | None = None, **kwargs):
response = self.client.put(url, json=data, **kwargs)
return self._read_response(response)
def _delete(self, url: str, **kwargs):
self.client.delete(url, **kwargs)
def __iter__(self) -> Pagination[Entity]:
return self.list()
def __getitem__(self, entity_guid) -> Entity:
return self.get(entity_guid)
def list(self, **kwargs) -> Pagination[Entity]:
return self._list(self.entity_uri, **kwargs)
def get_first(self, **kwargs) -> Entity | None:
kwargs.setdefault("results-per-page", 1)
for entity in self._list(self.entity_uri, **kwargs):
return entity
return None
def get(self, entity_id: str, *extra_paths) -> Entity:
if len(extra_paths) == 0:
requested_path = "%s/%s" % (self.entity_uri, entity_id)
else:
requested_path = "%s/%s/%s" % (self.entity_uri, entity_id, "/".join(extra_paths))
return self._get(requested_path)
def _read_response(self, response: Response, other_entity_builder: EntityBuilder | None = None):
entity_builder = self._get_entity_builder(other_entity_builder)
result = response.json(object_pairs_hook=JsonObject)
return entity_builder(list(result.items()))
@staticmethod
def _request(**mandatory_parameters) -> Request:
return Request(**mandatory_parameters)
def _get_entity_builder(self, entity_builder: EntityBuilder | None) -> EntityBuilder:
if entity_builder is None:
return self.entity_builder
else:
return entity_builder
def _get_url_filtered(self, url: str, **kwargs) -> str:
def _append_encoded_parameter(parameters: list[str], args: tuple[str, Any]) -> list[str]:
parameter_name, parameter_value = args[0], args[1]
if parameter_name in self.list_query_parameters:
parameters.append("%s=%s" % (parameter_name, str(parameter_value)))
elif parameter_name in self.list_multi_parameters:
value_list = parameter_value
if not isinstance(value_list, (list, tuple)):
value_list = [value_list]
for value in value_list:
parameters.append("%s=%s" % (parameter_name, str(value)))
elif parameter_name in self.timestamp_parameters:
if isinstance(args[1], dict):
operator_list = args[1].keys()
for operator in operator_list:
parameters.append("q=%s" % quote("%s%s%s" % (parameter_name, operator, args[1][operator])))
else:
parameters.append("q=%s" % quote("%s:%s" % (parameter_name, str(parameter_value))))
elif isinstance(parameter_value, (list, tuple)):
parameters.append("q=%s" % quote("%s IN %s" % (parameter_name, ",".join(parameter_value))))
else:
parameters.append("q=%s" % quote("%s:%s" % (parameter_name, str(parameter_value))))
return parameters
if len(kwargs) > 0:
return "%s?%s" % (url, "&".join(reduce(_append_encoded_parameter, sorted(list(kwargs.items())), [])))
else:
return url