-
-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathschema.py
More file actions
293 lines (240 loc) · 10.1 KB
/
Copy pathschema.py
File metadata and controls
293 lines (240 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import re
from datetime import datetime
from enum import Enum
from typing import Annotated
from typing import Any
from typing import List # noqa : UP005,UP035
from typing import Literal
from typing import Optional
from typing import TypeVar
from typing import Union
from typing import get_origin
from pydantic import Field
from pydantic import create_model
from pydantic import field_validator
from pydantic.alias_generators import to_pascal
from pydantic.alias_generators import to_snake
from pydantic_core import Url
from ..annotations import CaseExact
from ..annotations import Mutability
from ..annotations import Required
from ..annotations import Returned
from ..annotations import Uniqueness
from ..attributes import ComplexAttribute
from ..attributes import is_complex_attribute
from ..base import BaseModel
from ..constants import RESERVED_WORDS
from ..reference import ExternalReference
from ..reference import Reference
from ..reference import URIReference
from ..utils import Base64Bytes
from ..utils import _normalize_attribute_name
from .resource import Resource
T = TypeVar("T", bound=BaseModel)
def _make_python_identifier(identifier: str) -> str:
"""Sanitize string to be a suitable Python/Pydantic class attribute name."""
sanitized = re.sub(r"\W|^(?=\d)", "", identifier)
if sanitized in RESERVED_WORDS:
sanitized = f"{sanitized}_"
return sanitized
def _make_python_model(
obj: Union["Schema", "Attribute"],
base: type[T],
) -> type[T]:
"""Build a Python model from a Schema or an Attribute object."""
if isinstance(obj, Attribute):
pydantic_attributes = {
to_snake(_make_python_identifier(attr.name)): attr._to_python()
for attr in (obj.sub_attributes or [])
if attr.name
}
else:
pydantic_attributes = {
to_snake(_make_python_identifier(attr.name)): attr._to_python()
for attr in (obj.attributes or [])
if attr.name
}
pydantic_attributes["schemas"] = (
Annotated[list[str], Required.true],
Field(default=[obj.id]),
)
if not obj.name:
raise ValueError("Schema or Attribute 'name' must be defined")
model_name = to_pascal(to_snake(obj.name))
model: type[T] = create_model(model_name, __base__=base, **pydantic_attributes) # type: ignore[call-overload]
# Set the ComplexType class as a member of the model
# e.g. make Member an attribute of Group
for attr_name in model.model_fields:
attr_type = model.get_field_root_type(attr_name)
if attr_type and is_complex_attribute(attr_type):
setattr(model, attr_type.__name__, attr_type)
return model
class Attribute(ComplexAttribute):
class Type(str, Enum):
string = "string"
complex = "complex"
boolean = "boolean"
decimal = "decimal"
integer = "integer"
date_time = "dateTime"
reference = "reference"
binary = "binary"
def _to_python(
self,
reference_types: Optional[list[str]] = None,
) -> type:
if self.value == self.reference and reference_types is not None:
if reference_types == ["external"]:
return Reference[ExternalReference]
if reference_types == ["uri"]:
return Reference[URIReference]
types = tuple(Literal[t] for t in reference_types)
return Reference[Union[types]] # type: ignore
attr_types = {
self.string: str,
self.boolean: bool,
self.decimal: float,
self.integer: int,
self.date_time: datetime,
self.binary: Base64Bytes,
self.complex: ComplexAttribute,
}
return attr_types[self.value]
@classmethod
def from_python(cls, pytype: type) -> "Attribute.Type":
if get_origin(pytype) == Reference:
return cls.reference
if pytype and is_complex_attribute(pytype):
return cls.complex
if pytype in (Required, CaseExact):
return cls.boolean
attr_types = {
str: cls.string,
bool: cls.boolean,
float: cls.decimal,
int: cls.integer,
datetime: cls.date_time,
Base64Bytes: cls.binary,
}
return attr_types.get(pytype, cls.string)
name: Annotated[
Optional[str], Mutability.read_only, Required.true, CaseExact.true
] = None
"""The attribute's name."""
type: Annotated[Optional[Type], Mutability.read_only, Required.true] = Field(
None, examples=[item.value for item in Type]
)
"""The attribute's data type."""
multi_valued: Annotated[Optional[bool], Mutability.read_only, Required.true] = None
"""A Boolean value indicating the attribute's plurality."""
description: Annotated[
Optional[str], Mutability.read_only, Required.false, CaseExact.true
] = None
"""The attribute's human-readable description."""
required: Annotated[Required, Mutability.read_only, Required.false] = Required.false
"""A Boolean value that specifies whether or not the attribute is
required."""
canonical_values: Annotated[
Optional[list[str]], Mutability.read_only, CaseExact.true
] = None
"""A collection of suggested canonical values that MAY be used (e.g.,
"work" and "home")."""
case_exact: Annotated[CaseExact, Mutability.read_only, Required.false] = (
CaseExact.false
)
"""A Boolean value that specifies whether or not a string attribute is case
sensitive."""
mutability: Annotated[
Mutability, Mutability.read_only, Required.false, CaseExact.true
] = Field(Mutability.read_write, examples=[item.value for item in Mutability])
"""A single keyword indicating the circumstances under which the value of
the attribute can be (re)defined."""
returned: Annotated[
Returned, Mutability.read_only, Required.false, CaseExact.true
] = Field(Returned.default, examples=[item.value for item in Returned])
"""A single keyword that indicates when an attribute and associated values
are returned in response to a GET request or in response to a PUT, POST, or
PATCH request."""
uniqueness: Annotated[
Uniqueness, Mutability.read_only, Required.false, CaseExact.true
] = Field(Uniqueness.none, examples=[item.value for item in Uniqueness])
"""A single keyword value that specifies how the service provider enforces
uniqueness of attribute values."""
reference_types: Annotated[
Optional[list[str]], Mutability.read_only, Required.false, CaseExact.true
] = None
"""A multi-valued array of JSON strings that indicate the SCIM resource
types that may be referenced."""
# for python 3.9 and 3.10 compatibility, this should be 'list' and not 'List'
sub_attributes: Annotated[Optional[List["Attribute"]], Mutability.read_only] = None # noqa: UP006
"""When an attribute is of type "complex", "subAttributes" defines a set of
sub-attributes."""
def _to_python(self) -> Optional[tuple[Any, Any]]:
"""Build tuple suited to be passed to pydantic 'create_model'."""
if not self.name or not self.type:
return None
attr_type = self.type._to_python(self.reference_types)
if attr_type == ComplexAttribute:
attr_type = _make_python_model(obj=self, base=attr_type)
if self.multi_valued:
attr_type = list[attr_type] # type: ignore
annotation = Annotated[
Optional[attr_type], # type: ignore
self.required,
self.case_exact,
self.mutability,
self.returned,
self.uniqueness,
]
field = Field(
description=self.description,
examples=self.canonical_values,
serialization_alias=self.name,
validation_alias=_normalize_attribute_name(self.name),
default=None,
)
return annotation, field
def get_attribute(self, attribute_name: str) -> Optional["Attribute"]:
"""Find an attribute by its name."""
for sub_attribute in self.sub_attributes or []:
if sub_attribute.name == attribute_name:
return sub_attribute
return None
def __getitem__(self, name: str) -> "Attribute":
"""Find an attribute by its name."""
if attribute := self.get_attribute(name):
return attribute
raise KeyError(f"This attribute has no '{name}' sub-attribute")
class Schema(Resource[Any]):
schemas: Annotated[list[str], Required.true] = [
"urn:ietf:params:scim:schemas:core:2.0:Schema"
]
id: Annotated[Optional[str], Mutability.read_only, Required.true] = None
"""The unique URI of the schema."""
name: Annotated[
Optional[str], Mutability.read_only, Returned.default, Required.true
] = None
"""The schema's human-readable name."""
description: Annotated[Optional[str], Mutability.read_only, Returned.default] = None
"""The schema's human-readable description."""
attributes: Annotated[
Optional[list[Attribute]], Mutability.read_only, Required.true
] = None
"""A complex type that defines service provider attributes and their
qualities via the following set of sub-attributes."""
@field_validator("id")
@classmethod
def urn_id(cls, value: str) -> str:
"""Ensure that schema ids are URI, as defined in RFC7643 §7."""
return str(Url(value))
def get_attribute(self, attribute_name: str) -> Optional[Attribute]:
"""Find an attribute by its name."""
for attribute in self.attributes or []:
if attribute.name == attribute_name:
return attribute
return None
def __getitem__(self, name: str) -> "Attribute": # type: ignore[override]
"""Find an attribute by its name."""
if attribute := self.get_attribute(name):
return attribute
raise KeyError(f"This schema has no '{name}' attribute")