-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathviews.py
More file actions
187 lines (163 loc) · 6.69 KB
/
views.py
File metadata and controls
187 lines (163 loc) · 6.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import json
from django.http import JsonResponse, HttpResponse
from django.shortcuts import render
from django.utils.translation import get_language, get_language_info
from django.utils.translation import gettext as _
from django.views import View
from pydantic import ValidationError
from django_email_learning.apps import PLATFORM_ADMIN_GROUP_NAME
from django_email_learning.models import Course, OrganizationUser
from django_email_learning.services.command_models.enroll_from_google_directory_command import (
EnrollFromGoogleDirectoryCommand,
)
from django_email_learning.services.command_handler_service import CommandHandlerService
from django_email_learning.services.jwt_service import (
InvalidTokenException,
decode_jwt,
generate_jwt,
)
from .models import Session, SessionState
from .serializers import CreateSessionRequest
def _command_result_response(  # type: ignore[no-untyped-def]
    request,
    *,
    page_title: str,
    success_message: str | None = None,
    error_message: str | None = None,
    status_code: int = 200,
) -> HttpResponse:
    """Render the ``personalised/command_result.html`` page.

    The template receives ``page_title`` plus an ``appContext`` mapping that
    carries the success/error messages, a ``closeWindow`` flag (always
    ``True``), the text direction of the currently active language, and the
    translated ``Confirm`` label.

    Args:
        request: The incoming request, passed straight through to ``render``.
        page_title: Title exposed to the template as ``page_title``.
        success_message: Optional message exposed as ``successMessage``.
        error_message: Optional message exposed as ``errorMessage``.
        status_code: HTTP status of the rendered response.

    Returns:
        The rendered response with the requested status code.
    """
    # Text direction follows the "bidi" flag of the active language.
    language_details = get_language_info(get_language())
    text_direction = "rtl" if language_details["bidi"] else "ltr"

    app_context = {
        "successMessage": success_message,
        "errorMessage": error_message,
        "closeWindow": True,
        "direction": text_direction,
        "localeMessages": {"Confirm": _("Confirm")},
    }
    template_context = {"page_title": page_title, "appContext": app_context}

    return render(
        request,
        "personalised/command_result.html",
        template_context,
        status=status_code,
    )
def _has_oauth_session_access(user, organization_id: int) -> bool:  # type: ignore[no-untyped-def]
    """Tell whether ``user`` may start an OAuth session for an organization.

    Access is granted, in this order, to superusers, to members of the
    ``PLATFORM_ADMIN_GROUP_NAME`` group, and to users that have an
    ``OrganizationUser`` row with role ``"admin"`` for ``organization_id``.

    Args:
        user: The user to check; needs ``is_superuser`` and ``groups``.
        organization_id: Primary key of the organization being accessed.

    Returns:
        ``True`` when any of the three conditions holds, ``False`` otherwise.
    """
    # Short-circuits: the group query only runs for non-superusers.
    if (
        user.is_superuser
        or user.groups.filter(name=PLATFORM_ADMIN_GROUP_NAME).exists()
    ):
        return True

    org_admin_rows = OrganizationUser.objects.filter(
        user=user,
        organization_id=organization_id,
        role="admin",
    )
    return org_admin_rows.exists()
class SessionsView(View):
    """Create OAuth sessions for commands that require Google authorization."""

    def post(self, request, *args, **kwargs):  # type: ignore[no-untyped-def]
        """Create a session for an ``EnrollFromGoogleDirectoryCommand``.

        The JSON request body is validated with ``CreateSessionRequest``. On
        success a ``Session`` row is stored holding a JWT of the serialized
        request, and the response carries the session id together with the
        authorization URL produced by the command.

        Returns:
            A ``JsonResponse`` with one of these statuses:

            * 201 - session created (``session_id``, ``authorization_url``).
            * 400 - body is not valid JSON, fails validation, or carries a
              command other than ``EnrollFromGoogleDirectoryCommand``.
            * 401 - the user is not authenticated.
            * 403 - the user lacks access to the course's organization.
            * 404 - the referenced course does not exist.
        """
        if not request.user.is_authenticated:
            return JsonResponse({"error": "Unauthorized"}, status=401)

        # The body is client-controlled. Malformed JSON (JSONDecodeError) and
        # undecodable bytes (UnicodeDecodeError) are both ValueError
        # subclasses; report them as a client error instead of a 500.
        try:
            payload = json.loads(request.body)
        except ValueError:
            return JsonResponse({"error": "Invalid JSON body"}, status=400)

        try:
            serializer = CreateSessionRequest.model_validate(payload)
        except ValidationError as ve:
            return JsonResponse({"error": ve.errors()}, status=400)

        command = serializer.request.command
        if not isinstance(command, EnrollFromGoogleDirectoryCommand):
            return JsonResponse(
                {"error": "Unsupported command for oauth session"}, status=400
            )

        try:
            course = Course.objects.get(id=command.course_id)
        except Course.DoesNotExist:
            return JsonResponse({"error": "Course not found"}, status=404)

        if not _has_oauth_session_access(request.user, course.organization_id):
            return JsonResponse({"error": "Forbidden"}, status=403)

        # The session is created first with a placeholder token because its
        # session_id is needed to build the authorization URL; the real JWT
        # is written afterwards.
        temp_session = Session.objects.create(jwt_token="pending")
        authorization_url = command.get_authorization_url(temp_session.session_id)

        command_payload = serializer.request.model_dump(mode="json")
        temp_session.jwt_token = generate_jwt(command_payload)
        temp_session.save(update_fields=["jwt_token"])

        return JsonResponse(
            {
                "session_id": temp_session.session_id,
                "authorization_url": authorization_url,
            },
            status=201,
        )
