-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathutils.py
More file actions
374 lines (317 loc) · 14.2 KB
/
utils.py
File metadata and controls
374 lines (317 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
import logging
import re
from base64 import b64decode
from http import HTTPStatus
import jwt
from django.contrib.auth import get_user_model
from django.db import transaction
from django.http import HttpRequest
from django.http.response import JsonResponse
from django.utils import timezone
from oauth2_provider.models import (
AccessToken,
RefreshToken,
get_access_token_model,
get_application_model,
get_refresh_token_model,
)
from oauthlib.oauth2.rfc6749.errors import (
InvalidClientError,
InvalidGrantError,
InvalidRequestError,
)
from apps.authorization.models import DataAccessGrant
from apps.constants import (
APPLICATION_ONE_TIME_REFRESH_NOT_ALLOWED_MESG,
APPLICATION_TEMPORARILY_INACTIVE,
APPLICATION_THIRTEEN_MONTH_DATA_ACCESS_EXPIRED_MESG,
HHS_SERVER_LOGNAME_FMT,
)
from apps.dot_ext.constants import APPLICATION_THIRTEEN_MONTH_DATA_ACCESS_NOT_FOUND_MESG
from apps.dot_ext.models import Application
from apps.versions import VersionNotMatched, Versions
User = get_user_model()
log = logging.getLogger(HHS_SERVER_LOGNAME_FMT.format(__name__))
def remove_application_user_pair_tokens_data_access(application, user):
"""
Utility function to revoke and delete current application/user pair
access_token, refresh_token and DataAccessGrant records.
This is to be used related to changes in a choice for a beneficiary
to not share or an application to not require demographic scopes.
This ensures that previously created tokens, with full scopes, can not
be used when there is a change.
RETURN:
access_token_delete_cnt = Access tokens deleted.
refresh_token_delete_cnt = Refresh tokens deleted.
data_access_grant_delete_cnt = DataAccessGrant records deleted (0 or 1).
CALLED FROM:
apps.dot_ext.views.authorization.authorization.AuthorizationView.form_valid()
"""
with transaction.atomic():
# Get count of access tokens to be deleted.
access_token_delete_cnt = AccessToken.objects.filter(application=application, user=user).count()
# Delete DataAccessGrant record.
# NOTE: This also revokes/deletes access and only revokes refresh tokens via signal function.
data_access_grant_delete_cnt = DataAccessGrant.objects.filter(
application=application, beneficiary=user
).delete()[0]
# Delete refresh token records
refresh_token_delete_cnt = RefreshToken.objects.filter(application=application, user=user).delete()[0]
return (
data_access_grant_delete_cnt,
access_token_delete_cnt,
refresh_token_delete_cnt,
)
def get_application_from_meta(request) -> Application | None:
"""
Utility function to application from auth header.
This method will pull either the access token
or the client id from the header and use that
value to retrieve the application.
RETURN:
application or None
"""
request_meta = getattr(request, 'META', None)
client_id, ac = None, None
Application = get_application_model()
app = None
if request_meta:
auth_header = request_meta.get('HTTP_AUTHORIZATION', None)
if not auth_header:
auth_header = request_meta.get('Authorization', None)
if auth_header:
if 'Bearer' in auth_header:
ac = AccessToken.objects.get(token=auth_header.split(' ')[1])
else:
encoded_credentials = auth_header.split(' ')[1] # Removes 'Basic ' to isolate credentials
decoded_credentials = b64decode(encoded_credentials).decode('utf-8').split(':')
client_id = decoded_credentials[0]
try:
if client_id is not None:
app = Application.objects.get(client_id=client_id)
elif ac is not None:
app = Application.objects.get(id=ac.application_id)
except Application.DoesNotExist:
raise InvalidClientError(description='Application does not exist')
return app
def get_application_from_data(request):
"""
Utility function to get application from POST/GET data.
This method will pull the client_id, access token,
or refresh token from the request data and use that
value to retrieve the application.
RETURN:
application or None
"""
client_id, ac, rt, app = None, None, None, None
Application = get_application_model()
# Try and get the application via `client_id`
# If the client id comes in via GET or POST, we can try and look
# up the application via the client_id. If we find it, return it.
# If not, we have a bad request, because a client_id was present,
# but malformed in some way.
if request.GET.get('client_id'):
client_id = request.GET.get('client_id')
elif request.POST.get('client_id'):
client_id = request.POST.get('client_id')
if request.POST.get('client_assertion'):
# for client credentials flow, we need to get the client_id from the client_assertion
try:
token = request.POST.get('client_assertion')
auth_jwt = jwt.decode(token, options={'verify_signature': False})
client_assertion_client_id = auth_jwt.get('iss')
if client_id:
if client_id != client_assertion_client_id:
raise InvalidRequestError(
description='client_id param did not match client_id in JWT',
status_code=HTTPStatus.BAD_REQUEST,
)
else:
client_id = client_assertion_client_id
except jwt.PyJWTError:
raise InvalidRequestError(
description='Malformed client_assertion',
status_code=HTTPStatus.BAD_REQUEST,
)
try:
if client_id:
app = Application.objects.get(client_id=client_id)
return app
except Application.DoesNotExist:
raise InvalidClientError(
description='Application does not exist (client_id)',
status_code=HTTPStatus.BAD_REQUEST,
)
# Try via token
# If we manage to find an access token, but then not an application, we
# have a problem, and should return an error.
if request.POST.get('token', None):
ac = AccessToken.objects.get(token=request.POST.get('token', None))
try:
if ac is not None:
app = Application.objects.get(id=ac.application_id)
return app
except Application.DoesNotExist:
raise InvalidClientError(
description='Application does not exist (token)',
status_code=HTTPStatus.BAD_REQUEST,
)
# Try via refresh_token
# Finally, if we have a refresh token, but cannot find an app, that's not good.
if request.POST.get('refresh_token'):
rt = RefreshToken.objects.get(token=request.POST.get('refresh_token', None))
try:
if rt is not None:
app = Application.objects.get(id=rt.application_id)
return app
except Application.DoesNotExist:
raise InvalidClientError(
description='Application does not exist (refresh_token)',
status_code=HTTPStatus.BAD_REQUEST,
)
# Return None if we get to this point, partially to match behavior expected in tests.
return None
def get_application_from_request(request):
meta_app = get_application_from_meta(request)
data_app = get_application_from_data(request)
if meta_app and data_app and meta_app != data_app:
raise InvalidRequestError(
description='different app in headers than in request data',
status_code=HTTPStatus.BAD_REQUEST,
)
if meta_app is None:
return data_app
else:
return meta_app
def validate_app_is_active(request: HttpRequest) -> Application:
"""
Utility function to check that an application is an active, valid application.
This method will pull the application from the request and then check the active flag and the
data access grant (DA) validity.
Args:
request (HttpRequest): Django HttpRequest object
Raises:
InvalidClientError: Application can't refresh or isn't active
InvalidGrantError: Could not find a corresponding DAG, or DAG has expired
InvalidRequestError: Missing refresh token parameter
Returns:
Model: Application model
"""
app = get_application_from_request(request)
if not app:
raise InvalidClientError('App id failed')
# revoked access and expired auth period to a 401 error
if app.active:
# Is this for a token refresh request?
post_grant_type = request.POST.get('grant_type', None)
if post_grant_type == 'refresh_token':
# A ONE_TIME token is not allowed to be refreshed.
# In that instance, we raise an error that explicitly
# indicates that the user must re-authenticate.
# This is a FORBIDDEN error for the API consumer.
if app.has_one_time_only_data_access():
raise InvalidClientError(
description=APPLICATION_ONE_TIME_REFRESH_NOT_ALLOWED_MESG,
status_code=HTTPStatus.FORBIDDEN,
)
refresh_code = request.POST.get('refresh_token', None)
try:
refresh_token = RefreshToken.objects.get(token=refresh_code)
dag = DataAccessGrant.objects.get(beneficiary=refresh_token.user, application=app)
if dag:
# If we get a DAG, but it has expired, we pass back a message (again)
# saying the end user must re-authenticate.
if dag.has_expired():
# https://www.rfc-editor.org/rfc/rfc6750#section-3.1
# We will return a 401 (UNAUTHORIZED) because this is in keeping
# with the OAuth RFC.
raise InvalidGrantError(
description=APPLICATION_THIRTEEN_MONTH_DATA_ACCESS_EXPIRED_MESG,
status_code=HTTPStatus.UNAUTHORIZED,
)
except DataAccessGrant.DoesNotExist:
# In the event that we cannot find a DAG, we don't want to pass back too much information.
# We pass back a FORBIDDEN and a message saying as much (and, again, encouraging
# reauthentication).
raise InvalidGrantError(
description=APPLICATION_THIRTEEN_MONTH_DATA_ACCESS_NOT_FOUND_MESG,
status_code=HTTPStatus.FORBIDDEN,
)
except RefreshToken.DoesNotExist:
raise InvalidRequestError(
description='Missing refresh token parameter',
status_code=HTTPStatus.BAD_REQUEST,
)
else:
raise InvalidClientError(
description=APPLICATION_TEMPORARILY_INACTIVE.format(app.name),
status_code=HTTPStatus.FORBIDDEN,
)
return app
def json_response_from_oauth2_error(error):
"""
Given a oauthlib.oauth2.rfc6749.errors.* error this function
returns a corresponding django.http.response.JsonResponse response
"""
ret_data = {'status_code': error.status_code, 'error': error.error}
# Add optional description
if getattr(error, 'description', None):
ret_data['error_description'] = error.description
return JsonResponse(ret_data, status=error.status_code)
def get_api_version_number_from_url(url_path: str) -> int:
"""Utility function to extract what version of the API a URL is
If there are multiple occurrences of 'v{{VERSION}} in a url path,
only return the first one
EX. /v2/o/authorize will return v2.
Args:
url_path (str): The url being called that we want to extract the api version
Returns:
Optional[str]: Returns a string of v2
"""
match = re.search(r'/v(\d+)/', url_path, re.IGNORECASE)
if match:
version = int(match.group(1))
if version in Versions.supported_versions():
return version
else:
raise VersionNotMatched(f'{version} extracted from {url_path}')
return Versions.NOT_AN_API_VERSION
def validate_latin_extended_string(text: str) -> bool:
"""Checks if a string has all values (and at least one value) that fall within ascii, latin supplement, and extended:
https://en.wikipedia.org/wiki/Latin_Extended-A
Args:
text (str): the text to check
Returns:
bool: if all strings are encoded less than U+017F (383) and it is not empty
"""
return all(ord(char) <= 383 for char in text) and bool(text)
def revoke_prior_tokens_for_user_and_app_if_they_exist(user_id: int, app_id: int) -> None:
"""Revoke prior tokens for a user/app id pair to ensure that if a user has reauthorized
that prior tokens can't be used, in case any of those prior tokens have more scopes than
the newly created one
Args:
user_id (int): ID for the user who just re-authorized an app they have authorized previously
app_id (int): ID for the application the user just re-authorized for
"""
AccessToken = get_access_token_model()
RefreshToken = get_refresh_token_model()
prior_access_tokens = list(AccessToken.objects.filter(user=user_id, application=app_id).order_by('-created'))
# If there is only one access token for a user_id/app_id, we don't need to revoke any prior tokens
if len(prior_access_tokens) <= 1:
return
prior_access_tokens.pop(0)
for access_token in prior_access_tokens:
try:
refresh_token = get_refresh_token_model().objects.get(access_token=access_token.id)
# Only update the access token expires value if it is in the future
if access_token.expires > timezone.now():
access_token.expires = timezone.now()
access_token.save()
if refresh_token.revoked is None:
refresh_token.revoked = timezone.now()
refresh_token.access_token_id = None
refresh_token.save()
except RefreshToken.DoesNotExist:
# indicates it is a access token created via CAN flow, as it does not have an associated refresh token
access_token.expires = timezone.now()
access_token.save()