This repository was archived by the owner on Jun 2, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapis.py
More file actions
608 lines (467 loc) · 19.6 KB
/
apis.py
File metadata and controls
608 lines (467 loc) · 19.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
"""Dynamic API command registration for the CLI."""
import datetime
import decimal
import importlib
import inspect
import json
import pkgutil
import types
from pathlib import Path
from typing import Annotated, Any, Optional, Union, get_args, get_origin
import classyclick
import click
from pydantic import BaseModel
from defectdojo_api_generated import api as api_package
from defectdojo_api_generated.client import DefectDojo
from defectdojo_api_generated.exceptions import BadRequestException
from defectdojo_api_generated.helpers import IteratorResult
from .cli import CLI
_PRIMITIVE_CLICK_TYPES = (
bool,
bytes,
str,
int,
float,
datetime.date,
datetime.datetime,
decimal.Decimal,
)
class API(CLI.SubGroup):
"""Interact directly with any API/method"""
def _iter_api_modules():
for _, module_name, _ in pkgutil.iter_modules(api_package.__path__):
if module_name == '__init__':
continue
module = importlib.import_module(f'{api_package.__name__}.{module_name}')
api_classes = [
value
for _, value in inspect.getmembers(module, inspect.isclass)
if value.__module__ == module.__name__ and value.__name__.endswith('Api')
]
if len(api_classes) != 1:
continue
yield module_name, api_classes[0]
def _iter_command_methods(api_class: type):
methods = sorted(name for name, member in inspect.getmembers(api_class, callable) if not name.startswith('_'))
method_set = set(methods)
for method in methods:
if method.endswith('_with_http_info') or method.endswith('_without_preload_content'):
continue
if method.endswith('list') and f'{method}_iterator' in method_set:
continue
command_name = method
if method.endswith('_iterator') and method[:-9] in method_set:
command_name = method[:-9]
yield command_name.replace('_', '-'), method
def _get_command_help(api_class: type, target_method: str) -> str:
doc_targets = [target_method]
if target_method.endswith('_iterator'):
doc_targets.append(target_method[:-9])
for method_name in doc_targets:
method = getattr(api_class, method_name, None)
doc = inspect.getdoc(method)
if doc:
return doc.splitlines()[0].strip()
return f'`{target_method}`.'
def _get_help_from_annotation(annotation: Any) -> Optional[str]:
if get_origin(annotation) is Annotated:
_, *metadata = get_args(annotation)
for item in metadata:
description = getattr(item, 'description', None)
if description:
return description
return None
def _is_primitive_click_type(annotation: Any) -> bool:
return inspect.isclass(annotation) and annotation in _PRIMITIVE_CLICK_TYPES
def _is_model_click_type(annotation: Any) -> bool:
return inspect.isclass(annotation) and issubclass(annotation, BaseModel)
def _is_file_upload_union(annotation: Any) -> bool:
origin = get_origin(annotation)
if origin is Annotated:
return _is_file_upload_union(get_args(annotation)[0])
if origin is Union or origin is getattr(types, 'UnionType', None):
args = [arg for arg in get_args(annotation) if arg is not type(None)]
normalized = set()
for arg in args:
arg_origin = get_origin(arg)
if arg_origin is Annotated:
arg = get_args(arg)[0]
arg_origin = get_origin(arg)
if arg is bytes:
normalized.add('bytes')
elif arg is str:
normalized.add('str')
elif arg_origin is tuple:
tuple_args = []
for item in get_args(arg):
item_origin = get_origin(item)
if item_origin is Annotated:
item = get_args(item)[0]
tuple_args.append(item)
if tuple_args == [str, bytes]:
normalized.add('tuple[str,bytes]')
return normalized == {'bytes', 'str', 'tuple[str,bytes]'}
return False
def _coerce_file_upload_value(value: Any):
if value is None:
return None
path = Path(value)
return (path.name, path.read_bytes())
def _convert_cli_value(name: str, value: Any, converter: Optional[type[BaseModel]], *, multiple: bool):
if name == 'file':
if multiple:
return tuple(_coerce_file_upload_value(item) for item in value)
return _coerce_file_upload_value(value)
if converter is None:
return value
if multiple:
return tuple(converter.model_validate_json(item) for item in value)
if value is None:
return None
return converter.model_validate_json(value)
def _get_click_type(
annotation: Any, *, parameter_name: Optional[str] = None
) -> tuple[Any, bool, Optional[type[BaseModel]]]:
multiple = False
current = annotation
while True:
if current is inspect.Signature.empty:
return Any, multiple, None
origin = get_origin(current)
if origin is Annotated:
current = get_args(current)[0]
continue
if origin is None:
if current is Any or _is_primitive_click_type(current):
return current, multiple, None
if _is_model_click_type(current):
return str, multiple, current
param_name = f' for parameter "{parameter_name}"' if parameter_name else ''
raise TypeError(
f'Unsupported CLI parameter type{param_name}: {current!r}. Only primitive types are supported.'
)
if origin is list:
multiple = True
list_args = get_args(current)
current = list_args[0] if list_args else Any
continue
if origin is tuple:
tuple_args = get_args(current)
current = tuple_args[0] if tuple_args else Any
continue
if origin is Union or origin is getattr(types, 'UnionType', None):
if _is_file_upload_union(current):
return Path, multiple, None
union_args = [arg for arg in get_args(current) if arg is not type(None)]
if not union_args:
return Any, multiple, None
current = union_args[0]
continue
if current is Any or _is_primitive_click_type(current):
return current, multiple, None
if _is_model_click_type(current):
return str, multiple, current
param_name = f' for parameter "{parameter_name}"' if parameter_name else ''
raise TypeError(f'Unsupported CLI parameter type{param_name}: {current!r}. Only primitive types are supported.')
def _get_class_annotation(click_type: Any) -> type:
if click_type is Any:
return str
return click_type
def _build_option(
click_type: Any,
*,
help_text: Optional[str],
required: bool = False,
default: Any = None,
multiple: bool = False,
short_name: Optional[str] = None,
):
option_kwargs = {'help': help_text}
if required:
option_kwargs['required'] = True
else:
option_kwargs['default'] = default
if click_type is not Any:
option_kwargs['type'] = click_type
if multiple:
option_kwargs['multiple'] = True
if short_name is not None:
return classyclick.Option(f'-{short_name}', default_parameter=False, **option_kwargs)
return classyclick.Option(**option_kwargs)
def _add_shared_output_options(namespace: dict[str, Any]) -> None:
namespace['json'] = classyclick.Option(help='Dump responses as JSON')
namespace['jq'] = classyclick.Option(help='Apply a JMESPath expression to each response item', default=None)
namespace['__annotations__']['json'] = bool
namespace['__annotations__']['jq'] = str
def _add_parameter_options(
namespace: dict[str, Any],
parameters: list[tuple[str, Any, Any, bool, Optional[type[BaseModel]]]],
*,
help_getter,
default_getter,
) -> None:
required_parameters = [item for item in parameters if default_getter(item) is inspect.Signature.empty]
optional_parameters = [item for item in parameters if default_getter(item) is not inspect.Signature.empty]
for name, source, click_type, multiple, _converter in required_parameters:
namespace['__annotations__'][name] = _get_class_annotation(click_type)
namespace[name] = _build_option(
click_type,
help_text=help_getter(source),
required=True,
multiple=multiple,
short_name=name if len(name) == 1 else None,
)
_add_shared_output_options(namespace)
for name, source, click_type, multiple, _converter in optional_parameters:
namespace['__annotations__'][name] = _get_class_annotation(click_type)
namespace[name] = _build_option(
click_type,
help_text=help_getter(source),
default=default_getter((name, source, click_type, multiple, _converter)),
multiple=multiple,
short_name=name if len(name) == 1 else None,
)
def _iter_command_parameters(api_class: type, target_method: str):
signature = inspect.signature(getattr(api_class, target_method))
for name, parameter in signature.parameters.items():
if name == 'self' or name.startswith('_'):
continue
if parameter.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
continue
yield name, parameter
def _get_request_model_type(annotation: Any) -> Optional[type[BaseModel]]:
_, _, converter = _get_click_type(annotation)
if converter is not None and converter.__name__.endswith('Request'):
return converter
return None
def _get_model_field_help(field: Any) -> Optional[str]:
return field.description or _get_help_from_annotation(field.annotation)
def _build_command_namespace(command_name: str, api_class: type, target_method: str) -> dict[str, Any]:
return {
'__config__': classyclick.Command.Config(
name=command_name,
help=_get_command_help(api_class, target_method),
),
'client': classyclick.ContextMeta('client'),
'__annotations__': {
'client': DefectDojo,
},
}
def _build_command_class(parent_class: type, namespace: dict[str, Any]) -> type:
return type(
'ApiCommand',
(parent_class,),
namespace,
)
def _collect_command_values(parameters, getter, *, should_include):
values = {}
for name, source, _, multiple, converter in parameters:
value = getter(name)
value = _convert_cli_value(name, value, converter, multiple=multiple)
if should_include(value=value, source=source, multiple=multiple):
values[name] = value
return values
def _render_api_result(result: Any, instance: Any) -> None:
_render_result(
result,
json_mode=instance.json,
jq_expression=instance.jq,
max_records=getattr(instance, 'max_records', None),
)
def _create_standard_command_call(api_class: type, target_method: str, command_parameters):
def __call__(self):
method = getattr(api_class(self.client.api_client), target_method)
kwargs = _collect_command_values(
command_parameters,
lambda name: getattr(self, name),
should_include=lambda *, value, source, multiple: value if multiple else value != source.default,
)
result = _invoke_api_method(method, **kwargs)
_render_api_result(result, self)
return __call__
def _create_model_command_call(api_class: type, target_method: str, field_definitions, model_class: type[BaseModel]):
def __call__(self):
method = getattr(api_class(self.client.api_client), target_method)
request_data = _collect_command_values(
field_definitions,
lambda name: getattr(self, name),
should_include=lambda *, value, source, multiple: value
if multiple
else value != source.default and value is not None,
)
request_model = model_class.model_validate(request_data)
result = _invoke_api_method(method, request_model)
_render_api_result(result, self)
return __call__
def _build_model_field_definitions(model_class: type[BaseModel]):
field_definitions = []
for field_name, field in model_class.model_fields.items():
click_type, multiple, converter = _get_click_type(field.annotation, parameter_name=field_name)
field_definitions.append((field_name, field, click_type, multiple, converter))
field_definitions.sort(key=lambda item: not item[1].is_required())
return field_definitions
def _build_model_field_command(
api_class: type,
command_name: str,
target_method: str,
*,
parent_class: type,
model_class: type[BaseModel],
) -> type:
field_definitions = _build_model_field_definitions(model_class)
namespace = _build_command_namespace(command_name, api_class, target_method)
namespace['__call__'] = _create_model_command_call(api_class, target_method, field_definitions, model_class)
_add_parameter_options(
namespace,
field_definitions,
help_getter=_get_model_field_help,
default_getter=lambda item: item[1].default if not item[1].is_required() else inspect.Signature.empty,
)
return _build_command_class(parent_class, namespace)
def _iter_output_items(result):
if isinstance(result, IteratorResult):
yield from _iter_output_items(result.result)
return
if isinstance(result, (list, tuple)):
for item in result:
yield from _iter_output_items(item)
return
if inspect.isgenerator(result):
for item in result:
yield from _iter_output_items(item)
return
yield result
def _to_jsonable(value: Any, *, exclude_none: bool = False) -> Any:
if isinstance(value, BaseModel):
return value.model_dump(mode='json', exclude_none=exclude_none)
if isinstance(value, dict):
return {
key: _to_jsonable(item, exclude_none=exclude_none)
for key, item in value.items()
if not (exclude_none and item is None)
}
if isinstance(value, (list, tuple)):
return [_to_jsonable(item, exclude_none=exclude_none) for item in value if not (exclude_none and item is None)]
return value
def _apply_jq(value: Any, jq_expression: Optional[str]) -> Any:
if not jq_expression:
return value
try:
import jmespath
except ModuleNotFoundError as exc:
raise click.ClickException('jq support requires the jmespath package') from exc
return jmespath.search(jq_expression, _to_jsonable(value))
def _get_bad_request_detail(exc: BadRequestException) -> str:
for candidate in (exc.data, exc.body, exc.reason):
if candidate is None:
continue
if isinstance(candidate, BaseModel):
candidate = candidate.model_dump(mode='json', exclude_none=True)
if isinstance(candidate, str):
try:
candidate = json.loads(candidate)
except json.JSONDecodeError:
if candidate.strip():
return candidate
continue
if isinstance(candidate, dict):
for key in ('detail', 'message', 'error'):
value = candidate.get(key)
if value:
return str(value)
continue
text = str(candidate).strip()
if text:
return text
return str(exc)
def _invoke_api_method(method, *args, **kwargs):
try:
return method(*args, **kwargs)
except BadRequestException as exc:
raise click.ClickException(_get_bad_request_detail(exc)) from exc
def _format_text_value(value: Any) -> str:
if isinstance(value, (dict, list, tuple, BaseModel)):
return json.dumps(_to_jsonable(value, exclude_none=True), ensure_ascii=False, default=str)
return str(value)
def _format_text_item(item: Any) -> str:
if isinstance(item, BaseModel):
payload = item.model_dump(mode='json', exclude_none=True)
elif isinstance(item, dict):
payload = {key: value for key, value in item.items() if value is not None}
elif isinstance(item, (list, tuple)):
return json.dumps(_to_jsonable(item, exclude_none=True), ensure_ascii=False, default=str)
else:
return str(item)
lines = []
for key, value in payload.items():
lines.append(f'{click.style(str(key), bold=True)}: {_format_text_value(value)}')
return '\n'.join(lines)
def _render_result(result: Any, *, json_mode: bool, jq_expression: Optional[str], max_records: Optional[int] = None):
div = False
emitted = 0
for item in _iter_output_items(result):
if max_records is not None and emitted >= max_records:
break
item = _apply_jq(item, jq_expression)
if json_mode:
click.echo(json.dumps(_to_jsonable(item), ensure_ascii=False, default=str))
else:
if div:
click.echo('\n---\n')
else:
div = True
click.echo(_format_text_item(item))
emitted += 1
def make_api_command(api_class: type, command_name: str, target_method: str, *, parent_class: type):
raw_parameters = list(_iter_command_parameters(api_class, target_method))
if len(raw_parameters) == 1:
_, parameter = raw_parameters[0]
request_model_type = _get_request_model_type(parameter.annotation)
if request_model_type is not None:
return _build_model_field_command(
api_class,
command_name,
target_method,
parent_class=parent_class,
model_class=request_model_type,
)
command_parameters = sorted(
[
(name, parameter, *_get_click_type(parameter.annotation, parameter_name=name))
for name, parameter in raw_parameters
],
key=lambda item: item[1].default is not inspect.Signature.empty,
)
namespace = _build_command_namespace(command_name, api_class, target_method)
namespace['__call__'] = _create_standard_command_call(api_class, target_method, command_parameters)
_add_parameter_options(
namespace,
command_parameters,
help_getter=lambda parameter: _get_help_from_annotation(parameter.annotation),
default_getter=lambda item: item[1].default,
)
if target_method.endswith('_iterator'):
namespace['__annotations__']['max_records'] = Optional[int]
namespace['max_records'] = classyclick.Option(
'-m',
type=click.IntRange(min=1),
default=None,
help='Maximum number of records to emit.',
)
return _build_command_class(parent_class, namespace)
def make_api_group(module_name: str, api_class: type) -> type:
group_name = module_name.removesuffix('_api').replace('_', '-')
command_methods = list(_iter_command_methods(api_class))
if len(command_methods) == 1:
_, target_method = command_methods[0]
return make_api_command(api_class, group_name, target_method, parent_class=API.Command)
class ApiGroup(API.SubGroup):
__config__ = classyclick.Group.Config(
name=group_name,
help=f'methods from `{api_class.__name__}`.',
)
for command_name, target_method in command_methods:
make_api_command(api_class, command_name, target_method, parent_class=ApiGroup.Command)
return ApiGroup
for _module_name, _api_class in _iter_api_modules():
make_api_group(_module_name, _api_class)