Skip to content

Commit 4b3ff66

Browse files
authored
Fix run and job status_message calculation (#2889)
* Fix run and job status_message calculation * Comment no_error_reasons
1 parent 86c4048 commit 4b3ff66

File tree

4 files changed

+114
-122
lines changed

4 files changed

+114
-122
lines changed

src/dstack/_internal/core/models/runs.py

Lines changed: 28 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ def to_status(self) -> "RunStatus":
101101
}
102102
return mapping[self]
103103

104+
def to_error(self) -> Optional[str]:
105+
if self == RunTerminationReason.RETRY_LIMIT_EXCEEDED:
106+
return "retry limit exceeded"
107+
elif self == RunTerminationReason.SERVER_ERROR:
108+
return "server error"
109+
else:
110+
return None
111+
104112

105113
class JobTerminationReason(str, Enum):
106114
# Set by the server
@@ -162,6 +170,24 @@ def to_retry_event(self) -> Optional[RetryEvent]:
162170
default = RetryEvent.ERROR if self.to_status() == JobStatus.FAILED else None
163171
return mapping.get(self, default)
164172

173+
def to_error(self) -> Optional[str]:
174+
# Should return None for values that are already
175+
# handled and shown in status_message.
176+
error_mapping = {
177+
JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable",
178+
JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded",
179+
JobTerminationReason.VOLUME_ERROR: "volume error",
180+
JobTerminationReason.GATEWAY_ERROR: "gateway error",
181+
JobTerminationReason.SCALED_DOWN: "scaled down",
182+
JobTerminationReason.INACTIVITY_DURATION_EXCEEDED: "inactivity duration exceeded",
183+
JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY: "utilization policy",
184+
JobTerminationReason.PORTS_BINDING_FAILED: "ports binding failed",
185+
JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error",
186+
JobTerminationReason.EXECUTOR_ERROR: "executor error",
187+
JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded",
188+
}
189+
return error_mapping.get(self)
190+
165191

166192
class Requirements(CoreModel):
167193
# TODO: Make requirements' fields required
@@ -305,13 +331,12 @@ class JobSubmission(CoreModel):
305331
finished_at: Optional[datetime]
306332
inactivity_secs: Optional[int]
307333
status: JobStatus
334+
status_message: str = "" # default for backward compatibility
308335
termination_reason: Optional[JobTerminationReason]
309336
termination_reason_message: Optional[str]
310337
exit_status: Optional[int]
311338
job_provisioning_data: Optional[JobProvisioningData]
312339
job_runtime_data: Optional[JobRuntimeData]
313-
# TODO: make status_message and error a computed field after migrating to pydanticV2
314-
status_message: Optional[str] = None
315340
error: Optional[str] = None
316341

317342
@property
@@ -325,71 +350,11 @@ def duration(self) -> timedelta:
325350
end_time = self.finished_at
326351
return end_time - self.submitted_at
327352

