diff --git a/dg_projects/legacy_openedx/legacy_openedx/ops/open_edx.py b/dg_projects/legacy_openedx/legacy_openedx/ops/open_edx.py index 81656cd90..e4924c3b6 100644 --- a/dg_projects/legacy_openedx/legacy_openedx/ops/open_edx.py +++ b/dg_projects/legacy_openedx/legacy_openedx/ops/open_edx.py @@ -31,6 +31,8 @@ from pypika import MySQLQuery as Query from pypika import Table, Tables +COURSE_EXPORT_TIMEOUT = timedelta(minutes=60) + class ListCoursesConfig(Config): edx_course_api_page_size: int = Field( @@ -568,7 +570,7 @@ def export_edx_forum_database( ), }, ) -def export_edx_courses( +def export_edx_courses( # noqa: C901 context: OpExecutionContext, edx_course_ids: list[str], daily_extracts_dir: str, @@ -577,23 +579,48 @@ def export_edx_courses( exported_courses = context.resources.openedx.export_courses( course_ids=edx_course_ids, ) + failed_initial = exported_courses.get("failed_uploads", {}) + if failed_initial: + context.log.warning( + "The following courses failed to queue for export and will be skipped: %s", + list(failed_initial.keys()), + ) successful_exports: set[str] = set() failed_exports: set[str] = set() tasks = exported_courses["upload_task_ids"] context.log.info("Exporting %s tasks from Open edX", len(tasks)) # Possible status values found here: # https://github.com/openedx/django-user-tasks/blob/master/user_tasks/models.py + start_time = datetime.now(tz=UTC) while len(successful_exports.union(failed_exports)) < len(tasks): + if datetime.now(tz=UTC) - start_time > COURSE_EXPORT_TIMEOUT: + timed_out = set(tasks.keys()) - successful_exports - failed_exports + err_msg = ( + f"Timed out waiting for course exports after {COURSE_EXPORT_TIMEOUT}. " + f"Unresolved courses: {timed_out}" + ) + raise TimeoutError(err_msg) time.sleep(timedelta(seconds=60).seconds) for course_id, task_id in tasks.items(): + if course_id in successful_exports or course_id in failed_exports: + continue try: task_status = context.resources.openedx.check_course_export_status( course_id, task_id, ) - except httpx.HTTPStatusError: - # Don't fail the whole job if one task status yields an error - continue + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: # noqa: PLR2004 + context.log.warning( + "Export task not found (HTTP 404) for course %s " + "(task %s), marking as failed", + course_id, + task_id, + ) + failed_exports.add(course_id) + continue + else: + raise if task_status["state"] == "Succeeded": successful_exports.add(course_id) if task_status["state"] in {"Failed", "Canceled", "Retrying"}: diff --git a/packages/ol-orchestrate-lib/src/ol_orchestrate/resources/openedx.py b/packages/ol-orchestrate-lib/src/ol_orchestrate/resources/openedx.py index 47834f3fb..c9e68f2dc 100644 --- a/packages/ol-orchestrate-lib/src/ol_orchestrate/resources/openedx.py +++ b/packages/ol-orchestrate-lib/src/ol_orchestrate/resources/openedx.py @@ -64,6 +64,15 @@ def export_courses(self, course_ids: list[str]) -> dict[str, dict[str, str]]: :returns: A dictionary of course IDs and the S3 URL where it will be exported to. + + .. note:: + The ol_openedx_course_export plugin returns HTTP 400 when some (but + not all) courses fail to queue. We allow 400 through only when the + response body contains the expected partial-failure schema + (``upload_task_ids`` key present), so callers can still process + successfully-queued tasks. A 400 with an unrecognised body (e.g. a + bad-request error from a malformed payload) still raises so the + caller gets a clear error instead of a later ``KeyError``. """ request_url = f"{self.studio_url}/api/courses/v0/export/" response = self.http_client.post( @@ -72,6 +81,16 @@ def export_courses(self, course_ids: list[str]) -> dict[str, dict[str, str]]: headers={"Authorization": f"JWT {self._fetch_access_token()}"}, timeout=60, ) + if response.status_code == 400: # noqa: PLR2004 + # Partial-failure: some courses failed to queue but others succeeded. + # Only suppress the error when the body matches the expected schema. + try: + body = response.json() + except ValueError: + response.raise_for_status() + if "upload_task_ids" not in body: + response.raise_for_status() + return body response.raise_for_status() return response.json()