-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmiddleware.py
More file actions
131 lines (101 loc) · 3.73 KB
/
middleware.py
File metadata and controls
131 lines (101 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import jwt
from channels.db import database_sync_to_async
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import AuthenticationFailed
User = get_user_model()
class TokenAuthentication:
"""
Simple token based authentication.
Clients should authenticate by passing the token key in the query parameters.
For example:
?token=401f7ac837da42b97f613d789819ff93537bee6a
"""
model = None
def get_model(self):
if self.model is not None:
return self.model
from rest_framework.authtoken.models import Token
return Token
"""
A custom token model may be used, but must have the following properties.
* key -- The string identifying the token
* user -- The user to which the token belongs
"""
def authenticate_credentials(self, key):
model = self.get_model()
try:
token = model.objects.select_related("user").get(key=key)
except model.DoesNotExist:
raise AuthenticationFailed(_("Invalid token."))
if not token.user.is_active:
raise AuthenticationFailed(_("User inactive or deleted."))
return token.user
def authenticate(self, token):
"""
Returns a `User` if a correct username and password have been supplied
Args:
token: token key
Returns:
User: A user instance.
"""
try:
user_id = jwt.decode(
jwt=token, key=settings.SECRET_KEY, algorithms=["HS256"]
)["user_id"]
except jwt.exceptions.DecodeError:
raise AuthenticationFailed(_("Invalid token."))
except jwt.exceptions.ExpiredSignatureError:
raise AuthenticationFailed(_("Token expired."))
user = User.objects.get(pk=user_id)
return user
@database_sync_to_async
def get_user(scope):
"""
Return the user model instance associated with the given scope.
If no user is retrieved, return an instance of `AnonymousUser`.
"""
# postpone model import to avoid ImproperlyConfigured error before Django
# setup is complete.
from django.contrib.auth.models import AnonymousUser
if "token" not in scope:
raise ValueError(
"Cannot find token in scope. You should wrap your consumer in "
"TokenAuthMiddleware."
)
token = scope["token"]
user = None
try:
auth = TokenAuthentication()
user = auth.authenticate(token)
except AuthenticationFailed:
pass
return user or AnonymousUser()
class TokenAuthMiddleware:
"""
Custom middleware that takes a token from WebSocket subprotocols and authenticates via JWT.
"""
SUBPROTOCOL_KEYWORD = "Bearer"
def __init__(self, app):
# Store the ASGI application we were passed
self.app = app
async def __call__(self, scope, receive, send):
# Extract token from Sec-WebSocket-Protocol header.
token = self._extract_token_from_subprotocol(scope.get("subprotocols", []))
if token:
scope["token"] = token
scope["user"] = await get_user(scope)
else:
from django.contrib.auth.models import AnonymousUser
scope["user"] = AnonymousUser()
return await self.app(scope, receive, send)
def _extract_token_from_subprotocol(self, subprotocols: list[str]) -> str | None:
"""
Expect subprotocols in the form ["Bearer", "<JWT>"].
"""
if not subprotocols:
return None
if len(subprotocols) >= 2 and subprotocols[0] == self.SUBPROTOCOL_KEYWORD:
return subprotocols[1]
return None