Skip to content

Commit 1b57958

Browse files
committed
refactor: move URN validation logic from base.py to scim_object.py
1 parent c5b1b1c commit 1b57958

File tree

4 files changed

+177
-121
lines changed

4 files changed

+177
-121
lines changed

scim2_models/base.py

Lines changed: 3 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from inspect import isclass
2+
from typing import TYPE_CHECKING
23
from typing import Any
34
from typing import Optional
45
from typing import get_args
@@ -28,82 +29,8 @@
2829

2930
from .utils import UNION_TYPES
3031

31-
32-
def validate_model_attribute(model: type["BaseModel"], attribute_base: str) -> None:
33-
"""Validate that an attribute name or a sub-attribute path exist for a given model."""
34-
from scim2_models.base import BaseModel
35-
36-
attribute_name, *sub_attribute_blocks = attribute_base.split(".")
37-
sub_attribute_base = ".".join(sub_attribute_blocks)
38-
39-
aliases = {field.validation_alias for field in model.model_fields.values()}
40-
41-
if normalize_attribute_name(attribute_name) not in aliases:
42-
raise ValueError(
43-
f"Model '{model.__name__}' has no attribute named '{attribute_name}'"
44-
)
45-
46-
if sub_attribute_base:
47-
attribute_type = model.get_field_root_type(attribute_name)
48-
49-
if not attribute_type or not issubclass(attribute_type, BaseModel):
50-
raise ValueError(
51-
f"Attribute '{attribute_name}' is not a complex attribute, and cannot have a '{sub_attribute_base}' sub-attribute"
52-
)
53-
54-
validate_model_attribute(attribute_type, sub_attribute_base)
55-
56-
57-
def extract_schema_and_attribute_base(attribute_urn: str) -> tuple[str, str]:
58-
# Extract the schema urn part and the attribute name part from attribute
59-
# name, as defined in :rfc:`RFC7644 §3.10 <7644#section-3.10>`.
60-
61-
*urn_blocks, attribute_base = attribute_urn.split(":")
62-
schema = ":".join(urn_blocks)
63-
return schema, attribute_base
64-
65-
66-
def validate_attribute_urn(
67-
attribute_name: str,
68-
default_resource: Optional[type["BaseModel"]] = None,
69-
resource_types: Optional[list[type["BaseModel"]]] = None,
70-
) -> str:
71-
"""Validate that an attribute urn is valid or not.
72-
73-
:param attribute_name: The attribute urn to check.
74-
:default_resource: The default resource if `attribute_name` is not an absolute urn.
75-
:resource_types: The available resources in which to look for the attribute.
76-
:return: The normalized attribute URN.
77-
"""
78-
from scim2_models.rfc7643.resource import Resource
79-
80-
if not resource_types:
81-
resource_types = []
82-
83-
if default_resource and default_resource not in resource_types:
84-
resource_types.append(default_resource)
85-
86-
default_schema = (
87-
default_resource.model_fields["schemas"].default[0]
88-
if default_resource
89-
else None
90-
)
91-
92-
schema: Optional[Any]
93-
schema, attribute_base = extract_schema_and_attribute_base(attribute_name)
94-
if not schema:
95-
schema = default_schema
96-
97-
if not schema:
98-
raise ValueError("No default schema and relative URN")
99-
100-
resource = Resource.get_by_schema(resource_types, schema)
101-
if not resource:
102-
raise ValueError(f"No resource matching schema '{schema}'")
103-
104-
validate_model_attribute(resource, attribute_base)
105-
106-
return f"{schema}:{attribute_base}"
32+
if TYPE_CHECKING:
33+
pass
10734

10835

