Skip to content

Commit ab5dfbf

Browse files
authored
Introduce job_submissions_limit and include_jobs parameters for /api/runs/list (#2883)
* Introduce job_submissions_limit for /api/runs/list
* Test job_submissions_limit
* Use "job_submissions_limit": 1 in the UI
* Add include_jobs filter
* Fix typo
1 parent db55a99 commit ab5dfbf

File tree

9 files changed

+194
-38
lines changed

9 files changed

+194
-38
lines changed

frontend/src/pages/Runs/List/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export const RunList: React.FC = () => {
4848

4949
const { data, isLoading, refreshList, isLoadingMore } = useInfiniteScroll<IRun, TRunsRequestParams>({
5050
useLazyQuery: useLazyGetRunsQuery,
51-
args: { ...filteringRequestParams, limit: DEFAULT_TABLE_PAGE_SIZE },
51+
args: { ...filteringRequestParams, limit: DEFAULT_TABLE_PAGE_SIZE, job_submissions_limit: 1 },
5252
getPaginationParams: (lastRun) => ({ prev_submitted_at: lastRun.submitted_at }),
5353
});
5454

frontend/src/types/run.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ declare type TRunsRequestParams = {
77
prev_run_id?: string;
88
limit?: number;
99
ascending?: boolean;
10+
job_submissions_limit?: number;
1011
};
1112

1213
declare type TDeleteRunsRequestParams = {

src/dstack/_internal/core/models/runs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ def _get_error(termination_reason: Optional[RunTerminationReason]) -> Optional[s
556556

557557
@root_validator
558558
def _status_message(cls, values) -> Dict:
559+
# FIXME: status_message should not require all job submissions for status calculation
560+
# since it's very expensive and is not required for anything else.
561+
# May return a different status if not all job submissions requested.
562+
# TODO: Calculate status_message by looking at job models directly instead of job submissions.
559563
try:
560564
status = values["status"]
561565
jobs: List[Job] = values["jobs"]

src/dstack/_internal/server/routers/runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async def list_runs(
5454
repo_id=body.repo_id,
5555
username=body.username,
5656
only_active=body.only_active,
57+
include_jobs=body.include_jobs,
58+
job_submissions_limit=body.job_submissions_limit,
5759
prev_submitted_at=body.prev_submitted_at,
5860
prev_run_id=body.prev_run_id,
5961
limit=body.limit,

src/dstack/_internal/server/schemas/runs.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,24 @@
99

1010

1111
class ListRunsRequest(CoreModel):
12-
project_name: Optional[str]
13-
repo_id: Optional[str]
14-
username: Optional[str]
12+
project_name: Optional[str] = None
13+
repo_id: Optional[str] = None
14+
username: Optional[str] = None
1515
only_active: bool = False
16-
prev_submitted_at: Optional[datetime]
17-
prev_run_id: Optional[UUID]
16+
include_jobs: bool = Field(
17+
True,
18+
description=("Whether to include `jobs` in the response"),
19+
)
20+
job_submissions_limit: Optional[int] = Field(
21+
None,
22+
ge=0,
23+
description=(
24+
"Limit number of job submissions returned per job to avoid large responses. "
25+
"Drops older job submissions. No effect with `include_jobs: false`."
26+
),
27+
)
28+
prev_submitted_at: Optional[datetime] = None
29+
prev_run_id: Optional[UUID] = None
1830
limit: int = Field(100, ge=0, le=100)
1931
ascending: bool = False
2032

src/dstack/_internal/server/services/runs.py

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ async def list_user_runs(
105105
repo_id: Optional[str],
106106
username: Optional[str],
107107
only_active: bool,
108+
include_jobs: bool,
109+
job_submissions_limit: Optional[int],
108110
prev_submitted_at: Optional[datetime],
109111
prev_run_id: Optional[uuid.UUID],
110112
limit: int,
@@ -148,7 +150,14 @@ async def list_user_runs(
148150
runs = []
149151
for r in run_models:
150152
try:
151-
runs.append(run_model_to_run(r, return_in_api=True))
153+
runs.append(
154+
run_model_to_run(
155+
r,
156+
return_in_api=True,
157+
include_jobs=include_jobs,
158+
job_submissions_limit=job_submissions_limit,
159+
)
160+
)
152161
except pydantic.ValidationError:
153162
pass
154163
if len(run_models) > len(runs):
@@ -652,46 +661,26 @@ async def delete_runs(
652661

653662
def run_model_to_run(
654663
run_model: RunModel,
655-
include_job_submissions: bool = True,
664+
include_jobs: bool = True,
665+
job_submissions_limit: Optional[int] = None,
656666
return_in_api: bool = False,
657667
include_sensitive: bool = False,
658668
) -> Run:
659669
jobs: List[Job] = []
660-
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
661-
for replica_num, replica_submissions in itertools.groupby(
662-
run_jobs, key=lambda j: j.replica_num
663-
):
664-
for job_num, job_submissions in itertools.groupby(
665-
replica_submissions, key=lambda j: j.job_num
666-
):
667-
submissions = []
668-
job_model = None
669-
for job_model in job_submissions:
670-
if include_job_submissions:
671-
job_submission = job_model_to_job_submission(job_model)
672-
if return_in_api:
673-
# Set default non-None values for 0.18 backward-compatibility
674-
# Remove in 0.19
675-
if job_submission.job_provisioning_data is not None:
676-
if job_submission.job_provisioning_data.hostname is None:
677-
job_submission.job_provisioning_data.hostname = ""
678-
if job_submission.job_provisioning_data.ssh_port is None:
679-
job_submission.job_provisioning_data.ssh_port = 22
680-
submissions.append(job_submission)
681-
if job_model is not None:
682-
# Use the spec from the latest submission. Submissions can have different specs
683-
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
684-
if not include_sensitive:
685-
_remove_job_spec_sensitive_info(job_spec)
686-
jobs.append(Job(job_spec=job_spec, job_submissions=submissions))
670+
if include_jobs:
671+
jobs = _get_run_jobs_with_submissions(
672+
run_model=run_model,
673+
job_submissions_limit=job_submissions_limit,
674+
return_in_api=return_in_api,
675+
include_sensitive=include_sensitive,
676+
)
687677

688678
run_spec = RunSpec.__response__.parse_raw(run_model.run_spec)
689679

690680
latest_job_submission = None
691-
if include_job_submissions:
681+
if len(jobs) > 0 and len(jobs[0].job_submissions) > 0:
692682
# TODO(egor-s): does it make sense with replicas and multi-node?
693-
if jobs:
694-
latest_job_submission = jobs[0].job_submissions[-1]
683+
latest_job_submission = jobs[0].job_submissions[-1]
695684

696685
service_spec = None
697686
if run_model.service_spec is not None:
@@ -716,6 +705,47 @@ def run_model_to_run(
716705
return run
717706

718707

708+
def _get_run_jobs_with_submissions(
709+
run_model: RunModel,
710+
job_submissions_limit: Optional[int],
711+
return_in_api: bool = False,
712+
include_sensitive: bool = False,
713+
) -> List[Job]:
714+
jobs: List[Job] = []
715+
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
716+
for replica_num, replica_submissions in itertools.groupby(
717+
run_jobs, key=lambda j: j.replica_num
718+
):
719+
for job_num, job_models in itertools.groupby(replica_submissions, key=lambda j: j.job_num):
720+
submissions = []
721+
job_model = None
722+
if job_submissions_limit is not None:
723+
if job_submissions_limit == 0:
724+
# Take latest job submission to return its job_spec
725+
job_models = list(job_models)[-1:]
726+
else:
727+
job_models = list(job_models)[-job_submissions_limit:]
728+
for job_model in job_models:
729+
if job_submissions_limit != 0:
730+
job_submission = job_model_to_job_submission(job_model)
731+
if return_in_api:
732+
# Set default non-None values for 0.18 backward-compatibility
733+
# Remove in 0.19
734+
if job_submission.job_provisioning_data is not None:
735+
if job_submission.job_provisioning_data.hostname is None:
736+
job_submission.job_provisioning_data.hostname = ""
737+
if job_submission.job_provisioning_data.ssh_port is None:
738+
job_submission.job_provisioning_data.ssh_port = 22
739+
submissions.append(job_submission)
740+
if job_model is not None:
741+
# Use the spec from the latest submission. Submissions can have different specs
742+
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
743+
if not include_sensitive:
744+
_remove_job_spec_sensitive_info(job_spec)
745+
jobs.append(Job(job_spec=job_spec, job_submissions=submissions))
746+
return jobs
747+
748+
719749
async def _get_pool_offers(
720750
session: AsyncSession,
721751
project: ProjectModel,

src/dstack/api/_public/runs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,7 @@ def list(self, all: bool = False, limit: Optional[int] = None) -> List[Run]:
748748
repo_id=None,
749749
only_active=only_active,
750750
limit=limit or 100,
751+
job_submissions_limit=1, # no need to return more than 1 submission per job
751752
)
752753
if only_active and len(runs) == 0:
753754
runs = self._api_client.runs.list(

src/dstack/api/server/_runs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,16 @@ def list(
3333
prev_run_id: Optional[UUID] = None,
3434
limit: int = 100,
3535
ascending: bool = False,
36+
include_jobs: bool = True,
37+
job_submissions_limit: Optional[int] = None,
3638
) -> List[Run]:
3739
body = ListRunsRequest(
3840
project_name=project_name,
3941
repo_id=repo_id,
4042
username=username,
4143
only_active=only_active,
44+
include_jobs=include_jobs,
45+
job_submissions_limit=job_submissions_limit,
4246
prev_submitted_at=prev_submitted_at,
4347
prev_run_id=prev_run_id,
4448
limit=limit,

src/tests/_internal/server/routers/test_runs.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,108 @@ async def test_lists_runs_pagination(
707707
assert len(response2_json) == 1
708708
assert response2_json[0]["id"] == str(run2.id)
709709

710+
@pytest.mark.asyncio
711+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
712+
async def test_limits_job_submissions(
713+
self, test_db, session: AsyncSession, client: AsyncClient
714+
):
715+
user = await create_user(session=session, global_role=GlobalRole.USER)
716+
project = await create_project(session=session, owner=user)
717+
await add_project_member(
718+
session=session, project=project, user=user, project_role=ProjectRole.USER
719+
)
720+
repo = await create_repo(
721+
session=session,
722+
project_id=project.id,
723+
)
724+
run_submitted_at = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc)
725+
run = await create_run(
726+
session=session,
727+
project=project,
728+
repo=repo,
729+
user=user,
730+
submitted_at=run_submitted_at,
731+
)
732+
run_spec = RunSpec.parse_raw(run.run_spec)
733+
await create_job(
734+
session=session,
735+
run=run,
736+
submitted_at=run_submitted_at,
737+
last_processed_at=run_submitted_at,
738+
)
739+
job2 = await create_job(
740+
session=session,
741+
run=run,
742+
submitted_at=run_submitted_at,
743+
last_processed_at=run_submitted_at,
744+
)
745+
job2_spec = JobSpec.parse_raw(job2.job_spec_data)
746+
response = await client.post(
747+
"/api/runs/list",
748+
headers=get_auth_headers(user.token),
749+
json={"job_submissions_limit": 1},
750+
)
751+
assert response.status_code == 200, response.json()
752+
assert response.json() == [
753+
{
754+
"id": str(run.id),
755+
"project_name": project.name,
756+
"user": user.name,
757+
"submitted_at": run_submitted_at.isoformat(),
758+
"last_processed_at": run_submitted_at.isoformat(),
759+
"status": "submitted",
760+
"status_message": "submitted",
761+
"run_spec": run_spec.dict(),
762+
"jobs": [
763+
{
764+
"job_spec": job2_spec.dict(),
765+
"job_submissions": [
766+
{
767+
"id": str(job2.id),
768+
"submission_num": 0,
769+
"deployment_num": 0,
770+
"submitted_at": run_submitted_at.isoformat(),
771+
"last_processed_at": run_submitted_at.isoformat(),
772+
"finished_at": None,
773+
"inactivity_secs": None,
774+
"status": "submitted",
775+
"status_message": "submitted",
776+
"termination_reason": None,
777+
"termination_reason_message": None,
778+
"error": None,
779+
"exit_status": None,
780+
"job_provisioning_data": None,
781+
"job_runtime_data": None,
782+
}
783+
],
784+
}
785+
],
786+
"latest_job_submission": {
787+
"id": str(job2.id),
788+
"submission_num": 0,
789+
"deployment_num": 0,
790+
"submitted_at": run_submitted_at.isoformat(),
791+
"last_processed_at": run_submitted_at.isoformat(),
792+
"finished_at": None,
793+
"inactivity_secs": None,
794+
"status": "submitted",
795+
"status_message": "submitted",
796+
"termination_reason_message": None,
797+
"termination_reason": None,
798+
"error": None,
799+
"exit_status": None,
800+
"job_provisioning_data": None,
801+
"job_runtime_data": None,
802+
},
803+
"cost": 0,
804+
"service": None,
805+
"deployment_num": 0,
806+
"termination_reason": None,
807+
"error": None,
808+
"deleted": False,
809+
},
810+
]
811+
710812

711813
class TestGetRun:
712814
@pytest.mark.asyncio

0 commit comments

Comments (0)