class SessionView(View):
    """Expose the current state of a single OAuth session."""

    def get(self, request, *args, **kwargs):  # type: ignore[no-untyped-def]
        """Return the id and state of the session named by ``session_id``.

        ``session_id`` is read from the URL keyword arguments.

        Returns:
            A 200 ``JsonResponse`` with ``session_id`` and ``state``, or a 404
            ``JsonResponse`` when no such session exists.
        """
        try:
            found = Session.objects.get(session_id=kwargs["session_id"])
        except Session.DoesNotExist:
            return JsonResponse({"error": "Session not found"}, status=404)

        body = {"session_id": found.session_id, "state": found.state}
        return JsonResponse(body, status=200)
class RedirectView(View):
    """Handle the OAuth redirect and run the command stored in the session."""

    @staticmethod
    def _error_page(request, message, status_code):  # type: ignore[no-untyped-def]
        """Render the command-result page titled "Authorization Error"."""
        return _command_result_response(
            request,
            page_title=_("Authorization Error"),
            error_message=message,
            status_code=status_code,
        )

    @staticmethod
    def _store_state(session, new_state):  # type: ignore[no-untyped-def]
        """Set ``session.state`` and persist only that field."""
        session.state = new_state
        session.save(update_fields=["state"])

    def get(self, request, *args, **kwargs):  # type: ignore[no-untyped-def]
        """Process the ``state`` and ``code`` query parameters of the redirect.

        ``state`` is used as the session id. The JWT stored on the session is
        decoded, the ``code`` and ``state`` values are injected into its
        ``command`` entry, and the result is handed to
        ``CommandHandlerService``. The session state moves to PROCESSING, then
        to COMPLETED on success or FAILED on any error.

        Returns:
            The rendered command-result page: 200 on success, 400 for a
            missing ``state``/``code``, an invalid token or any other failure,
            and 404 when the session is unknown.
        """
        state_param = request.GET.get("state")
        oauth_code = request.GET.get("code")

        if not state_param:
            return self._error_page(request, _("Missing state parameter."), 400)

        try:
            session = Session.objects.get(session_id=state_param)
        except Session.DoesNotExist:
            return self._error_page(request, _("Invalid session identifier."), 404)

        if not oauth_code:
            self._store_state(session, SessionState.FAILED)
            return self._error_page(request, _("Missing code parameter."), 400)

        try:
            self._store_state(session, SessionState.PROCESSING)

            # Rebuild the original request and attach the OAuth callback data.
            request_data = decode_jwt(session.jwt_token)
            command_data = request_data.get("command", {})
            command_data["code"] = oauth_code
            command_data["state"] = state_param
            request_data["command"] = command_data

            CommandHandlerService().handle_json_command(request_data)

            self._store_state(session, SessionState.COMPLETED)
            return _command_result_response(
                request,
                page_title=_("Authorization Complete"),
                success_message=_(
                    "Google authorization completed successfully. You can close this window."
                ),
                status_code=200,
            )
        except InvalidTokenException:
            self._store_state(session, SessionState.FAILED)
            return self._error_page(request, _("Invalid OAuth session token."), 400)
        except Exception as exc:  # noqa: BLE001
            # NOTE(review): the raw exception text is shown to the user here;
            # confirm that exposing it is intended.
            self._store_state(session, SessionState.FAILED)
            return self._error_page(request, str(exc), 400)