10936
def contains_attribute_or_subattributes(

scim2_models/rfc7643/resource.py

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
from ..attributes import is_complex_attribute
2626
from ..base import BaseModel
2727
from ..base import BaseModelType
28+
from ..context import Context
2829
from ..reference import Reference
2930
from ..scim_object import ScimObject
31+
from ..scim_object import validate_attribute_urn
3032
from ..utils import UNION_TYPES
3133
from ..utils import normalize_attribute_name
3234

@@ -208,15 +210,17 @@ def get_extension_model(
208210

209211
@staticmethod
210212
def get_by_schema(
211-
resource_types: list[type[BaseModel]], schema: str, with_extensions: bool = True
212-
) -> Optional[type]:
213+
resource_types: list[type["Resource"]],
214+
schema: str,
215+
with_extensions: bool = True,
216+
) -> Optional[Union[type["Resource"], type["Extension"]]]:
213217
"""Given a resource type list and a schema, find the matching resource type."""
214-
by_schema = {
218+
by_schema: dict[str, Union[type[Resource], type[Extension]]] = {
215219
resource_type.model_fields["schemas"].default[0].lower(): resource_type
216220
for resource_type in (resource_types or [])
217221
}
218222
if with_extensions:
219-
for resource_type in list(by_schema.values()):
223+
for resource_type in resource_types:
220224
by_schema.update(
221225
{
222226
schema.lower(): extension
@@ -228,7 +232,7 @@ def get_by_schema(
228232

229233
@staticmethod
230234
def get_by_payload(
231-
resource_types: list[type], payload: dict, **kwargs: Any
235+
resource_types: list[type["Resource"]], payload: dict, **kwargs: Any
232236
) -> Optional[type]:
233237
"""Given a resource type list and a payload, find the matching resource type."""
234238
if not payload or not payload.get("schemas"):
@@ -260,6 +264,74 @@ def from_schema(cls, schema: "Schema") -> type["Resource"]:
260264

261265
return make_python_model(schema, cls)
262266

267+
def _prepare_model_dump(
268+
self,
269+
scim_ctx: Optional[Context] = Context.DEFAULT,
270+
attributes: Optional[list[str]] = None,
271+
excluded_attributes: Optional[list[str]] = None,
272+
**kwargs: Any,
273+
) -> dict[str, Any]:
274+
kwargs = super()._prepare_model_dump(scim_ctx, **kwargs)
275+
kwargs["context"]["scim_attributes"] = [
276+
validate_attribute_urn(attribute, self.__class__)
277+
for attribute in (attributes or [])
278+
]
279+
kwargs["context"]["scim_excluded_attributes"] = [
280+
validate_attribute_urn(attribute, self.__class__)
281+
for attribute in (excluded_attributes or [])
282+
]
283+
return kwargs
284+
285+
def model_dump(
286+
self,
287+
*args: Any,
288+
scim_ctx: Optional[Context] = Context.DEFAULT,
289+
attributes: Optional[list[str]] = None,
290+
excluded_attributes: Optional[list[str]] = None,
291+
**kwargs: Any,
292+
) -> dict:
293+
"""Create a model representation that can be included in SCIM messages by using Pydantic :code:`BaseModel.model_dump`.
294+
295+
:param scim_ctx: If a SCIM context is passed, some default values of
296+
Pydantic :code:`BaseModel.model_dump` are tuned to generate valid SCIM
297+
messages. Pass :data:`None` to get the default Pydantic behavior.
298+
:param attributes: A multi-valued list of strings indicating the names of resource
299+
attributes to return in the response, overriding the set of attributes that
300+
would be returned by default.
301+
:param excluded_attributes: A multi-valued list of strings indicating the names of resource
302+
attributes to be removed from the default set of attributes to return.
303+
"""
304+
dump_kwargs = self._prepare_model_dump(
305+
scim_ctx, attributes, excluded_attributes, **kwargs
306+
)
307+
if scim_ctx:
308+
dump_kwargs.setdefault("mode", "json")
309+
return super(ScimObject, self).model_dump(*args, **dump_kwargs)
310+
311+
def model_dump_json(
312+
self,
313+
*args: Any,
314+
scim_ctx: Optional[Context] = Context.DEFAULT,
315+
attributes: Optional[list[str]] = None,
316+
excluded_attributes: Optional[list[str]] = None,
317+
**kwargs: Any,
318+
) -> str:
319+
"""Create a JSON model representation that can be included in SCIM messages by using Pydantic :code:`BaseModel.model_dump_json`.
320+
321+
:param scim_ctx: If a SCIM context is passed, some default values of
322+
Pydantic :code:`BaseModel.model_dump` are tuned to generate valid SCIM
323+
messages. Pass :data:`None` to get the default Pydantic behavior.
324+
:param attributes: A multi-valued list of strings indicating the names of resource
325+
attributes to return in the response, overriding the set of attributes that
326+
would be returned by default.
327+
:param excluded_attributes: A multi-valued list of strings indicating the names of resource
328+
attributes to be removed from the default set of attributes to return.
329+
"""
330+
dump_kwargs = self._prepare_model_dump(
331+
scim_ctx, attributes, excluded_attributes, **kwargs
332+
)
333+
return super(ScimObject, self).model_dump_json(*args, **dump_kwargs)
334+
263335

264336
AnyResource = TypeVar("AnyResource", bound="Resource")
265337

scim2_models/scim_object.py

Lines changed: 83 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,93 @@
11
"""Base SCIM object classes with schema identification."""
22

3+
from typing import TYPE_CHECKING
34
from typing import Annotated
45
from typing import Any
56
from typing import Optional
67

78
from .annotations import Required
89
from .base import BaseModel
9-
from .base import validate_attribute_urn
1010
from .context import Context
11+
from .utils import normalize_attribute_name
12+
13+
if TYPE_CHECKING:
14+
from .rfc7643.resource import Resource
15+
16+
17+
def validate_model_attribute(model: type["BaseModel"], attribute_base: str) -> None:
18+
"""Validate that an attribute name or a sub-attribute path exist for a given model."""
19+
attribute_name, *sub_attribute_blocks = attribute_base.split(".")
20+
sub_attribute_base = ".".join(sub_attribute_blocks)
21+
22+
aliases = {field.validation_alias for field in model.model_fields.values()}
23+
24+
if normalize_attribute_name(attribute_name) not in aliases:
25+
raise ValueError(
26+
f"Model '{model.__name__}' has no attribute named '{attribute_name}'"
27+
)
28+
29+
if sub_attribute_base:
30+
attribute_type = model.get_field_root_type(attribute_name)
31+
32+
if not attribute_type or not issubclass(attribute_type, BaseModel):
33+
raise ValueError(
34+
f"Attribute '{attribute_name}' is not a complex attribute, and cannot have a '{sub_attribute_base}' sub-attribute"
35+
)
36+
37+
validate_model_attribute(attribute_type, sub_attribute_base)
38+
39+
40+
def extract_schema_and_attribute_base(attribute_urn: str) -> tuple[str, str]:
41+
"""Extract the schema urn part and the attribute name part from attribute name.
42+
43+
As defined in :rfc:`RFC7644 §3.10 <7644#section-3.10>`.
44+
"""
45+
*urn_blocks, attribute_base = attribute_urn.split(":")
46+
schema = ":".join(urn_blocks)
47+
return schema, attribute_base
48+
49+
50+
def validate_attribute_urn(
51+
attribute_name: str,
52+
default_resource: Optional[type["Resource"]] = None,
53+
resource_types: Optional[list[type["Resource"]]] = None,
54+
) -> str:
55+
"""Validate that an attribute urn is valid or not.
56+
57+
:param attribute_name: The attribute urn to check.
58+
:default_resource: The default resource if `attribute_name` is not an absolute urn.
59+
:resource_types: The available resources in which to look for the attribute.
60+
:return: The normalized attribute URN.
61+
"""
62+
from .rfc7643.resource import Resource
63+
64+
if not resource_types:
65+
resource_types = []
66+
67+
if default_resource and default_resource not in resource_types:
68+
resource_types.append(default_resource)
69+
70+
default_schema = (
71+
default_resource.model_fields["schemas"].default[0]
72+
if default_resource
73+
else None
74+
)
75+
76+
schema: Optional[Any]
77+
schema, attribute_base = extract_schema_and_attribute_base(attribute_name)
78+
if not schema:
79+
schema = default_schema
80+
81+
if not schema:
82+
raise ValueError("No default schema and relative URN")
83+
84+
resource = Resource.get_by_schema(resource_types, schema)
85+
if not resource:
86+
raise ValueError(f"No resource matching schema '{schema}'")
87+
88+
validate_model_attribute(resource, attribute_base)
89+
90+
return f"{schema}:{attribute_base}"
1191

1292

1393
class ScimObject(BaseModel):
@@ -17,46 +97,19 @@ class ScimObject(BaseModel):
1797
SCIM schemas that define the attributes present in the current JSON
1898
structure."""
1999

20-
def _prepare_model_dump(
21-
self,
22-
scim_ctx: Optional[Context] = Context.DEFAULT,
23-
attributes: Optional[list[str]] = None,
24-
excluded_attributes: Optional[list[str]] = None,
25-
**kwargs: Any,
26-
) -> dict[str, Any]:
27-
kwargs = super()._prepare_model_dump(scim_ctx, **kwargs)
28-
kwargs["context"]["scim_attributes"] = [
29-
validate_attribute_urn(attribute, self.__class__)
30-
for attribute in (attributes or [])
31-
]
32-
kwargs["context"]["scim_excluded_attributes"] = [
33-
validate_attribute_urn(attribute, self.__class__)
34-
for attribute in (excluded_attributes or [])
35-
]
36-
return kwargs
37-
38100
def model_dump(
39101
self,
40102
*args: Any,
41103
scim_ctx: Optional[Context] = Context.DEFAULT,
42-
attributes: Optional[list[str]] = None,
43-
excluded_attributes: Optional[list[str]] = None,
44104
**kwargs: Any,
45105
) -> dict:
46106
"""Create a model representation that can be included in SCIM messages by using Pydantic :code:`BaseModel.model_dump`.
47107
48108
:param scim_ctx: If a SCIM context is passed, some default values of
49109
Pydantic :code:`BaseModel.model_dump` are tuned to generate valid SCIM
50110
messages. Pass :data:`None` to get the default Pydantic behavior.
51-
:param attributes: A multi-valued list of strings indicating the names of resource
52-
attributes to return in the response, overriding the set of attributes that
53-
would be returned by default.
54-
:param excluded_attributes: A multi-valued list of strings indicating the names of resource
55-
attributes to be removed from the default set of attributes to return.
56111
"""
57-
dump_kwargs = self._prepare_model_dump(
58-
scim_ctx, attributes, excluded_attributes, **kwargs
59-
)
112+
dump_kwargs = self._prepare_model_dump(scim_ctx, **kwargs)
60113
if scim_ctx:
61114
dump_kwargs.setdefault("mode", "json")
62115
return super(BaseModel, self).model_dump(*args, **dump_kwargs)
@@ -65,22 +118,13 @@ def model_dump_json(
65118
self,
66119
*args: Any,
67120
scim_ctx: Optional[Context] = Context.DEFAULT,
68-
attributes: Optional[list[str]] = None,
69-
excluded_attributes: Optional[list[str]] = None,
70121
**kwargs: Any,
71122
) -> str:
72123
"""Create a JSON model representation that can be included in SCIM messages by using Pydantic :code:`BaseModel.model_dump_json`.
73124
74125
:param scim_ctx: If a SCIM context is passed, some default values of
75126
Pydantic :code:`BaseModel.model_dump` are tuned to generate valid SCIM
76127
messages. Pass :data:`None` to get the default Pydantic behavior.
77-
:param attributes: A multi-valued list of strings indicating the names of resource
78-
attributes to return in the response, overriding the set of attributes that
79-
would be returned by default.
80-
:param excluded_attributes: A multi-valued list of strings indicating the names of resource
81-
attributes to be removed from the default set of attributes to return.
82128
"""
83-
dump_kwargs = self._prepare_model_dump(
84-
scim_ctx, attributes, excluded_attributes, **kwargs
85-
)
129+
dump_kwargs = self._prepare_model_dump(scim_ctx, **kwargs)
86130
return super(BaseModel, self).model_dump_json(*args, **dump_kwargs)

0 commit comments

Comments
 (0)