328-
def dict(self, *args, **kwargs) -> Dict:
329-
status_message = self._get_status_message()
330-
error = self._get_error()
331-
# super() does not work with pydantic-duality
332-
res = CoreModel.dict(self, *args, **kwargs)
333-
res["status_message"] = status_message
334-
res["error"] = error
335-
return res
336-
337-
def _get_status_message(self) -> Optional[str]:
338-
if self.status == JobStatus.DONE:
339-
return "exited (0)"
340-
elif self.status == JobStatus.FAILED:
341-
if self.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR:
342-
return f"exited ({self.exit_status})"
343-
elif (
344-
self.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
345-
):
346-
return "no offers"
347-
elif self.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY:
348-
return "interrupted"
349-
else:
350-
return "error"
351-
elif self.status == JobStatus.TERMINATED:
352-
if self.termination_reason == JobTerminationReason.TERMINATED_BY_USER:
353-
return "stopped"
354-
elif self.termination_reason == JobTerminationReason.ABORTED_BY_USER:
355-
return "aborted"
356-
return self.status.value
357-
358-
def _get_error(self) -> Optional[str]:
359-
return JobSubmission._termination_reason_to_error(
360-
termination_reason=self.termination_reason
361-
)
362-
363-
@staticmethod
364-
def _termination_reason_to_error(
365-
termination_reason: Optional[JobTerminationReason],
366-
) -> Optional[str]:
367-
error_mapping = {
368-
JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable",
369-
JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded",
370-
JobTerminationReason.VOLUME_ERROR: "volume error",
371-
JobTerminationReason.GATEWAY_ERROR: "gateway error",
372-
JobTerminationReason.SCALED_DOWN: "scaled down",
373-
JobTerminationReason.INACTIVITY_DURATION_EXCEEDED: "inactivity duration exceeded",
374-
JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY: "utilization policy",
375-
JobTerminationReason.PORTS_BINDING_FAILED: "ports binding failed",
376-
JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error",
377-
JobTerminationReason.EXECUTOR_ERROR: "executor error",
378-
JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded",
379-
}
380-
return error_mapping.get(termination_reason)
381-
382353

383354
class Job(CoreModel):
384355
job_spec: JobSpec
385356
job_submissions: List[JobSubmission]
386357

387-
def get_last_termination_reason(self) -> Optional[JobTerminationReason]:
388-
for submission in reversed(self.job_submissions):
389-
if submission.termination_reason is not None:
390-
return submission.termination_reason
391-
return None
392-
393358

394359
class RunSpec(CoreModel):
395360
# TODO: run_name, working_dir are redundant here since they already passed in configuration
@@ -519,72 +484,17 @@ class Run(CoreModel):
519484
submitted_at: datetime
520485
last_processed_at: datetime
521486
status: RunStatus
522-
status_message: Optional[str] = None
487+
status_message: str = "" # default for backward compatibility
523488
termination_reason: Optional[RunTerminationReason] = None
524489
run_spec: RunSpec
525490
jobs: List[Job]
526491
latest_job_submission: Optional[JobSubmission] = None
527492
cost: float = 0
528493
service: Optional[ServiceSpec] = None
529494
deployment_num: int = 0 # default for compatibility with pre-0.19.14 servers
530-
# TODO: make error a computed field after migrating to pydanticV2
531495
error: Optional[str] = None
532496
deleted: Optional[bool] = None
533497

