Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions dg_projects/legacy_openedx/legacy_openedx/ops/open_edx.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from pypika import MySQLQuery as Query
from pypika import Table, Tables

COURSE_EXPORT_TIMEOUT = timedelta(minutes=60)


class ListCoursesConfig(Config):
edx_course_api_page_size: int = Field(
Expand Down Expand Up @@ -568,7 +570,7 @@ def export_edx_forum_database(
),
},
)
def export_edx_courses(
def export_edx_courses( # noqa: C901
context: OpExecutionContext,
edx_course_ids: list[str],
daily_extracts_dir: str,
Expand All @@ -577,23 +579,48 @@ def export_edx_courses(
exported_courses = context.resources.openedx.export_courses(
course_ids=edx_course_ids,
)
failed_initial = exported_courses.get("failed_uploads", {})
if failed_initial:
context.log.warning(
"The following courses failed to queue for export and will be skipped: %s",
list(failed_initial.keys()),
)
successful_exports: set[str] = set()
failed_exports: set[str] = set()
tasks = exported_courses["upload_task_ids"]
context.log.info("Exporting %s tasks from Open edX", len(tasks))
# Possible status values found here:
# https://github.com/openedx/django-user-tasks/blob/master/user_tasks/models.py
start_time = datetime.now(tz=UTC)
while len(successful_exports.union(failed_exports)) < len(tasks):
if datetime.now(tz=UTC) - start_time > COURSE_EXPORT_TIMEOUT:
timed_out = set(tasks.keys()) - successful_exports - failed_exports
err_msg = (
f"Timed out waiting for course exports after {COURSE_EXPORT_TIMEOUT}. "
f"Unresolved courses: {timed_out}"
)
raise TimeoutError(err_msg)
time.sleep(timedelta(seconds=60).seconds)
for course_id, task_id in tasks.items():
if course_id in successful_exports or course_id in failed_exports:
continue
try:
task_status = context.resources.openedx.check_course_export_status(
course_id,
task_id,
)
except httpx.HTTPStatusError:
# Don't fail the whole job if one task status yields an error
continue
except httpx.HTTPStatusError as e:
if e.response.status_code == 404: # noqa: PLR2004
context.log.warning(
"Export task not found (HTTP 404) for course %s "
"(task %s), marking as failed",
course_id,
task_id,
)
failed_exports.add(course_id)
continue
else:
raise
if task_status["state"] == "Succeeded":
successful_exports.add(course_id)
if task_status["state"] in {"Failed", "Canceled", "Retrying"}:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ def export_courses(self, course_ids: list[str]) -> dict[str, dict[str, str]]:

:returns: A dictionary of course IDs and the S3 URL where it will be exported
to.

.. note::
The ol_openedx_course_export plugin returns HTTP 400 when some (but
not all) courses fail to queue. We allow 400 through only when the
response body contains the expected partial-failure schema
(``upload_task_ids`` key present), so callers can still process
successfully-queued tasks. A 400 with an unrecognised body (e.g. a
bad-request error from a malformed payload) still raises so the
caller gets a clear error instead of a later ``KeyError``.
"""
request_url = f"{self.studio_url}/api/courses/v0/export/"
response = self.http_client.post(
Expand All @@ -72,6 +81,16 @@ def export_courses(self, course_ids: list[str]) -> dict[str, dict[str, str]]:
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
timeout=60,
)
if response.status_code == 400: # noqa: PLR2004
# Partial-failure: some courses failed to queue but others succeeded.
# Only suppress the error when the body matches the expected schema.
try:
body = response.json()
except ValueError:
response.raise_for_status()
if "upload_task_ids" not in body:
response.raise_for_status()
return body
response.raise_for_status()
return response.json()

Expand Down
Loading