Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/docs/concepts/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,34 @@ retry:
If one job of a multi-node task fails with retry enabled,
`dstack` will stop all the jobs and resubmit the run.

### Priority

Be default, submitted runs are scheduled in the order they were submitted.
When compute resources are limited, you may want to prioritize some runs over others.
This can be done by specifying the [`priority`](../reference/dstack.yml/task.md) property in the run configuration:

<div editor-title=".dstack.yml">

```yaml
type: task
name: train

python: "3.10"

# Commands of the task
commands:
- pip install -r fine-tuning/qlora/requirements.txt
- python fine-tuning/qlora/train.py

priority: 50
```

</div>

`dstack` tries to provision runs with higher priority first.
Note that if a high priority run cannot be scheduled,
it does not block other runs with lower priority from scheduling.

--8<-- "docs/concepts/snippets/manage-fleets.ext"

--8<-- "docs/concepts/snippets/manage-runs.ext"
Expand Down
19 changes: 17 additions & 2 deletions src/dstack/_internal/core/models/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
MAX_INT64 = 2**63 - 1
SERVICE_HTTPS_DEFAULT = True
STRIP_PREFIX_DEFAULT = True
RUN_PRIOTIRY_MIN = 0
RUN_PRIOTIRY_MAX = 100
RUN_PRIORITY_DEFAULT = 0


class RunConfigurationType(str, Enum):
Expand Down Expand Up @@ -221,14 +224,26 @@ class BaseRunConfiguration(CoreModel):
)
),
] = None
# deprecated since 0.18.31; task, service -- no effect; dev-environment -- executed right before `init`
setup: CommandsList = []
resources: Annotated[
ResourcesSpec, Field(description="The resources requirements to run the configuration")
] = ResourcesSpec()
priority: Annotated[
Optional[int],
Field(
ge=RUN_PRIOTIRY_MIN,
le=RUN_PRIOTIRY_MAX,
description=(
f"The priority of the run, an integer between `{RUN_PRIOTIRY_MIN}` and `{RUN_PRIOTIRY_MAX}`."
" `dstack` tries to provision runs with higher priority first."
f" Defaults to `{RUN_PRIORITY_DEFAULT}`"
),
),
] = None
volumes: Annotated[
List[Union[MountPoint, str]], Field(description="The volumes mount points")
] = []
# deprecated since 0.18.31; task, service -- no effect; dev-environment -- executed right before `init`
setup: CommandsList = []

