-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdecorators.py
More file actions
126 lines (105 loc) · 5.02 KB
/
decorators.py
File metadata and controls
126 lines (105 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from functools import wraps
from django.http import JsonResponse
from django_email_learning.models import OrganizationUser
from django_email_learning.apps import PLATFORM_ADMIN_GROUP_NAME
from django_email_learning.services.jwt_service import decode_jwt, InvalidTokenException
from django_email_learning.models import ApiKey
import typing
def is_platform_admin() -> typing.Callable:
    """Build a view decorator that admits only platform administrators.

    The decorated view runs when ``request.user`` is authenticated and is
    either a superuser or a member of the group named
    ``PLATFORM_ADMIN_GROUP_NAME``.

    Returns:
        A decorator that wraps a view. The wrapper answers with a 401
        ``JsonResponse`` for unauthenticated users and a 403 ``JsonResponse``
        for authenticated users who are neither superusers nor in the group;
        otherwise it returns whatever the view returns.
    """

    def decorator(view_func: typing.Callable) -> typing.Callable:
        @wraps(view_func)
        def _wrapped_view(request, *view_args, **view_kwargs) -> JsonResponse:  # type: ignore[no-untyped-def]
            account = request.user
            if not account.is_authenticated:
                return JsonResponse({"error": "Unauthorized"}, status=401)

            # Superusers are let through before any group lookup is made.
            if account.is_superuser:
                return view_func(request, *view_args, **view_kwargs)

            in_admin_group = account.groups.filter(
                name=PLATFORM_ADMIN_GROUP_NAME
            ).exists()
            if in_admin_group:
                return view_func(request, *view_args, **view_kwargs)

            return JsonResponse({"error": "Forbidden"}, status=403)

        return _wrapped_view

    return decorator
def accessible_for(roles: set[str]) -> typing.Callable:
    """Build a view decorator restricting access to given organization roles.

    Args:
        roles: Role names that grant access. A non-superuser must have an
            ``OrganizationUser`` row with one of these roles for the
            organization identified by the view's ``organization_id``
            keyword argument.

    Returns:
        A decorator that wraps a view. The wrapper answers with a 401
        ``JsonResponse`` for unauthenticated users and a 403 ``JsonResponse``
        when no matching ``OrganizationUser`` exists. Superusers skip the
        role lookup entirely.
    """

    def decorator(view_func: typing.Callable) -> typing.Callable:
        @wraps(view_func)
        def _wrapped_view(request, *view_args, **view_kwargs) -> JsonResponse:  # type: ignore[no-untyped-def]
            requester = request.user
            if not requester.is_authenticated:
                return JsonResponse({"error": "Unauthorized"}, status=401)

            if requester.is_superuser:
                return view_func(request, *view_args, **view_kwargs)

            # The organization comes from the URL keyword arguments; when it
            # is absent the lookup is made with ``organization_id=None``.
            matching_memberships = OrganizationUser.objects.filter(  # type: ignore[misc]
                user=requester,
                organization_id=view_kwargs.get("organization_id"),
                role__in=roles,
            )
            if matching_memberships.exists():
                return view_func(request, *view_args, **view_kwargs)

            return JsonResponse({"error": "Forbidden"}, status=403)

        return _wrapped_view

    return decorator
def is_an_organization_member(only_admin: bool = False) -> typing.Callable:
    """Build a view decorator requiring membership in some organization.

    Args:
        only_admin: When true, only ``OrganizationUser`` rows whose role is
            ``"admin"`` count as membership.

    Returns:
        A decorator that wraps a view. The wrapper answers with a 401
        ``JsonResponse`` for unauthenticated users and a 403 ``JsonResponse``
        when the user has no qualifying ``OrganizationUser`` row. Superusers
        skip the membership lookup. The lookup is not scoped to a particular
        organization: any qualifying row for the user is enough.
    """

    def decorator(view_func: typing.Callable) -> typing.Callable:
        @wraps(view_func)
        def _wrapped_view(request, *view_args, **view_kwargs) -> JsonResponse:  # type: ignore[no-untyped-def]
            requester = request.user
            if not requester.is_authenticated:
                return JsonResponse({"error": "Unauthorized"}, status=401)

            if requester.is_superuser:
                return view_func(request, *view_args, **view_kwargs)

            memberships = OrganizationUser.objects.filter(  # type: ignore[misc]
                user=requester
            )
            # Narrow to admin rows only when the caller asked for it.
            memberships = memberships.filter(role="admin") if only_admin else memberships
            if memberships.exists():
                return view_func(request, *view_args, **view_kwargs)

            return JsonResponse({"error": "Forbidden"}, status=403)

        return _wrapped_view

    return decorator
def check_api_key() -> typing.Callable:
    """Build a view decorator that requires a valid API key bearer token.

    The decorated view runs only when all of the following hold:

    * the request has an ``Authorization`` header of the exact form
      ``Bearer <API_KEY>`` (two space-separated parts);
    * ``decode_jwt`` accepts the token and the decoded payload contains
      both ``salt`` and ``key`` entries;
    * at least one stored ``ApiKey`` with that salt decrypts (via its
      ``decrypt_password`` method) to the payload's ``key``.

    Returns:
        A decorator that wraps a view. Every failed check produces a 401
        ``JsonResponse`` whose ``error`` field names the failure; otherwise
        the wrapper returns whatever the view returns.
    """

    def decorator(view_func: typing.Callable) -> typing.Callable:
        @wraps(view_func)
        def _wrapped_view(request, *view_args, **view_kwargs) -> JsonResponse:  # type: ignore[no-untyped-def]
            authorization_header = request.headers.get("Authorization")
            if not authorization_header:
                return JsonResponse(
                    {"error": "Authorization header missing"}, status=401
                )

            authorization_header_parts = authorization_header.split(" ")
            if (
                len(authorization_header_parts) != 2
                or authorization_header_parts[0] != "Bearer"
            ):
                return JsonResponse(
                    {
                        "error": "Invalid Authorization header format. Expected: Bearer <API_KEY>"
                    },
                    status=401,
                )

            token = authorization_header_parts[1]
            try:
                key_data = decode_jwt(token)
            except InvalidTokenException:
                return JsonResponse({"error": "Invalid Json Web Token"}, status=401)

            # Read both required fields up front so that a token lacking
            # either one is reported the same way whether or not any stored
            # key shares its salt, and so that a KeyError raised elsewhere
            # is not mistaken for a malformed token.
            try:
                salt = key_data["salt"]
                expected_key = key_data["key"]
            except KeyError:
                return JsonResponse(
                    {"error": "Json Web Token missing required fields"}, status=401
                )

            # Several stored keys may share a salt, so each candidate is
            # checked. A match is sufficient on its own: no follow-up
            # ``get(salt=...)`` is made, because that call would raise
            # MultipleObjectsReturned exactly when the salt is shared.
            key_matched = any(
                candidate.decrypt_password(candidate.key) == expected_key
                for candidate in ApiKey.objects.filter(salt=salt)
            )
            if not key_matched:
                return JsonResponse({"error": "Invalid API key"}, status=401)

            return view_func(request, *view_args, **view_kwargs)

        return _wrapped_view

    return decorator