From 4edf2476e3e80dafab651e9da109119c86f9cd05 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 14 May 2025 16:24:22 +0500 Subject: [PATCH 1/4] Implement run priorities --- .../_internal/core/models/configurations.py | 19 +++++++++-- .../tasks/process_submitted_jobs.py | 11 +++++- .../bca2fdf130bf_add_runmodel_priority.py | 34 +++++++++++++++++++ src/dstack/_internal/server/models.py | 1 + src/dstack/_internal/server/services/runs.py | 31 ++++++++++++----- .../_internal/server/routers/test_runs.py | 2 ++ 6 files changed, 86 insertions(+), 12 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index f2ce73e85b..b12fb703db 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -23,6 +23,9 @@ MAX_INT64 = 2**63 - 1 SERVICE_HTTPS_DEFAULT = True STRIP_PREFIX_DEFAULT = True +RUN_PRIOTIRY_MIN = 0 +RUN_PRIOTIRY_MAX = 100 +RUN_PRIORITY_DEFAULT = 0 class RunConfigurationType(str, Enum): @@ -221,14 +224,26 @@ class BaseRunConfiguration(CoreModel): ) ), ] = None - # deprecated since 0.18.31; task, service -- no effect; dev-environment -- executed right before `init` - setup: CommandsList = [] resources: Annotated[ ResourcesSpec, Field(description="The resources requirements to run the configuration") ] = ResourcesSpec() + priority: Annotated[ + Optional[int], + Field( + ge=RUN_PRIOTIRY_MIN, + le=RUN_PRIOTIRY_MAX, + description=( + f"The priority of the run, an integer between `{RUN_PRIOTIRY_MIN}` and `{RUN_PRIOTIRY_MAX}`." + " `dstack` tries to provision runs with higher priority first." + f" Defaults to `{RUN_PRIORITY_DEFAULT}`" + ), + ), + ] = None volumes: Annotated[ List[Union[MountPoint, str]], Field(description="The volumes mount points") ] = [] + # deprecated since 0.18.31; task, service -- no effect; dev-environment -- executed right before `init` + setup: CommandsList = [] @validator("python", pre=True, always=True) def convert_python(cls, v, values) -> Optional[PythonVersion]: diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index efa8600a43..7dbb0ef3c9 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -93,11 +93,20 @@ async def _process_next_submitted_job(): async with lock: res = await session.execute( select(JobModel) + .join(JobModel.run) .where( JobModel.status == JobStatus.SUBMITTED, JobModel.id.not_in(lockset), ) - .order_by(JobModel.last_processed_at.asc()) + # Jobs are process in FIFO sorted by priority globally, + # thus runs from different project can "overtake" each other by using higher priorities. + # That's not a big problem as long as projects do not compete for the same compute resources. + # Jobs with lower priorities from other projects will be processed without major lag + # as long as new higher priority runs are not constantly submitted. + # TODO: Consider processing jobs from different projects fairly/round-robin + # Fully fair processing can be tricky to implement via the current DB queue as + # there can be many projects and we are limited by the max DB connections. + .order_by(RunModel.priority.desc(), JobModel.last_processed_at.asc()) .limit(1) .with_for_update(skip_locked=True) ) diff --git a/src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py b/src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py new file mode 100644 index 0000000000..4c51278534 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py @@ -0,0 +1,34 @@ +"""Add RunModel.priority + +Revision ID: bca2fdf130bf +Revises: 20166748b60c +Create Date: 2025-05-14 15:24:21.269775 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "bca2fdf130bf" +down_revision = "20166748b60c" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.add_column(sa.Column("priority", sa.Integer(), nullable=True)) + batch_op.execute("UPDATE runs SET priority = 0") + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.alter_column("priority", nullable=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.drop_column("priority") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 322f2163bb..161e242bcb 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -348,6 +348,7 @@ class RunModel(BaseModel): resubmission_attempt: Mapped[int] = mapped_column(Integer, default=0) run_spec: Mapped[str] = mapped_column(Text) service_spec: Mapped[Optional[str]] = mapped_column(Text) + priority: Mapped[int] = mapped_column(Integer, default=0) jobs: Mapped[List["JobModel"]] = relationship( back_populates="run", lazy="selectin", order_by="[JobModel.replica_num, JobModel.job_num]" diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index d2df9eda44..4f8d9b6941 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -16,7 +16,7 @@ ServerClientError, ) from dstack._internal.core.models.common import ApplyAction -from dstack._internal.core.models.configurations import AnyRunConfiguration +from dstack._internal.core.models.configurations import RUN_PRIORITY_DEFAULT, AnyRunConfiguration from dstack._internal.core.models.instances import ( InstanceAvailability, InstanceOfferWithAvailability, @@ -434,7 +434,12 @@ async def apply_plan( # FIXME: potentially long write transaction # Avoid getting run_model after update await session.execute( - update(RunModel).where(RunModel.id == current_resource.id).values(run_spec=run_spec.json()) + update(RunModel) + .where(RunModel.id == current_resource.id) + .values( + run_spec=run_spec.json(), + priority=run_spec.configuration.priority, + ) ) run = await get_run_by_name( session=session, @@ -495,6 +500,7 @@ async def submit_run( status=RunStatus.SUBMITTED, run_spec=run_spec.json(), last_processed_at=submitted_at, + priority=run_spec.configuration.priority, ) session.add(run_model) @@ -852,6 +858,13 @@ def _get_job_submission_cost(job_submission: JobSubmission) -> float: def _validate_run_spec_and_set_defaults(run_spec: RunSpec): + # This function may set defaults for null run_spec values, + # although most defaults are resolved when building job_spec + # so that we can keep both the original user-supplied value (null in run_spec) + # and the default in job_spec. + # If a property is stored in job_spec - resolve the default there. + # Server defaults are preferable over client defaults so that + # the defaults depend on the server version, not the client version. if run_spec.run_name is not None: validate_dstack_resource_name(run_spec.run_name) for mount_point in run_spec.configuration.volumes: @@ -875,11 +888,14 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): raise ServerClientError( f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s" ) + if run_spec.configuration.priority is None: + run_spec.configuration.priority = RUN_PRIORITY_DEFAULT set_resources_defaults(run_spec.configuration.resources) _UPDATABLE_SPEC_FIELDS = ["repo_code_hash", "configuration"] -_CONF_TYPE_TO_UPDATABLE_FIELDS = { +_CONF_UPDATABLE_FIELDS = ["priority"] +_TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS = { "dev-environment": ["inactivity_duration"], # Most service fields can be updated via replica redeployment. # TODO: Allow updating other fields when rolling deployment is supported. @@ -915,12 +931,9 @@ def _check_can_update_configuration( raise ServerClientError( f"Configuration type changed from {current.type} to {new.type}, cannot update" ) - updatable_fields = _CONF_TYPE_TO_UPDATABLE_FIELDS.get(new.type) - if updatable_fields is None: - raise ServerClientError( - f"Can only update {', '.join(_CONF_TYPE_TO_UPDATABLE_FIELDS)} configurations." - f" Not {new.type}" - ) + updatable_fields = _CONF_UPDATABLE_FIELDS + _TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS.get( + new.type, [] + ) diff = diff_models(current, new) changed_fields = list(diff.keys()) for key in changed_fields: diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 00141c9a0b..c20ef77cb4 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -124,6 +124,7 @@ def get_dev_env_run_plan_dict( "reservation": None, "fleets": None, "tags": None, + "priority": 0, }, "configuration_path": "dstack.yaml", "profile": { @@ -284,6 +285,7 @@ def get_dev_env_run_dict( "reservation": None, "fleets": None, "tags": None, + "priority": 0, }, "configuration_path": "dstack.yaml", "profile": { From 3c0b88c5d61a3c40f190ef1c513c7b179b3b4578 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 May 2025 10:09:29 +0500 Subject: [PATCH 2/4] Test runs priority --- src/dstack/_internal/server/testing/common.py | 2 + .../tasks/test_process_submitted_jobs.py | 60 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 6d6f37e9f5..d50ac01158 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -262,6 +262,7 @@ async def create_run( run_spec: Optional[RunSpec] = None, run_id: Optional[UUID] = None, deleted: bool = False, + priority: int = 0, ) -> RunModel: if run_spec is None: run_spec = get_run_spec( @@ -282,6 +283,7 @@ async def create_run( run_spec=run_spec.json(), last_processed_at=submitted_at, jobs=[], + priority=priority, ) session.add(run) await session.commit() diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index 24cb19daab..e23d787f5e 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -634,3 +634,63 @@ async def test_creates_new_instance_in_existing_fleet(self, test_db, session: As assert job.instance is not None assert job.instance.instance_num == 1 assert job.instance.fleet_id == fleet.id + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSession): + project = await create_project(session) + user = await create_user(session) + repo = await create_repo( + session=session, + project_id=project.id, + ) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.IDLE, + ) + run1 = await create_run( + session=session, + project=project, + repo=repo, + user=user, + priority=10, + ) + job1 = await create_job( + session=session, + run=run1, + instance_assigned=True, + instance=instance, + ) + run2 = await create_run( + session=session, + project=project, + repo=repo, + user=user, + priority=0, + ) + job2 = await create_job( + session=session, run=run2, instance_assigned=True, instance=instance + ) + run3 = await create_run( + session=session, + project=project, + repo=repo, + user=user, + priority=100, + ) + job3 = await create_job( + session=session, + run=run3, + instance_assigned=True, + instance=instance, + ) + await process_submitted_jobs() + await session.refresh(job3) + assert job3.status == JobStatus.PROVISIONING + await process_submitted_jobs() + await session.refresh(job1) + assert job1.status == JobStatus.PROVISIONING + await process_submitted_jobs() + await session.refresh(job2) + assert job2.status == JobStatus.PROVISIONING From de22da66f14c92e3a4f2a7742fa725d4a241bc97 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 May 2025 10:33:18 +0500 Subject: [PATCH 3/4] Document priorities for tasks --- docs/docs/concepts/tasks.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docs/docs/concepts/tasks.md b/docs/docs/concepts/tasks.md index dcdc6b5304..3dc4b08b6f 100644 --- a/docs/docs/concepts/tasks.md +++ b/docs/docs/concepts/tasks.md @@ -427,6 +427,34 @@ retry: If one job of a multi-node task fails with retry enabled, `dstack` will stop all the jobs and resubmit the run. +### Priority + +Be default, submitted runs are scheduled in the order they were submitted. +When compute resources are limited, you may want to prioritize some runs over others. +This can be done by specifying the [`priority`](../reference/dstack.yml/task.md) property in the run configuration: + +
+ +```yaml +type: task +name: train + +python: "3.10" + +# Commands of the task +commands: + - pip install -r fine-tuning/qlora/requirements.txt + - python fine-tuning/qlora/train.py + +priority: 50 +``` + +
+ +`dstack` tries to provision runs with higher priority first. +Note that if a high priority run cannot be scheduled, +it does not block other runs with lower priority from scheduling. + --8<-- "docs/concepts/snippets/manage-fleets.ext" --8<-- "docs/concepts/snippets/manage-runs.ext" From af3d3156e91870269ba5d2414cf2158727885dbf Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 15 May 2025 15:13:34 +0500 Subject: [PATCH 4/4] Exclude priority for backward compatibility --- src/dstack/api/server/_runs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dstack/api/server/_runs.py b/src/dstack/api/server/_runs.py index bed56f29da..d7fd2f4d0c 100644 --- a/src/dstack/api/server/_runs.py +++ b/src/dstack/api/server/_runs.py @@ -186,6 +186,8 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[Dict]: configuration_excludes["rate_limits"] = True if configuration.shell is None: configuration_excludes["shell"] = True + if configuration.priority is None: + configuration_excludes["priority"] = True if configuration_excludes: spec_excludes["configuration"] = configuration_excludes