-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathutil.py
More file actions
343 lines (272 loc) · 9.19 KB
/
util.py
File metadata and controls
343 lines (272 loc) · 9.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
import collections.abc
import dataclasses
import datetime
import enum
import functools
import json
import logging
import re
import urllib.parse
import aiohttp.web
import dateutil.parser
import semver
import yaml
import cnudie.retrieve_async
import oci.model as om
import ocm
import ocm.iter
import version as versionutil
logger = logging.getLogger(__name__)
def parse_yaml_file(path: str) -> dict:
with open(path) as file:
return yaml.safe_load(file)
def urlparse(url: str) -> urllib.parse.ParseResult:
if '://' not in url:
url = f'x://{url}'
return urllib.parse.urlparse(url)
def urljoin(*parts):
if len(parts) == 1:
return parts[0]
first = parts[0]
last = parts[-1]
middle = parts[1:-1]
first = first.rstrip('/')
middle = list(map(lambda s: s.strip('/'), middle))
last = last.lstrip('/')
return '/'.join([first] + middle + [last])
@functools.cache
def normalise_url_to_second_and_tld(url: str):
hostname = urlparse(url).hostname
parts = hostname.strip('.').split('.')
if parts[0] == 'api':
parts = parts[1:]
# hack: discard `api` subdomain (specific to github.com)
return '.'.join(parts).lower()
def purge_callables_from_dict(data) -> dict:
if isinstance(data, str):
return data
elif isinstance(data, dict):
return dict(
(k, purge_callables_from_dict(v))
for k, v in data.items()
if not isinstance(v, collections.abc.Callable)
)
elif isinstance(data, collections.abc.Iterable):
return [
purge_callables_from_dict(o) for o in data if not isinstance(o, collections.abc.Callable)
]
return data
def dict_serialisation(data) -> dict:
if isinstance(data, enum.Enum):
return data.value
elif isinstance(data, str):
return data
elif isinstance(data, (datetime.date, datetime.datetime)):
return data.isoformat()
elif isinstance(data, datetime.timedelta):
return data.total_seconds()
elif dataclasses.is_dataclass(data):
return dict_serialisation(dataclasses.asdict(data))
elif isinstance(data, dict):
return dict((k, dict_serialisation(v)) for k, v in data.items())
elif isinstance(data, collections.abc.Iterable):
return [dict_serialisation(o) for o in data]
return data
def dict_to_json_factory(data: dict) -> str:
return json.dumps(dict_serialisation(data))
async def retrieve_component_descriptor(
component_id: ocm.ComponentIdentity,
/,
component_descriptor_lookup: cnudie.retrieve_async.ComponentDescriptorLookupById,
ocm_repository_lookup: ocm.OcmRepositoryLookup | None = None,
) -> ocm.ComponentDescriptor:
try:
if ocm_repository_lookup:
return await component_descriptor_lookup(
component_id,
ocm_repository_lookup=ocm_repository_lookup,
)
return await component_descriptor_lookup(component_id)
except om.OciImageNotFoundException:
err_str = (
f'Component descriptor "{component_id.name}" in version "{component_id.version}" not '
'found'
)
if ocm_repository_lookup:
ocm_repos = list(ocm_repository_lookup(component_id.name))
err_str += f' in OCM repositories: {ocm_repos}'
logger.debug(err_str)
raise aiohttp.web.HTTPNotFound(
reason='Component descriptor not found',
text=err_str,
)
def artefact_node_to_str(
artefact_node: ocm.iter.Node | ocm.iter.ArtefactNode,
) -> str:
component_id = artefact_node.component.identity()
if isinstance(artefact_node, ocm.iter.SourceNode):
artefact_id = artefact_node.artefact.identity(peers=artefact_node.component.sources)
else:
artefact_id = artefact_node.artefact.identity(peers=artefact_node.component.resources)
return f'{component_id.name}:{component_id.version}_{artefact_id}'
def get_enum_value_or_raise(
value: object,
enum_type: type,
):
try:
return enum_type(value)
except ValueError:
raise aiohttp.web.HTTPBadRequest(
text=(
f'Bad value for {enum_type=}: {value=}. '
f'Allowed values: {[val.value for val in enum_type]}'
),
)
def param(
params: dict,
name: str,
*,
required: bool = False,
default=None,
):
if required and default:
raise ValueError('there must be no default value if `required` is set to `True`')
if name not in params:
if required:
raise aiohttp.web.HTTPBadRequest(
reason='Missing parameter',
text=f'The {name} parameter is required.',
)
return default
return params[name]
def param_as_bool(
params: dict,
name: str,
*,
required: bool = False,
default: bool = False,
) -> bool:
# taken from requests' `request.get_param_as_bool`
true_strings = ('true', 'True', 't', 'yes', 'y', '1', 'on')
false_strings = ('false', 'False', 'f', 'no', 'n', '0', 'off')
val = str(
param(
params=params,
name=name,
required=required,
default=default,
),
)
if val in true_strings:
return True
elif val in false_strings:
return False
raise aiohttp.web.HTTPBadRequest(
reason='Invalid parameter',
text=f'The value of the paramter {name} must be a boolean value.',
)
def error_description(
error_id: str,
**kwargs,
) -> str:
"""
Used to create a uniform error JSON response, containing a unique `error_id` as well as optional
extra parameters to enrich the error description. Callers will still have to set the correct
content type `application/json` for the error object.
"""
return dict_to_json_factory(
{
'error_id': error_id,
**kwargs,
},
)
def get_creation_date(component: ocm.Component) -> datetime.datetime:
"""
Trys to extract creation date from creationTime attribute and if not set from label with name
"cloud.gardener/ocm/creation-date".
Raises KeyError, if both is not successful.
"""
if creationTime := component.creationTime:
return dateutil.parser.isoparse(creationTime)
creation_label: ocm.Label | None = component.find_label('cloud.gardener/ocm/creation-date')
if not creation_label:
raise KeyError(
'The attribute creation time, as well as the',
'label named "cloud.gardener/ocm/creation-date", were not set.',
)
else:
return dateutil.parser.isoparse(creation_label.value)
def convert_to_timedelta(
time: str | int,
default_unit: str = 'd',
) -> datetime.timedelta:
seconds_per_unit = {
's': 1,
'sec': 1,
'm': 60,
'min': 60,
'h': 60 * 60,
'hr': 60 * 60,
'd': 60 * 60 * 24,
'w': 60 * 60 * 24 * 7,
'y': 60 * 60 * 24 * 365,
'yr': 60 * 60 * 24 * 365,
}
unit = None
if match := re.match(r'([0-9]+)\s*([a-z]*)', str(time).strip(), re.IGNORECASE):
time, unit = match.groups()
seconds = int(time) * seconds_per_unit[unit or default_unit]
return datetime.timedelta(seconds=seconds)
def pluralise(
word: str,
count: int,
) -> str:
if count == 1:
return word
word = re.sub('y$', 'ie', word)
word = re.sub('s$', 'se', word)
return word + 's'
def merge_dicts(base: dict, *other: dict, list_semantics='merge'):
"""
merges copies of the given dict instances and returns the merge result.
The arguments remain unmodified. However, it must be possible to copy them
using `copy.deepcopy`.
Merging is done using the `deepmerge` module. In case of merge conflicts, values from
`other` overwrite values from `base`.
By default, different from the original implementation, a merge will be applied to
lists. This results in deduplication retaining element order. The elements from `other` are
appended to those from `base`.
"""
if base is None:
raise ValueError('base must not be None')
if other is None:
raise ValueError('other must not be None')
from deepmerge import Merger
if list_semantics == 'merge':
# monkey-patch merge-strategy for lists
list_merge_strategy = Merger.PROVIDED_TYPE_STRATEGIES[list]
list_merge_strategy.strategy_merge = lambda c, p, base, other: (
list(base) + [e for e in other if e not in base]
)
strategy_cfg = [(list, ['merge']), (dict, ['merge'])]
merger = Merger(strategy_cfg, ['override'], ['override'])
elif list_semantics is None:
strategy_cfg = [(dict, ['merge'])]
merger = Merger(strategy_cfg, ['override'], ['override'])
else:
raise NotImplementedError
from copy import deepcopy
return functools.reduce(
lambda b, o: merger.merge(b, deepcopy(o)),
[base, *other],
{},
)
def version_sorting_key(
version: str,
fallback_version: str = '0.0',
) -> semver.VersionInfo:
try:
version = versionutil.parse_to_semver(version)
except ValueError:
version = versionutil.parse_to_semver(fallback_version)
return version