@validator("python", pre=True, always=True)
def convert_python(cls, v, values) -> Optional[PythonVersion]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,20 @@ async def _process_next_submitted_job():
async with lock:
res = await session.execute(
select(JobModel)
.join(JobModel.run)
.where(
JobModel.status == JobStatus.SUBMITTED,
JobModel.id.not_in(lockset),
)
.order_by(JobModel.last_processed_at.asc())
# Jobs are process in FIFO sorted by priority globally,
# thus runs from different project can "overtake" each other by using higher priorities.
# That's not a big problem as long as projects do not compete for the same compute resources.
# Jobs with lower priorities from other projects will be processed without major lag
# as long as new higher priority runs are not constantly submitted.
# TODO: Consider processing jobs from different projects fairly/round-robin
# Fully fair processing can be tricky to implement via the current DB queue as
# there can be many projects and we are limited by the max DB connections.
.order_by(RunModel.priority.desc(), JobModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add RunModel.priority

Revision ID: bca2fdf130bf
Revises: 20166748b60c
Create Date: 2025-05-14 15:24:21.269775

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "bca2fdf130bf"
down_revision = "20166748b60c"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("runs", schema=None) as batch_op:
batch_op.add_column(sa.Column("priority", sa.Integer(), nullable=True))
batch_op.execute("UPDATE runs SET priority = 0")
with op.batch_alter_table("runs", schema=None) as batch_op:
batch_op.alter_column("priority", nullable=False)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("runs", schema=None) as batch_op:
batch_op.drop_column("priority")

# ### end Alembic commands ###
1 change: 1 addition & 0 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ class RunModel(BaseModel):
resubmission_attempt: Mapped[int] = mapped_column(Integer, default=0)
run_spec: Mapped[str] = mapped_column(Text)
service_spec: Mapped[Optional[str]] = mapped_column(Text)
priority: Mapped[int] = mapped_column(Integer, default=0)

jobs: Mapped[List["JobModel"]] = relationship(
back_populates="run", lazy="selectin", order_by="[JobModel.replica_num, JobModel.job_num]"
Expand Down
31 changes: 22 additions & 9 deletions src/dstack/_internal/server/services/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ServerClientError,
)
from dstack._internal.core.models.common import ApplyAction
from dstack._internal.core.models.configurations import AnyRunConfiguration
from dstack._internal.core.models.configurations import RUN_PRIORITY_DEFAULT, AnyRunConfiguration
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
Expand Down Expand Up @@ -434,7 +434,12 @@ async def apply_plan(
# FIXME: potentially long write transaction
# Avoid getting run_model after update
await session.execute(
update(RunModel).where(RunModel.id == current_resource.id).values(run_spec=run_spec.json())
update(RunModel)
.where(RunModel.id == current_resource.id)
.values(
run_spec=run_spec.json(),
priority=run_spec.configuration.priority,
)
)
run = await get_run_by_name(
session=session,
Expand Down Expand Up @@ -495,6 +500,7 @@ async def submit_run(
status=RunStatus.SUBMITTED,
run_spec=run_spec.json(),
last_processed_at=submitted_at,
priority=run_spec.configuration.priority,
)
session.add(run_model)

Expand Down Expand Up @@ -852,6 +858,13 @@ def _get_job_submission_cost(job_submission: JobSubmission) -> float:


def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
# This function may set defaults for null run_spec values,
# although most defaults are resolved when building job_spec
# so that we can keep both the original user-supplied value (null in run_spec)
# and the default in job_spec.
# If a property is stored in job_spec - resolve the default there.
# Server defaults are preferable over client defaults so that
# the defaults depend on the server version, not the client version.
if run_spec.run_name is not None:
validate_dstack_resource_name(run_spec.run_name)
for mount_point in run_spec.configuration.volumes:
Expand All @@ -875,11 +888,14 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
raise ServerClientError(
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s"
)
if run_spec.configuration.priority is None:
run_spec.configuration.priority = RUN_PRIORITY_DEFAULT
set_resources_defaults(run_spec.configuration.resources)


_UPDATABLE_SPEC_FIELDS = ["repo_code_hash", "configuration"]
_CONF_TYPE_TO_UPDATABLE_FIELDS = {
_CONF_UPDATABLE_FIELDS = ["priority"]
_TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS = {
"dev-environment": ["inactivity_duration"],
# Most service fields can be updated via replica redeployment.
# TODO: Allow updating other fields when rolling deployment is supported.
Expand Down Expand Up @@ -915,12 +931,9 @@ def _check_can_update_configuration(
raise ServerClientError(
f"Configuration type changed from {current.type} to {new.type}, cannot update"
)
updatable_fields = _CONF_TYPE_TO_UPDATABLE_FIELDS.get(new.type)
if updatable_fields is None:
raise ServerClientError(
f"Can only update {', '.join(_CONF_TYPE_TO_UPDATABLE_FIELDS)} configurations."
f" Not {new.type}"
)
updatable_fields = _CONF_UPDATABLE_FIELDS + _TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS.get(
new.type, []
)
diff = diff_models(current, new)
changed_fields = list(diff.keys())
for key in changed_fields:
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/testing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ async def create_run(
run_spec: Optional[RunSpec] = None,
run_id: Optional[UUID] = None,
deleted: bool = False,
priority: int = 0,
) -> RunModel:
if run_spec is None:
run_spec = get_run_spec(
Expand All @@ -282,6 +283,7 @@ async def create_run(
run_spec=run_spec.json(),
last_processed_at=submitted_at,
jobs=[],
priority=priority,
)
session.add(run)
await session.commit()
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/api/server/_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[Dict]:
configuration_excludes["rate_limits"] = True
if configuration.shell is None:
configuration_excludes["shell"] = True
if configuration.priority is None:
configuration_excludes["priority"] = True

if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,3 +634,63 @@ async def test_creates_new_instance_in_existing_fleet(self, test_db, session: As
assert job.instance is not None
assert job.instance.instance_num == 1
assert job.instance.fleet_id == fleet.id

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSession):
project = await create_project(session)
user = await create_user(session)
repo = await create_repo(
session=session,
project_id=project.id,
)
instance = await create_instance(
session=session,
project=project,
status=InstanceStatus.IDLE,
)
run1 = await create_run(
session=session,
project=project,
repo=repo,
user=user,
priority=10,
)
job1 = await create_job(
session=session,
run=run1,
instance_assigned=True,
instance=instance,
)
run2 = await create_run(
session=session,
project=project,
repo=repo,
user=user,
priority=0,
)
job2 = await create_job(
session=session, run=run2, instance_assigned=True, instance=instance
)
run3 = await create_run(
session=session,
project=project,
repo=repo,
user=user,
priority=100,
)
job3 = await create_job(
session=session,
run=run3,
instance_assigned=True,
instance=instance,
)
await process_submitted_jobs()
await session.refresh(job3)
assert job3.status == JobStatus.PROVISIONING
await process_submitted_jobs()
await session.refresh(job1)
assert job1.status == JobStatus.PROVISIONING
await process_submitted_jobs()
await session.refresh(job2)
assert job2.status == JobStatus.PROVISIONING
2 changes: 2 additions & 0 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def get_dev_env_run_plan_dict(
"reservation": None,
"fleets": None,
"tags": None,
"priority": 0,
},
"configuration_path": "dstack.yaml",
"profile": {
Expand Down Expand Up @@ -284,6 +285,7 @@ def get_dev_env_run_dict(
"reservation": None,
"fleets": None,
"tags": None,
"priority": 0,
},
"configuration_path": "dstack.yaml",
"profile": {
Expand Down
Loading