-
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathdjango_example.py
More file actions
187 lines (153 loc) · 5.71 KB
/
django_example.py
File metadata and controls
187 lines (153 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import json
from http import HTTPStatus
from django.http import HttpResponse
from django.urls import path
from django.urls import register_converter
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from pydantic import ValidationError
from scim2_models import Context
from scim2_models import Error
from scim2_models import ListResponse
from scim2_models import PatchOp
from scim2_models import ResponseParameters
from scim2_models import SearchRequest
from scim2_models import UniquenessException
from scim2_models import User
from .integrations import delete_record
from .integrations import from_scim_user
from .integrations import get_record
from .integrations import list_records
from .integrations import save_record
from .integrations import to_scim_user
# -- setup-start --
def scim_response(payload, status=HTTPStatus.OK):
"""Build a Django response with the SCIM media type."""
return HttpResponse(
payload,
status=status,
content_type="application/scim+json",
)
# -- setup-end --
# -- refinements-start --
# -- converters-start --
class UserConverter:
regex = "[^/]+"
def to_python(self, id):
try:
return get_record(id)
except KeyError:
raise ValueError
def to_url(self, record):
return record["id"]
register_converter(UserConverter, "user")
# -- converters-end --
# -- validation-helper-start --
def scim_validation_error(error):
"""Turn Pydantic validation errors into a SCIM error response."""
scim_error = Error.from_validation_error(error.errors()[0])
return scim_response(scim_error.model_dump_json(), scim_error.status)
# -- validation-helper-end --
# -- uniqueness-helper-start --
def scim_uniqueness_error(error):
"""Turn uniqueness errors into a SCIM 409 response."""
scim_error = UniquenessException(detail=str(error)).to_error()
return scim_response(scim_error.model_dump_json(), HTTPStatus.CONFLICT)
# -- uniqueness-helper-end --
# -- error-handler-start --
def handler404(request, exception):
"""Turn Django 404 errors into SCIM error responses."""
scim_error = Error(status=404, detail=str(exception))
return scim_response(scim_error.model_dump_json(), HTTPStatus.NOT_FOUND)
# -- error-handler-end --
# -- refinements-end --
# -- endpoints-start --
# -- single-resource-start --
@method_decorator(csrf_exempt, name="dispatch")
class UserView(View):
"""Handle GET, PATCH and DELETE on one SCIM user resource."""
def get(self, request, app_record):
try:
req = ResponseParameters.model_validate(request.GET.dict())
except ValidationError as error:
return scim_validation_error(error)
scim_user = to_scim_user(app_record)
return scim_response(
scim_user.model_dump_json(
scim_ctx=Context.RESOURCE_QUERY_RESPONSE,
attributes=req.attributes,
excluded_attributes=req.excluded_attributes,
)
)
def delete(self, request, app_record):
delete_record(app_record["id"])
return scim_response("", HTTPStatus.NO_CONTENT)
def patch(self, request, app_record):
try:
patch = PatchOp[User].model_validate(
json.loads(request.body),
scim_ctx=Context.RESOURCE_PATCH_REQUEST,
)
except ValidationError as error:
return scim_validation_error(error)
scim_user = to_scim_user(app_record)
patch.patch(scim_user)
updated_record = from_scim_user(scim_user)
try:
save_record(updated_record)
except ValueError as error:
return scim_uniqueness_error(error)
return scim_response(
scim_user.model_dump_json(scim_ctx=Context.RESOURCE_PATCH_RESPONSE)
)
# -- single-resource-end --
# -- collection-start --
@method_decorator(csrf_exempt, name="dispatch")
class UsersView(View):
"""Handle GET and POST on the SCIM users collection."""
def get(self, request):
try:
req = SearchRequest.model_validate(request.GET.dict())
except ValidationError as error:
return scim_validation_error(error)
all_records = list_records()
page = all_records[req.start_index_0 : req.stop_index_0]
resources = [to_scim_user(record) for record in page]
response = ListResponse[User](
total_results=len(all_records),
start_index=req.start_index or 1,
items_per_page=len(resources),
resources=resources,
)
return scim_response(
response.model_dump_json(
scim_ctx=Context.RESOURCE_QUERY_RESPONSE,
attributes=req.attributes,
excluded_attributes=req.excluded_attributes,
)
)
def post(self, request):
try:
request_user = User.model_validate(
json.loads(request.body),
scim_ctx=Context.RESOURCE_CREATION_REQUEST,
)
except ValidationError as error:
return scim_validation_error(error)
app_record = from_scim_user(request_user)
try:
save_record(app_record)
except ValueError as error:
return scim_uniqueness_error(error)
response_user = to_scim_user(app_record)
return scim_response(
response_user.model_dump_json(scim_ctx=Context.RESOURCE_CREATION_RESPONSE),
HTTPStatus.CREATED,
)
urlpatterns = [
path("scim/v2/Users", UsersView.as_view(), name="scim_users"),
path("scim/v2/Users/<user:app_record>", UserView.as_view(), name="scim_user"),
]
# -- collection-end --
# -- endpoints-end --