diff --git a/lms/djangoapps/certificates/data.py b/lms/djangoapps/certificates/data.py index f1131ad74886..51f8d4d0432f 100644 --- a/lms/djangoapps/certificates/data.py +++ b/lms/djangoapps/certificates/data.py @@ -57,11 +57,12 @@ class CertificateStatuses: requesting = 'requesting' readable_statuses = { - downloadable: "already received", - notpassing: "didn't receive", - error: "error states", - audit_passing: "audit passing states", - audit_notpassing: "audit not passing states", + downloadable: "Received", + notpassing: "Not Received", + unavailable: "Invalidated", + error: "Error State", + audit_passing: "Audit - Passing", + audit_notpassing: "Audit - Not Passing", } PASSED_STATUSES = (downloadable, generating) diff --git a/lms/djangoapps/certificates/models.py b/lms/djangoapps/certificates/models.py index ac5916879b4e..541ff41d7b15 100644 --- a/lms/djangoapps/certificates/models.py +++ b/lms/djangoapps/certificates/models.py @@ -588,7 +588,7 @@ def get_certificate_generation_candidates(self): if not task_input.strip(): # if task input is empty, it means certificates were generated for all learners # Translators: This string represents task was executed for all learners. - return _("All learners") + return _("All Learners") task_input_json = json.loads(task_input) @@ -607,9 +607,9 @@ def get_certificate_generation_candidates(self): # for backwards compatibility. if 'student_set' in task_input_json or 'students' in task_input_json: # Translators: This string represents task was executed for students having exceptions. - return _("For exceptions") + return _("Granted Exceptions") else: - return _("All learners") + return _("All Learners") class Meta: app_label = "certificates" diff --git a/lms/djangoapps/certificates/tests/test_models.py b/lms/djangoapps/certificates/tests/test_models.py index 526b9a8f9392..5d7ad08fe615 100644 --- a/lms/djangoapps/certificates/tests/test_models.py +++ b/lms/djangoapps/certificates/tests/test_models.py @@ -267,22 +267,22 @@ class TestCertificateGenerationHistory(OpenEdxEventsTestMixin, TestCase): ENABLED_OPENEDX_EVENTS = [] @ddt.data( - ({"student_set": "allowlisted_not_generated"}, "For exceptions", True), - ({"student_set": "allowlisted_not_generated"}, "For exceptions", False), + ({"student_set": "allowlisted_not_generated"}, "Granted Exceptions", True), + ({"student_set": "allowlisted_not_generated"}, "Granted Exceptions", False), # check "students" key for backwards compatibility - ({"students": [1, 2, 3]}, "For exceptions", True), - ({"students": [1, 2, 3]}, "For exceptions", False), - ({}, "All learners", True), - ({}, "All learners", False), + ({"students": [1, 2, 3]}, "Granted Exceptions", True), + ({"students": [1, 2, 3]}, "Granted Exceptions", False), + ({}, "All Learners", True), + ({}, "All Learners", False), # test single status to regenerate returns correctly - ({"statuses_to_regenerate": ['downloadable']}, 'already received', True), - ({"statuses_to_regenerate": ['downloadable']}, 'already received', False), + ({"statuses_to_regenerate": ['downloadable']}, 'Received', True), + ({"statuses_to_regenerate": ['downloadable']}, 'Received', False), # test that list of > 1 statuses render correctly - ({"statuses_to_regenerate": ['downloadable', 'error']}, 'already received, error states', True), - ({"statuses_to_regenerate": ['downloadable', 'error']}, 'already received, error states', False), + ({"statuses_to_regenerate": ['downloadable', 'error']}, 'Received, Error State', True), + ({"statuses_to_regenerate": ['downloadable', 'error']}, 'Received, Error State', False), # test that only "readable" statuses are returned - ({"statuses_to_regenerate": ['downloadable', 'not_readable']}, 'already received', True), - ({"statuses_to_regenerate": ['downloadable', 'not_readable']}, 'already received', False), + ({"statuses_to_regenerate": ['downloadable', 'not_readable']}, 'Received', True), + ({"statuses_to_regenerate": ['downloadable', 'not_readable']}, 'Received', False), ) @ddt.unpack def test_get_certificate_generation_candidates(self, task_input, expected, is_regeneration): diff --git a/lms/djangoapps/instructor/tests/test_api_v2.py b/lms/djangoapps/instructor/tests/test_api_v2.py index 403acb4be6a3..66dd6710a720 100644 --- a/lms/djangoapps/instructor/tests/test_api_v2.py +++ b/lms/djangoapps/instructor/tests/test_api_v2.py @@ -36,7 +36,7 @@ UserFactory, ) from lms.djangoapps.certificates.data import CertificateStatuses -from lms.djangoapps.certificates.models import CertificateGenerationHistory +from lms.djangoapps.certificates.models import CertificateAllowlist, CertificateGenerationHistory from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory from lms.djangoapps.courseware.models import StudentModule from lms.djangoapps.instructor.access import ROLE_DISPLAY_NAMES @@ -2091,6 +2091,57 @@ def test_pagination(self): assert 'previous' in response.data assert 'results' in response.data + def test_granted_exceptions_without_certificates(self): + """ + Test that granted_exceptions filter shows allowlisted users + even if they don't have GeneratedCertificate records yet. + """ + # Add student1 to allowlist (has verified enrollment) + CertificateAllowlist.objects.create( + user=self.student1, + course_id=self.course_key, + allowlist=True, + notes='Medical emergency' + ) + + # Add student2 to allowlist (has audit enrollment, no certificate) + CertificateAllowlist.objects.create( + user=self.student2, + course_id=self.course_key, + allowlist=True, + notes='Special case' + ) + + # Create certificate only for student1 + GeneratedCertificateFactory.create( + user=self.student1, + course_id=self.course_key, + status=CertificateStatuses.downloadable + ) + + self.client.force_authenticate(user=self.instructor) + params = {'filter': 'granted_exceptions'} + response = self.client.get(self._get_url(), params) + + assert response.status_code == status.HTTP_200_OK + assert response.data['count'] == 2 # Both students should appear + + results = {r['username']: r for r in response.data['results']} + + # Verify student1 (has certificate) + assert 'student1' in results + assert results['student1']['enrollment_track'] == 'verified' + assert results['student1']['certificate_status'] == 'downloadable' + assert results['student1']['special_case'] == 'Exception' + assert results['student1']['exception_notes'] == 'Medical emergency' + + # Verify student2 (no certificate, but should appear with enrollment data) + assert 'student2' in results + assert results['student2']['enrollment_track'] == 'audit' + assert results['student2']['certificate_status'] == 'audit_notpassing' + assert results['student2']['special_case'] == 'Exception' + assert results['student2']['exception_notes'] == 'Special case' + @ddt.ddt class CertificateGenerationHistoryViewTest(SharedModuleStoreTestCase): @@ -2203,7 +2254,7 @@ def test_history_entry_structure(self): # Verify all required fields are present (snake_case from serializer) assert entry['task_name'] == 'Regenerated' assert 'date' in entry - assert entry['details'] == 'All learners' + assert entry['details'] == 'All Learners' # Verify data types assert isinstance(entry['task_name'], str) diff --git a/lms/djangoapps/instructor/tests/test_certificates_api_v2.py b/lms/djangoapps/instructor/tests/test_certificates_api_v2.py new file mode 100644 index 000000000000..db915f5452a3 --- /dev/null +++ b/lms/djangoapps/instructor/tests/test_certificates_api_v2.py @@ -0,0 +1,427 @@ +""" +Unit tests for instructor API v2 certificate management endpoints. +""" +from unittest.mock import patch + +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APIClient + +from common.djangoapps.student.tests.factories import ( + CourseEnrollmentFactory, + InstructorFactory, + UserFactory, +) +from lms.djangoapps.certificates.data import CertificateStatuses +from lms.djangoapps.certificates.models import CertificateAllowlist, CertificateInvalidation +from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory +from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase +from xmodule.modulestore.tests.factories import CourseFactory + + +class ToggleCertificateGenerationViewTest(SharedModuleStoreTestCase): + """Tests for ToggleCertificateGenerationView.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create() + + def setUp(self): + super().setUp() + self.client = APIClient() + self.instructor = InstructorFactory.create(course_key=self.course.id) + self.student = UserFactory.create() + self.url = reverse( + 'instructor_api_v2:toggle_certificate_generation', + kwargs={'course_id': str(self.course.id)} + ) + + def test_permission_required(self): + """Test that only instructors can toggle certificate generation.""" + self.client.force_authenticate(user=self.student) + response = self.client.post(self.url, {'enabled': True}, format='json') + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_course_not_found(self): + """Test 404 when course doesn't exist.""" + self.client.force_authenticate(user=self.instructor) + url = reverse( + 'instructor_api_v2:toggle_certificate_generation', + kwargs={'course_id': 'course-v1:edX+Invalid+2024'} + ) + response = self.client.post(url, {'enabled': True}, format='json') + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_missing_enabled_field(self): + """Test validation error when enabled field is missing.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post(self.url, {}, format='json') + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'enabled' in response.data + + def test_invalid_enabled_field_type(self): + """Test validation error when enabled is not boolean.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post(self.url, {'enabled': 'invalid'}, format='json') + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_successful_toggle(self): + """Test successful certificate generation toggle.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post(self.url, {'enabled': False}, format='json') + assert response.status_code == status.HTTP_200_OK + assert response.data == {'enabled': False} + + +class CertificateExceptionsViewTest(SharedModuleStoreTestCase): + """Tests for CertificateExceptionsView.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create() + + def setUp(self): + super().setUp() + self.client = APIClient() + self.instructor = InstructorFactory.create(course_key=self.course.id) + self.student = UserFactory.create() + self.enrolled_student = UserFactory.create() + CourseEnrollmentFactory.create(user=self.enrolled_student, course_id=self.course.id) + self.url = reverse( + 'instructor_api_v2:certificate_exceptions', + kwargs={'course_id': str(self.course.id)} + ) + + def test_post_permission_required(self): + """Test that only instructors can grant exceptions.""" + self.client.force_authenticate(user=self.student) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_post_course_not_found(self): + """Test 404 when course doesn't exist.""" + self.client.force_authenticate(user=self.instructor) + url = reverse( + 'instructor_api_v2:certificate_exceptions', + kwargs={'course_id': 'course-v1:edX+Invalid+2024'} + ) + response = self.client.post( + url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_post_no_learners(self): + """Test error when no learners provided.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post(self.url, {'learners': [], 'notes': 'Test'}, format='json') + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_post_user_not_found(self): + """Test error for non-existent user.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': ['nonexistent'], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['errors']) == 1 + # The actual error message from get_user_by_username_or_email + assert 'does not exist' in response.data['errors'][0]['message'] + + def test_post_user_not_enrolled(self): + """Test error when user is not enrolled.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': [self.student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['errors']) == 1 + assert 'not enrolled' in response.data['errors'][0]['message'] + + def test_post_user_has_invalidation(self): + """Test error when user has active invalidation.""" + cert = GeneratedCertificateFactory.create( + user=self.enrolled_student, + course_id=self.course.id, + status=CertificateStatuses.unavailable + ) + CertificateInvalidation.objects.create( + generated_certificate=cert, + invalidated_by=self.instructor, + notes='Test invalidation', + active=True + ) + + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['errors']) == 1 + assert 'invalidation' in response.data['errors'][0]['message'] + + @patch('lms.djangoapps.instructor.views.api_v2.certs_api.create_or_update_certificate_allowlist_entry') + def test_post_successful_single(self, mock_create): + """Test successful exception grant for single learner.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test exception'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['success']) == 1 + assert self.enrolled_student.username in response.data['success'] + mock_create.assert_called_once() + + @patch('lms.djangoapps.instructor.views.api_v2.certs_api.create_or_update_certificate_allowlist_entry') + def test_post_successful_bulk(self, mock_create): + """Test successful bulk exception grant.""" + student2 = UserFactory.create() + CourseEnrollmentFactory.create(user=student2, course_id=self.course.id) + + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + { + 'learners': [self.enrolled_student.username, student2.username], + 'notes': 'Bulk test' + }, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['success']) == 2 + assert mock_create.call_count == 2 + + def test_delete_permission_required(self): + """Test that only instructors can remove exceptions.""" + self.client.force_authenticate(user=self.student) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete_no_username(self): + """Test error when username not provided.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.delete(self.url, {}, format='json') + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_delete_user_not_found(self): + """Test error when user doesn't exist.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.delete( + self.url, + {'username': 'nonexistent'}, + format='json' + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + @patch('lms.djangoapps.instructor.views.api_v2.certs_api.get_allowlist_entry') + def test_delete_no_exception(self, mock_get_entry): + """Test 404 when no exception exists.""" + mock_get_entry.return_value = None + self.client.force_authenticate(user=self.instructor) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + @patch('lms.djangoapps.instructor.views.api_v2.certs_api.remove_allowlist_entry') + @patch('lms.djangoapps.instructor.views.api_v2.certs_api.get_allowlist_entry') + def test_delete_successful(self, mock_get_entry, mock_remove): + """Test successful exception removal.""" + mock_entry = CertificateAllowlist( + user=self.enrolled_student, + course_id=self.course.id, + allowlist=True, + notes='Test' + ) + mock_get_entry.return_value = mock_entry + + self.client.force_authenticate(user=self.instructor) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + mock_remove.assert_called_once_with(self.enrolled_student, self.course.id) + + +class CertificateInvalidationsViewTest(SharedModuleStoreTestCase): + """Tests for CertificateInvalidationsView.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create() + + def setUp(self): + super().setUp() + self.client = APIClient() + self.instructor = InstructorFactory.create(course_key=self.course.id) + self.student = UserFactory.create() + self.enrolled_student = UserFactory.create() + CourseEnrollmentFactory.create(user=self.enrolled_student, course_id=self.course.id) + self.url = reverse( + 'instructor_api_v2:certificate_invalidations', + kwargs={'course_id': str(self.course.id)} + ) + + def test_post_permission_required(self): + """Test that only instructors can invalidate certificates.""" + self.client.force_authenticate(user=self.student) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_post_course_not_found(self): + """Test 404 when course doesn't exist.""" + self.client.force_authenticate(user=self.instructor) + url = reverse( + 'instructor_api_v2:certificate_invalidations', + kwargs={'course_id': 'course-v1:edX+Invalid+2024'} + ) + response = self.client.post( + url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_post_no_certificate(self): + """Test error when certificate doesn't exist.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['errors']) == 1 + assert 'not found' in response.data['errors'][0]['message'] + + def test_post_certificate_already_invalid(self): + """Test error when certificate is already invalid.""" + GeneratedCertificateFactory.create( + user=self.enrolled_student, + course_id=self.course.id, + status=CertificateStatuses.unavailable + ) + + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['errors']) == 1 + assert 'invalid' in response.data['errors'][0]['message'] + + @patch('lms.djangoapps.instructor.views.api_v2.certs_api.create_certificate_invalidation_entry') + def test_post_successful(self, mock_create): + """Test successful certificate invalidation.""" + GeneratedCertificateFactory.create( + user=self.enrolled_student, + course_id=self.course.id, + status=CertificateStatuses.downloadable + ) + + self.client.force_authenticate(user=self.instructor) + response = self.client.post( + self.url, + {'learners': [self.enrolled_student.username], 'notes': 'Test invalidation'}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.data['success']) == 1 + mock_create.assert_called_once() + + def test_delete_permission_required(self): + """Test that only instructors can re-validate certificates.""" + self.client.force_authenticate(user=self.student) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete_no_certificate(self): + """Test 404 when certificate doesn't exist.""" + self.client.force_authenticate(user=self.instructor) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + @patch('lms.djangoapps.instructor_task.api.generate_certificates_for_students') + def test_delete_no_invalidation(self, mock_generate): + """Test 404 when no active invalidation exists.""" + GeneratedCertificateFactory.create( + user=self.enrolled_student, + course_id=self.course.id, + status=CertificateStatuses.downloadable + ) + + self.client.force_authenticate(user=self.instructor) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + mock_generate.assert_not_called() + + @patch('lms.djangoapps.instructor_task.api.generate_certificates_for_students') + def test_delete_successful_regeneration(self, mock_generate): + """Test successful re-validation triggers certificate regeneration.""" + cert = GeneratedCertificateFactory.create( + user=self.enrolled_student, + course_id=self.course.id, + status=CertificateStatuses.unavailable + ) + CertificateInvalidation.objects.create( + generated_certificate=cert, + invalidated_by=self.instructor, + notes='Test invalidation', + active=True + ) + + self.client.force_authenticate(user=self.instructor) + response = self.client.delete( + self.url, + {'username': self.enrolled_student.username}, + format='json' + ) + assert response.status_code == status.HTTP_200_OK + + # Verify certificate regeneration was triggered + mock_generate.assert_called_once() + call_args = mock_generate.call_args + assert call_args.kwargs['student_set'] == 'specific_student' + assert call_args.kwargs['specific_student_id'] == self.enrolled_student.id diff --git a/lms/djangoapps/instructor/tests/views/test_api_v2.py b/lms/djangoapps/instructor/tests/views/test_api_v2.py index 76765fbbae52..e438b3583c12 100644 --- a/lms/djangoapps/instructor/tests/views/test_api_v2.py +++ b/lms/djangoapps/instructor/tests/views/test_api_v2.py @@ -232,14 +232,14 @@ def test_get_problem_with_learner_no_submission_returns_nulls(self): self.assertIsNone(data['attempts']) # noqa: PT009 def test_get_problem_with_unknown_learner_returns_404(self): - """Test that a 404 is returned when learner does not exist""" + """Test that a 400 is returned when learner does not exist""" url = reverse('instructor_api_v2:problem_detail', kwargs={ 'course_id': str(self.course.id), 'location': str(self.problem.location) }) response = self.client.get(url, {'email_or_username': 'nonexistent_user'}) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) # noqa: PT009 + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) # noqa: PT009 def test_get_problem_requires_authentication(self): """Test that endpoint requires authentication""" diff --git a/lms/djangoapps/instructor/views/api_urls.py b/lms/djangoapps/instructor/views/api_urls.py index 621b0a9f3b7c..df45d11b9611 100644 --- a/lms/djangoapps/instructor/views/api_urls.py +++ b/lms/djangoapps/instructor/views/api_urls.py @@ -86,6 +86,21 @@ api_v2.CertificateConfigView.as_view(), name='certificate_config' ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/toggle_generation$', + api_v2.ToggleCertificateGenerationView.as_view(), + name='toggle_certificate_generation' + ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/exceptions$', + api_v2.CertificateExceptionsView.as_view(), + name='certificate_exceptions' + ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/invalidations$', + api_v2.CertificateInvalidationsView.as_view(), + name='certificate_invalidations' + ), re_path( rf'^courses/{COURSE_ID_PATTERN}/enrollments$', api_v2.CourseEnrollmentsView.as_view(), diff --git a/lms/djangoapps/instructor/views/api_v2.py b/lms/djangoapps/instructor/views/api_v2.py index dd137f1630d4..3f2255d996c7 100644 --- a/lms/djangoapps/instructor/views/api_v2.py +++ b/lms/djangoapps/instructor/views/api_v2.py @@ -114,7 +114,9 @@ BetaTesterModifyRequestSerializerV2, BetaTesterModifyResponseSerializerV2, BlockDueDateSerializerV2, + CertificateExceptionSerializer, CertificateGenerationHistorySerializer, + CertificateInvalidationSerializer, CourseEnrollmentSerializerV2, CourseInformationSerializerV2, CourseTeamModifySerializer, @@ -123,14 +125,18 @@ EnrollmentModifyResponseSerializerV2, InstructorTaskListSerializer, IssuedCertificateSerializer, + LearnerInputSerializer, LearnerSerializer, ORASerializer, ORASummarySerializer, ProblemSerializer, RegenerateCertificatesSerializer, + RemoveCertificateExceptionSerializer, + RemoveCertificateInvalidationSerializer, ScoreOverrideRequestSerializer, SyncOperationResultSerializer, TaskStatusSerializer, + ToggleCertificateGenerationSerializer, UnitExtensionSerializer, ) from .tools import find_unit, get_units_with_due_date, keep_field_private, set_due_date_extension, title_or_url @@ -1252,6 +1258,169 @@ class IssuedCertificatesView(ListAPIView): permission_name = permissions.VIEW_ISSUED_CERTIFICATES serializer_class = IssuedCertificateSerializer + def _create_certificate_dict_for_allowlisted_user(self, allowlist_entry, enrollment_dict): + """ + Create a dictionary representing certificate data for an allowlisted user + who may not have a GeneratedCertificate record yet. + """ + user = allowlist_entry.user + enrollment_mode = enrollment_dict.get(user.id, '') + + # Determine certificate status based on enrollment + if enrollment_mode == 'audit': + cert_status = 'audit_notpassing' + elif enrollment_mode == 'verified': + cert_status = 'downloadable' + else: + cert_status = 'notpassing' + + return { + 'username': user.username, + 'email': user.email, + 'enrollment_track': enrollment_mode, + 'certificate_status': cert_status, + 'special_case': 'Exception', + 'exception_granted': allowlist_entry.created.isoformat(), + 'exception_notes': allowlist_entry.notes or '', + 'invalidated_by': None, + 'invalidation_date': None, + 'invalidation_note': '', + } + + def _create_certificate_dict_for_invalidated_user(self, invalidation, enrollment_dict): + """ + Create a dictionary representing certificate data for an invalidated certificate + that may not have a GeneratedCertificate record. + """ + user = invalidation.generated_certificate.user if invalidation.generated_certificate else None + enrollment_mode = enrollment_dict.get(user.id, '') if user else '' + + return { + 'username': user.username if user else '', + 'email': user.email if user else '', + 'enrollment_track': enrollment_mode, + 'certificate_status': 'unavailable', + 'special_case': 'Invalidation', + 'exception_granted': None, + 'exception_notes': '', + 'invalidated_by': invalidation.invalidated_by.email if invalidation.invalidated_by else '', + 'invalidation_date': invalidation.created.isoformat(), + 'invalidation_note': invalidation.notes or '', + } + + def list(self, request, *args, **kwargs): + """ + Override list to handle granted_exceptions and invalidated filters specially. + + For these filters, we need to show ALL relevant users, + even those without GeneratedCertificate records yet. + """ + filter_type = request.query_params.get("filter", "all") + + if filter_type == "granted_exceptions": + course_id = self.kwargs["course_id"] + course_key = CourseKey.from_string(course_id) + search = request.query_params.get("search", "").strip() + + # Get enrollment data for context + enrollments = CourseEnrollment.objects.filter( + course_id=course_key + ).select_related('user') + enrollment_dict = {e.user_id: e.mode for e in enrollments} + + # Get all allowlist entries + allowlist_qs = CertificateAllowlist.objects.filter( + course_id=course_key, + allowlist=True + ).select_related('user') + + # Apply search filter + if search: + allowlist_qs = allowlist_qs.filter( + Q(user__username__icontains=search) | Q(user__email__icontains=search) + ) + + # Get existing certificates for allowlisted users + allowlist_user_ids = list(allowlist_qs.values_list('user_id', flat=True)) + existing_certs = GeneratedCertificate.objects.filter( + course_id=course_key, + user_id__in=allowlist_user_ids + ).select_related('user') + existing_cert_user_ids = set(existing_certs.values_list('user_id', flat=True)) + + # Build list of certificate data + certificate_data = [] + + # Add existing certificates + context = self.get_serializer_context() + for cert in existing_certs: + serializer = self.get_serializer(cert, context=context) + certificate_data.append(serializer.data) + + # Add synthetic certificates for allowlisted users without GeneratedCertificate + for entry in allowlist_qs: + if entry.user_id not in existing_cert_user_ids: + cert_dict = self._create_certificate_dict_for_allowlisted_user(entry, enrollment_dict) + certificate_data.append(cert_dict) + + # Sort by username + certificate_data.sort(key=lambda x: x['username']) + + # Paginate manually + paginator = self.pagination_class() + page = paginator.paginate_queryset(certificate_data, request) + + return paginator.get_paginated_response(page if page is not None else certificate_data) + + elif filter_type == "invalidated": + course_id = self.kwargs["course_id"] + course_key = CourseKey.from_string(course_id) + search = request.query_params.get("search", "").strip() + + # Get enrollment data for context + enrollments = CourseEnrollment.objects.filter( + course_id=course_key + ).select_related('user') + enrollment_dict = {e.user_id: e.mode for e in enrollments} + + # Get all invalidations + invalidations_qs = CertificateInvalidation.objects.filter( + generated_certificate__course_id=course_key, + active=True + ).select_related('generated_certificate__user', 'invalidated_by') + + # Apply search filter + if search: + invalidations_qs = invalidations_qs.filter( + Q(generated_certificate__user__username__icontains=search) | + Q(generated_certificate__user__email__icontains=search) + ) + + # Get existing certificates for invalidated users + invalidated_cert_ids = list(invalidations_qs.values_list('generated_certificate_id', flat=True)) + existing_certs = GeneratedCertificate.objects.filter( + id__in=invalidated_cert_ids + ).select_related('user') + + # Build list of certificate data using existing certificates + certificate_data = [] + context = self.get_serializer_context() + for cert in existing_certs: + serializer = self.get_serializer(cert, context=context) + certificate_data.append(serializer.data) + + # Sort by username + certificate_data.sort(key=lambda x: x['username']) + + # Paginate manually + paginator = self.pagination_class() + page = paginator.paginate_queryset(certificate_data, request) + + return paginator.get_paginated_response(page if page is not None else certificate_data) + + # For other filters, use default behavior + return super().list(request, *args, **kwargs) + def _apply_certificate_status_filter(self, certificates, filter_type, cert_statuses, course_key): """Apply status-based filters to certificate queryset.""" if filter_type == "received": @@ -1315,7 +1484,8 @@ def get_serializer_context(self): context['invalidation_dict'] = { inv.generated_certificate.user_id: { 'invalidated_by': inv.invalidated_by.email, - 'created': inv.created.isoformat() + 'created': inv.created.isoformat(), + 'notes': inv.notes or '' } for inv in invalidations } @@ -1339,6 +1509,26 @@ def get_queryset(self): filter_type = self.request.query_params.get("filter", "all") search = self.request.query_params.get("search", "").strip() + # Special handling for granted_exceptions: show all allowlisted users + if filter_type == "granted_exceptions": + allowlist_user_ids = CertificateAllowlist.objects.filter( + course_id=course_key, allowlist=True + ).values_list('user_id', flat=True) + + # Get all certificates for allowlisted users + certificates = GeneratedCertificate.objects.filter( + course_id=course_key, + user_id__in=allowlist_user_ids + ).select_related('user', 'user__profile') + + # Apply search filter if provided + if search: + certificates = certificates.filter( + Q(user__username__icontains=search) | Q(user__email__icontains=search) + ) + + return certificates.order_by('user__username') + # Get certificates for the course if filter_type in ['audit_passing', 'audit_not_passing', 'all']: certificates = GeneratedCertificate.objects.filter( @@ -1361,7 +1551,7 @@ def get_queryset(self): course_key, filter_type ) - # Apply filter based on filter type (includes granted_exceptions and invalidated) + # Apply filter based on filter type (includes invalidated) certificates = self._apply_certificate_status_filter( certificates, filter_type, CertificateStatuses, course_key ) @@ -1606,6 +1796,527 @@ def get(self, request, course_id): return Response({'enabled': enabled}, status=status.HTTP_200_OK) +class ToggleCertificateGenerationView(DeveloperErrorViewMixin, APIView): + """ + View to toggle certificate generation for a course. + + **Example Requests** + + POST /api/instructor/v2/courses/{course_id}/certificates/toggle_generation + + **Request Body** + + { + "enabled": true + } + + **Response Values** + + { + "enabled": true + } + + **Returns** + + * 200: OK - Certificate generation toggled successfully + * 400: Bad Request - Invalid request body + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.ENABLE_CERTIFICATE_GENERATION + + def post(self, request, course_id): + """Toggle certificate generation for a course.""" + course_key = CourseKey.from_string(course_id) + # Validate that the course exists before updating certificate settings + get_course_by_id(course_key) + + # Validate request body + serializer = ToggleCertificateGenerationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + enabled = serializer.validated_data['enabled'] + + try: + certs_api.set_cert_generation_enabled(course_key, enabled) + return Response({'enabled': enabled}, status=status.HTTP_200_OK) + except Exception: # pylint: disable=broad-except + log.exception("Error toggling certificate generation for course %s", course_id) + return Response( + {'message': _('Unable to update certificate generation settings')}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + +class CertificateExceptionsView(DeveloperErrorViewMixin, APIView): + """ + View to grant or remove certificate exceptions (allowlist). + + **Example Requests** + + POST /api/instructor/v2/courses/{course_id}/certificates/exceptions + DELETE /api/instructor/v2/courses/{course_id}/certificates/exceptions + + **POST Request Body** + + { + "learners": ["username1", "username2"], + "notes": "Reason for granting exceptions" + } + + **DELETE Request Body** + + { + "username": "username1" + } + + **Returns** + + * 200: OK - Exception granted/removed successfully + * 400: Bad Request - Invalid request or user not found + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.CERTIFICATE_EXCEPTION_VIEW + + def post(self, request, course_id): + """Grant certificate exceptions (add to allowlist).""" + course_key = CourseKey.from_string(course_id) + # Validate that the course exists + get_course_by_id(course_key) + + # Validate request data + serializer = CertificateExceptionSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + learners = serializer.validated_data['learners'] + notes = serializer.validated_data['notes'] + + results = { + 'success': [], + 'errors': [] + } + + # Resolve all usernames/emails to users upfront + learner_to_user, user_errors = _resolve_learners_to_users(learners) + results['errors'].extend(user_errors) + + # Validate learners for certificate exceptions + exceptions_to_create, validation_errors = _validate_learners_for_certificate_exceptions( + learner_to_user, course_key + ) + results['errors'].extend(validation_errors) + + # Create all exceptions using the certificates API to ensure idempotency + # and avoid race conditions with the unique_together constraint + for learner, user in exceptions_to_create: + try: + certs_api.create_or_update_certificate_allowlist_entry(user, course_key, notes) + log.info( + "Certificate exception granted for user %s (%s) in course %s by %s", + user.id, learner, course_key, request.user.username + ) + results['success'].append(learner) + except Exception as exc: # pylint: disable=broad-except + log.exception( + "Error creating certificate exception for user %s in course %s", + user.id, course_key + ) + results['errors'].append({ + 'learner': learner, + 'message': str(exc) + }) + + return Response(results, status=status.HTTP_200_OK) + + def delete(self, request, course_id): + """Remove certificate exception (remove from allowlist).""" + course_key = CourseKey.from_string(course_id) + # Validate that the course exists + get_course_by_id(course_key) + + # Validate request data + serializer = RemoveCertificateExceptionSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + user = serializer.validated_data['username'] + + try: + # Remove exception via certificates API so any existing certificate + # is invalidated before the allowlist entry is removed + if not certs_api.get_allowlist_entry(user, course_key): + return Response( + {'message': _('No certificate exception found for this user')}, + status=status.HTTP_404_NOT_FOUND + ) + + certs_api.remove_allowlist_entry(user, course_key) + + return Response( + {'message': _('Certificate exception removed successfully')}, + status=status.HTTP_200_OK + ) + + except Exception: # pylint: disable=broad-except + log.exception("Error removing certificate exception for course %s", course_id) + return Response( + {'message': _('Unable to remove certificate exception')}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + +def _resolve_learners_to_users(learners): + """ + Resolve a list of learner identifiers (usernames or emails) to User objects. + + Args: + learners: List of learner identifiers (usernames or email addresses) + + Returns: + tuple: (learner_to_user, errors) where: + - learner_to_user: Dictionary mapping learner identifiers to User objects + - errors: List of error dictionaries with 'learner' and 'message' keys + """ + learner_to_user = {} + errors = [] + + for learner in learners: + try: + user = get_user_by_username_or_email(learner) + learner_to_user[learner] = user + except User.DoesNotExist as exc: + errors.append({ + 'learner': learner, + 'message': str(exc) + }) + + return learner_to_user, errors + + +def _validate_learners_for_certificate_exceptions(learner_to_user, course_key): + """ + Validate learners to ensure they can receive certificate exceptions. + + Args: + learner_to_user: Dictionary mapping learner identifiers to User objects + course_key: Course key for the course + + Returns: + tuple: (exceptions_to_create, errors) where: + - exceptions_to_create: List of (learner, user) tuples ready for exception creation + - errors: List of error dictionaries with 'learner' and 'message' keys + """ + errors = [] + exceptions_to_create = [] + + if not learner_to_user: + return exceptions_to_create, errors + + users = list(learner_to_user.values()) + user_ids = [u.id for u in users] + + # Bulk fetch active enrollments + enrollments = CourseEnrollment.objects.filter( + course_id=course_key, + user_id__in=user_ids, + is_active=True + ).values_list('user_id', flat=True) + enrolled_user_ids = set(enrollments) + + # Bulk fetch existing active allowlist entries + existing_allowlist = CertificateAllowlist.objects.filter( + course_id=course_key, + user_id__in=user_ids, + allowlist=True + ).values_list('user_id', flat=True) + allowlisted_user_ids = set(existing_allowlist) + + # Bulk fetch active invalidations + active_invalidations = CertificateInvalidation.objects.filter( + generated_certificate__course_id=course_key, + generated_certificate__user_id__in=user_ids, + active=True + ).values_list('generated_certificate__user_id', flat=True) + invalidated_user_ids = set(active_invalidations) + + # Validate each learner + for learner, user in learner_to_user.items(): + try: + # Check if user is enrolled + if user.id not in enrolled_user_ids: + errors.append({ + 'learner': learner, + 'message': _('User is not enrolled in this course') + }) + continue + + # Check if user already has an exception + if user.id in allowlisted_user_ids: + errors.append({ + 'learner': learner, + 'message': _('User already has a certificate exception') + }) + continue + + # Check if user has an active invalidation + if user.id in invalidated_user_ids: + errors.append({ + 'learner': learner, + 'message': _('User has an active certificate invalidation') + }) + continue + + # Learner is ready for exception creation + exceptions_to_create.append((learner, user)) + + except Exception as exc: # pylint: disable=broad-except + errors.append({ + 'learner': learner, + 'message': str(exc) + }) + + return exceptions_to_create, errors + + +def _validate_certificates_for_invalidation(learner_to_user, course_key): + """ + Validate certificates for a set of users to ensure they can be invalidated. + + Args: + learner_to_user: Dictionary mapping learner identifiers to User objects + course_key: Course key for the course + + Returns: + tuple: (certificates_to_invalidate, errors) where: + - certificates_to_invalidate: List of (learner, certificate) tuples ready for invalidation + - errors: List of error dictionaries with 'learner' and 'message' keys + """ + errors = [] + certificates_to_invalidate = [] + + if not learner_to_user: + return certificates_to_invalidate, errors + + users = list(learner_to_user.values()) + user_ids = [u.id for u in users] + + # Bulk fetch certificates (exclude deleted/deleting status) + certificates = GeneratedCertificate.objects.filter( + course_id=course_key, + user_id__in=user_ids + ).exclude( + status__in=[CertificateStatuses.deleted, CertificateStatuses.deleting] + ).select_related('user') + user_id_to_certificate = {cert.user_id: cert for cert in certificates} + + # Validate each learner's certificate + for learner, user in learner_to_user.items(): + try: + # Check if certificate exists + certificate = user_id_to_certificate.get(user.id) + if not certificate: + errors.append({ + 'learner': learner, + 'message': _('Certificate not found for this user') + }) + continue + + # Verify that the certificate is valid before invalidating + if not certificate.is_valid(): + errors.append({ + 'learner': learner, + 'message': _('Certificate is already invalid') + }) + continue + + # Certificate is ready for invalidation + certificates_to_invalidate.append((learner, certificate)) + + except Exception as exc: # pylint: disable=broad-except + errors.append({ + 'learner': learner, + 'message': str(exc) + }) + + return certificates_to_invalidate, errors + + +class CertificateInvalidationsView(DeveloperErrorViewMixin, APIView): + """ + View to invalidate or re-validate certificates. + + **Example Requests** + + POST /api/instructor/v2/courses/{course_id}/certificates/invalidations + DELETE /api/instructor/v2/courses/{course_id}/certificates/invalidations + + **POST Request Body** + + { + "learners": ["username1", "username2"], + "notes": "Reason for invalidation" + } + + **DELETE Request Body** + + { + "username": "username1" + } + + **Returns** + + * 200: OK - Certificate invalidated/re-validated successfully + * 400: Bad Request - Invalid request or certificate not found + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.CERTIFICATE_INVALIDATION_VIEW + + def post(self, request, course_id): + """Invalidate certificates.""" + course_key = CourseKey.from_string(course_id) + # Validate that the course exists + get_course_by_id(course_key) + + # Validate request data + serializer = CertificateInvalidationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + learners = serializer.validated_data['learners'] + notes = serializer.validated_data['notes'] + + results = { + 'success': [], + 'errors': [] + } + + # Resolve all usernames/emails to users upfront + learner_to_user, user_errors = _resolve_learners_to_users(learners) + results['errors'].extend(user_errors) + + # Validate certificates for invalidation + certificates_to_invalidate, validation_errors = _validate_certificates_for_invalidation( + learner_to_user, course_key + ) + results['errors'].extend(validation_errors) + + # Invalidate certificates using the certificates API to ensure idempotency + # and consistency with v1 behavior + for learner, certificate in certificates_to_invalidate: + try: + with transaction.atomic(): + # Create invalidation entry (uses update_or_create for idempotency) + certs_api.create_certificate_invalidation_entry( + certificate, + request.user, + notes, + ) + # Invalidate the certificate with explicit source for auditability + certificate.invalidate(source='instructor_api_v2') + log.info( + "Certificate invalidated for user %s (%s) in course %s by %s", + certificate.user_id, learner, course_key, request.user.username + ) + results['success'].append(learner) + except AlreadyRunningError: + log.warning( + "Certificate generation already running for user %s in course %s", + certificate.user_id, course_key + ) + results['errors'].append({ + 'learner': learner, + 'message': _('Cannot invalidate certificate while certificate generation is in progress. ' + 'Please wait for it to complete.') + }) + except Exception as exc: # pylint: disable=broad-except + log.exception( + "Error invalidating certificate for user %s in course %s", + certificate.user_id, course_key + ) + results['errors'].append({ + 'learner': learner, + 'message': str(exc) + }) + + return Response(results, status=status.HTTP_200_OK) + + def delete(self, request, course_id): + """Re-validate certificate (remove invalidation).""" + course_key = CourseKey.from_string(course_id) + # Validate that the course exists + get_course_by_id(course_key) + + # Validate request data + serializer = RemoveCertificateInvalidationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + user = serializer.validated_data['username'] + + try: + # Get the certificate (exclude deleted/deleting status) + try: + certificate = GeneratedCertificate.objects.exclude( + status__in=[CertificateStatuses.deleted, CertificateStatuses.deleting] + ).get( + course_id=course_key, + user=user + ) + except GeneratedCertificate.DoesNotExist: + return Response( + {'message': _('Certificate not found for this user')}, + status=status.HTTP_404_NOT_FOUND + ) + + # Remove invalidation and restore certificate generation + with transaction.atomic(): + updated_count = CertificateInvalidation.objects.filter( + generated_certificate=certificate, + active=True + ).update(active=False) + + if updated_count == 0: + return Response( + {'message': _('No active invalidation found for this certificate')}, + status=status.HTTP_404_NOT_FOUND + ) + + # Trigger certificate regeneration for this student + log.info( + "Re-validating certificate for student %s in course %s - triggering regeneration", + user.id, course_key + ) + try: + task_api.generate_certificates_for_students( + request, course_key, student_set="specific_student", specific_student_id=user.id + ) + except Exception as cert_gen_error: # pylint: disable=broad-except + # Log but don't fail - the invalidation was already removed + log.warning( + "Certificate regeneration failed for student %s in course %s: %s", + user.id, course_key, str(cert_gen_error) + ) + + return Response( + {'message': _('Certificate invalidation removed successfully')}, + status=status.HTTP_200_OK + ) + + except Exception as exc: # pylint: disable=broad-except + log.exception("Error removing certificate invalidation for course %s: %s", course_id, str(exc)) + return Response( + {'message': _('Unable to remove certificate invalidation: {}').format(str(exc))}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + class CourseEnrollmentsView(DeveloperErrorViewMixin, ListAPIView): """ List all active enrollments for a course with optional search, filtering, and pagination. @@ -1732,19 +2443,12 @@ def get(self, request, course_id, email_or_username): status=status.HTTP_400_BAD_REQUEST ) - UserModel = get_user_model() - try: - student = get_user_by_username_or_email(email_or_username) - except UserModel.DoesNotExist: - return Response( - {'error': 'Learner not found'}, - status=status.HTTP_404_NOT_FOUND - ) - except UserModel.MultipleObjectsReturned: - return Response( - {'error': 'Multiple learners found for the given identifier'}, - status=status.HTTP_400_BAD_REQUEST - ) + # Validate learner identifier + serializer = LearnerInputSerializer(data={'email_or_username': email_or_username}) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + student = serializer.validated_data['email_or_username'] # Build progress URL (MFE or legacy depending on feature flag) if course_home_mfe_progress_tab_is_active(course_key): @@ -1881,19 +2585,12 @@ def get(self, request, course_id, location): learner_identifier = request.query_params.get('email_or_username') if learner_identifier: - UserModel = get_user_model() - try: - student = get_user_by_username_or_email(learner_identifier) - except UserModel.DoesNotExist: - return Response( - {'error': 'Learner not found'}, - status=status.HTTP_404_NOT_FOUND - ) - except UserModel.MultipleObjectsReturned: - return Response( - {'error': 'Multiple learners found for the given identifier'}, - status=status.HTTP_400_BAD_REQUEST - ) + # Validate learner identifier + serializer = LearnerInputSerializer(data={'email_or_username': learner_identifier}) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + student = serializer.validated_data['email_or_username'] try: student_module = StudentModule.objects.get( diff --git a/lms/djangoapps/instructor/views/serializers_v2.py b/lms/djangoapps/instructor/views/serializers_v2.py index c61f3a310eb0..1cd671fa1d0c 100644 --- a/lms/djangoapps/instructor/views/serializers_v2.py +++ b/lms/djangoapps/instructor/views/serializers_v2.py @@ -9,6 +9,7 @@ from urllib.parse import urlparse from django.conf import settings +from django.contrib.auth import get_user_model from django.utils.html import escape from django.utils.translation import gettext as _ from edx_when.api import is_enabled_for_course @@ -16,6 +17,7 @@ from common.djangoapps.course_modes.models import CourseMode from common.djangoapps.student.models import CourseEnrollment +from common.djangoapps.student.models.user import get_user_by_username_or_email from common.djangoapps.student.roles import ( CourseFinanceAdminRole, CourseInstructorRole, @@ -36,6 +38,7 @@ from .tools import DashboardError, get_student_from_identifier, parse_datetime +User = get_user_model() log = logging.getLogger(__name__) @@ -82,6 +85,9 @@ class CourseInformationSerializerV2(serializers.Serializer): analytics_dashboard_message = serializers.SerializerMethodField( help_text="Message about analytics dashboard availability" ) + certificates_enabled = serializers.SerializerMethodField( + help_text="Whether certificate management features are enabled for this course" + ) @staticmethod def _build_tab_url(setting_name, *path_parts, strip_url=True): @@ -467,6 +473,14 @@ def get_analytics_dashboard_message(self, data): """Get analytics dashboard availability message.""" return get_analytics_dashboard_message(data['course'].id) + def get_certificates_enabled(self, data): + """Check if certificate management features are enabled.""" + from lms.djangoapps.certificates import api as certs_api + + course_key = data['course'].id + # Check if certificate generation is enabled (not available for CCX courses) + return certs_api.is_certificate_generation_enabled() and not hasattr(course_key, 'ccx') + class InstructorTaskSerializer(serializers.Serializer): """Serializer for instructor task details.""" @@ -624,6 +638,9 @@ class IssuedCertificateSerializer(serializers.Serializer): allow_null=True, help_text="Date when certificate was invalidated in ISO 8601 format" ) + invalidation_note = serializers.SerializerMethodField( + help_text="Notes about the invalidation" + ) def get_enrollment_track(self, obj): """Get enrollment track from context.""" @@ -665,6 +682,12 @@ def get_invalidation_date(self, obj): invalidation_info = invalidation_dict.get(obj.user_id) return invalidation_info['created'] if invalidation_info else None + def get_invalidation_note(self, obj): + """Get invalidation notes from invalidation data in context.""" + invalidation_dict = self.context.get('invalidation_dict', {}) + invalidation_info = invalidation_dict.get(obj.user_id) + return invalidation_info.get('notes', '') if invalidation_info else '' + class CertificateGenerationHistorySerializer(serializers.Serializer): """ @@ -691,6 +714,94 @@ def get_details(self, obj): return str(obj.get_certificate_generation_candidates()) +class ToggleCertificateGenerationSerializer(serializers.Serializer): + """ + Serializer for toggling certificate generation request. + """ + enabled = serializers.BooleanField( + required=True, + help_text="Whether to enable or disable certificate generation" + ) + + +class CertificateExceptionSerializer(serializers.Serializer): + """ + Serializer for granting certificate exceptions (bulk). + """ + learners = serializers.ListField( + child=serializers.CharField(max_length=255, allow_blank=False), + allow_empty=False, + max_length=1000, + help_text="List of usernames or email addresses of learners to grant exceptions" + ) + notes = serializers.CharField( + max_length=1000, + required=False, + allow_blank=True, + default='', + help_text="Notes about why the exception is being granted" + ) + + +class CertificateInvalidationSerializer(serializers.Serializer): + """ + Serializer for invalidating certificates (bulk). + """ + learners = serializers.ListField( + child=serializers.CharField(max_length=255, allow_blank=False), + allow_empty=False, + max_length=1000, + help_text="List of usernames or email addresses of learners to invalidate certificates" + ) + notes = serializers.CharField( + max_length=1000, + required=False, + allow_blank=True, + default='', + help_text="Notes about why the certificate is being invalidated" + ) + + +class RemoveCertificateExceptionSerializer(serializers.Serializer): + """ + Serializer for removing a certificate exception. + """ + username = serializers.CharField( + required=True, + max_length=255, + allow_blank=False, + help_text="Username or email address of the learner" + ) + + def validate_username(self, value): + """Validate and resolve username/email to user object.""" + try: + user = get_user_by_username_or_email(value) + return user + except User.DoesNotExist as exc: + raise serializers.ValidationError(str(exc)) from exc + + +class RemoveCertificateInvalidationSerializer(serializers.Serializer): + """ + Serializer for re-validating a certificate (removing invalidation). + """ + username = serializers.CharField( + required=True, + max_length=255, + allow_blank=False, + help_text="Username or email address of the learner" + ) + + def validate_username(self, value): + """Validate and resolve username/email to user object.""" + try: + user = get_user_by_username_or_email(value) + return user + except User.DoesNotExist as exc: + raise serializers.ValidationError(str(exc)) from exc + + class RegenerateCertificatesSerializer(serializers.Serializer): """ Serializer for regenerating certificates request. @@ -715,6 +826,28 @@ class RegenerateCertificatesSerializer(serializers.Serializer): ) +class LearnerInputSerializer(serializers.Serializer): + """ + Serializer for validating learner identifier (username or email). + """ + email_or_username = serializers.CharField( + required=True, + max_length=255, + allow_blank=False, + help_text="Username or email address of the learner" + ) + + def validate_email_or_username(self, value): + """Validate and resolve username/email to user object.""" + try: + user = get_user_by_username_or_email(value) + return user + except User.DoesNotExist as exc: + raise serializers.ValidationError(str(exc)) from exc + except User.MultipleObjectsReturned as exc: + raise serializers.ValidationError('Multiple learners found for the given identifier') from exc + + class CourseEnrollmentSerializerV2(serializers.Serializer): """ Serializer for course enrollment data.