534-
def dict(self, *args, **kwargs) -> Dict:
535-
status_message = self._get_status_message()
536-
error = self._get_error()
537-
# super() does not work with pydantic-duality
538-
res = CoreModel.dict(self, *args, **kwargs)
539-
res["status_message"] = status_message
540-
res["error"] = error
541-
return res
542-
543-
def _get_error(self) -> Optional[str]:
544-
return Run._termination_reason_to_error(termination_reason=self.termination_reason)
545-
546-
@staticmethod
547-
def _termination_reason_to_error(
548-
termination_reason: Optional[RunTerminationReason],
549-
) -> Optional[str]:
550-
if termination_reason == RunTerminationReason.RETRY_LIMIT_EXCEEDED:
551-
return "retry limit exceeded"
552-
elif termination_reason == RunTerminationReason.SERVER_ERROR:
553-
return "server error"
554-
else:
555-
return None
556-
557-
def _get_status_message(self) -> Optional[str]:
558-
if len(self.jobs) == 0:
559-
return self.status.value
560-
561-
last_job = self.jobs[0]
562-
# FIXME: status_message should not require all job submissions for status calculation
563-
# since it's very expensive and is not required for anything else.
564-
# May return a different status if not all job submissions requested.
565-
# TODO: Calculate status_message by looking at job models directly instead job submissions.
566-
last_job_termination_reason = last_job.get_last_termination_reason()
567-
568-
if len(self.jobs) == 1:
569-
# FIXME: Clarify why show "pulling" only in case of one job
570-
if (
571-
last_job.job_submissions
572-
and last_job.job_submissions[-1].status == JobStatus.PULLING
573-
):
574-
return "pulling"
575-
576-
retry_on_events = last_job.job_spec.retry.on_events if last_job.job_spec.retry else []
577-
# Currently, `retrying` is shown only for `no-capacity` events
578-
if (
579-
self.status in [RunStatus.SUBMITTED, RunStatus.PENDING]
580-
and last_job_termination_reason
581-
== JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
582-
and RetryEvent.NO_CAPACITY in retry_on_events
583-
):
584-
return "retrying"
585-
586-
return self.status.value
587-
588498
def is_deployment_in_progress(self) -> bool:
589499
return any(
590500
not j.job_submissions[-1].status.is_finished()

src/dstack/_internal/server/services/jobs/__init__.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission:
134134
finished_at = None
135135
if job_model.status.is_finished():
136136
finished_at = last_processed_at
137+
status_message = _get_job_status_message(job_model)
138+
error = _get_job_error(job_model)
137139
return JobSubmission(
138140
id=job_model.id,
139141
submission_num=job_model.submission_num,
@@ -143,11 +145,13 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission:
143145
finished_at=finished_at,
144146
inactivity_secs=job_model.inactivity_secs,
145147
status=job_model.status,
148+
status_message=status_message,
146149
termination_reason=job_model.termination_reason,
147150
termination_reason_message=job_model.termination_reason_message,
148151
exit_status=job_model.exit_status,
149152
job_provisioning_data=job_provisioning_data,
150153
job_runtime_data=get_job_runtime_data(job_model),
154+
error=error,
151155
)
152156

153157

@@ -693,3 +697,31 @@ def _get_job_mount_point_attached_volume(
693697
continue
694698
return volume
695699
raise ServerClientError("Failed to find an eligible volume for the mount point")
700+
701+
702+
def _get_job_status_message(job_model: JobModel) -> str:
703+
if job_model.status == JobStatus.DONE:
704+
return "exited (0)"
705+
elif job_model.status == JobStatus.FAILED:
706+
if job_model.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR:
707+
return f"exited ({job_model.exit_status})"
708+
elif (
709+
job_model.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
710+
):
711+
return "no offers"
712+
elif job_model.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY:
713+
return "interrupted"
714+
else:
715+
return "error"
716+
elif job_model.status == JobStatus.TERMINATED:
717+
if job_model.termination_reason == JobTerminationReason.TERMINATED_BY_USER:
718+
return "stopped"
719+
elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER:
720+
return "aborted"
721+
return job_model.status.value
722+
723+
724+
def _get_job_error(job_model: JobModel) -> Optional[str]:
725+
if job_model.termination_reason is None:
726+
return None
727+
return job_model.termination_reason.to_error()

src/dstack/_internal/server/services/runs.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
)
2525
from dstack._internal.core.models.profiles import (
2626
CreationPolicy,
27+
RetryEvent,
2728
)
2829
from dstack._internal.core.models.repos.virtual import DEFAULT_VIRTUAL_REPO_ID, VirtualRunRepoData
2930
from dstack._internal.core.models.runs import (
@@ -686,19 +687,23 @@ def run_model_to_run(
686687
if run_model.service_spec is not None:
687688
service_spec = ServiceSpec.__response__.parse_raw(run_model.service_spec)
688689

690+
status_message = _get_run_status_message(run_model)
691+
error = _get_run_error(run_model)
689692
run = Run(
690693
id=run_model.id,
691694
project_name=run_model.project.name,
692695
user=run_model.user.name,
693696
submitted_at=run_model.submitted_at.replace(tzinfo=timezone.utc),
694697
last_processed_at=run_model.last_processed_at.replace(tzinfo=timezone.utc),
695698
status=run_model.status,
699+
status_message=status_message,
696700
termination_reason=run_model.termination_reason,
697701
run_spec=run_spec,
698702
jobs=jobs,
699703
latest_job_submission=latest_job_submission,
700704
service=service_spec,
701705
deployment_num=run_model.deployment_num,
706+
error=error,
702707
deleted=run_model.deleted,
703708
)
704709
run.cost = _get_run_cost(run)
@@ -746,6 +751,52 @@ def _get_run_jobs_with_submissions(
746751
return jobs
747752

748753

754+
def _get_run_status_message(run_model: RunModel) -> str:
755+
if len(run_model.jobs) == 0:
756+
return run_model.status.value
757+
758+
sorted_job_models = sorted(
759+
run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num)
760+
)
761+
job_models_grouped_by_job = list(
762+
list(jm)
763+
for _, jm in itertools.groupby(sorted_job_models, key=lambda j: (j.replica_num, j.job_num))
764+
)
765+
766+
if all(job_models[-1].status == JobStatus.PULLING for job_models in job_models_grouped_by_job):
767+
# Show `pulling`` if last job submission of all jobs is pulling
768+
return "pulling"
769+
770+
if run_model.status in [RunStatus.SUBMITTED, RunStatus.PENDING]:
771+
# Show `retrying` if any job caused the run to retry
772+
for job_models in job_models_grouped_by_job:
773+
last_job_spec = JobSpec.__response__.parse_raw(job_models[-1].job_spec_data)
774+
retry_on_events = last_job_spec.retry.on_events if last_job_spec.retry else []
775+
last_job_termination_reason = _get_last_job_termination_reason(job_models)
776+
if (
777+
last_job_termination_reason
778+
== JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
779+
and RetryEvent.NO_CAPACITY in retry_on_events
780+
):
781+
# TODO: Show `retrying` for other retry events
782+
return "retrying"
783+
784+
return run_model.status.value
785+
786+
787+
def _get_last_job_termination_reason(job_models: List[JobModel]) -> Optional[JobTerminationReason]:
788+
for job_model in reversed(job_models):
789+
if job_model.termination_reason is not None:
790+
return job_model.termination_reason
791+
return None
792+
793+
794+
def _get_run_error(run_model: RunModel) -> Optional[str]:
795+
if run_model.termination_reason is None:
796+
return None
797+
return run_model.termination_reason.to_error()
798+
799+
749800
async def _get_pool_offers(
750801
session: AsyncSession,
751802
project: ProjectModel,

src/tests/_internal/core/models/test_runs.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
from dstack._internal.core.models.profiles import RetryEvent
22
from dstack._internal.core.models.runs import (
33
JobStatus,
4-
JobSubmission,
54
JobTerminationReason,
6-
Run,
75
RunStatus,
86
RunTerminationReason,
97
)
@@ -35,6 +33,7 @@ def test_job_termination_reason_to_retry_event_works_with_all_enum_variants():
3533

3634
# Will fail if JobTerminationReason value is added without updating JobSubmission._get_error
3735
def test_get_error_returns_expected_messages():
36+
# already handled and shown in status_message
3837
no_error_reasons = [
3938
JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
4039
JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY,
@@ -47,7 +46,7 @@ def test_get_error_returns_expected_messages():
4746
]
4847

4948
for reason in JobTerminationReason:
50-
if JobSubmission._termination_reason_to_error(reason) is None:
49+
if reason.to_error() is None:
5150
# Fail no-error reason is not in the list
5251
assert reason in no_error_reasons
5352

@@ -62,6 +61,6 @@ def test_run_get_error_returns_none_for_specific_reasons():
6261
]
6362

6463
for reason in RunTerminationReason:
65-
if Run._termination_reason_to_error(reason) is None:
64+
if reason.to_error() is None:
6665
# Fail no-error reason is not in the list
6766
assert reason in no_error_reasons

0 commit comments

Comments
 (0)