diff --git a/docs/docs/concepts/tasks.md b/docs/docs/concepts/tasks.md
index dcdc6b5304..3dc4b08b6f 100644
--- a/docs/docs/concepts/tasks.md
+++ b/docs/docs/concepts/tasks.md
@@ -427,6 +427,34 @@ retry:
If one job of a multi-node task fails with retry enabled,
`dstack` will stop all the jobs and resubmit the run.
+### Priority
+
+By default, submitted runs are scheduled in the order they were submitted.
+When compute resources are limited, you may want to prioritize some runs over others.
+This can be done by specifying the [`priority`](../reference/dstack.yml/task.md) property in the run configuration:
+
+
+
+```yaml
+type: task
+name: train
+
+python: "3.10"
+
+# Commands of the task
+commands:
+ - pip install -r fine-tuning/qlora/requirements.txt
+ - python fine-tuning/qlora/train.py
+
+priority: 50
+```
+
+
+
+`dstack` tries to provision runs with higher priority first.
+Note that if a high-priority run cannot be scheduled,
+it does not block other runs with lower priority from being scheduled.
+
--8<-- "docs/concepts/snippets/manage-fleets.ext"
--8<-- "docs/concepts/snippets/manage-runs.ext"
diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py
index f2ce73e85b..b12fb703db 100644
--- a/src/dstack/_internal/core/models/configurations.py
+++ b/src/dstack/_internal/core/models/configurations.py
@@ -23,6 +23,9 @@
MAX_INT64 = 2**63 - 1
SERVICE_HTTPS_DEFAULT = True
STRIP_PREFIX_DEFAULT = True
+RUN_PRIOTIRY_MIN = 0
+RUN_PRIOTIRY_MAX = 100
+RUN_PRIORITY_DEFAULT = 0
class RunConfigurationType(str, Enum):
@@ -221,14 +224,26 @@ class BaseRunConfiguration(CoreModel):
)
),
] = None
- # deprecated since 0.18.31; task, service -- no effect; dev-environment -- executed right before `init`
- setup: CommandsList = []
resources: Annotated[
ResourcesSpec, Field(description="The resources requirements to run the configuration")
] = ResourcesSpec()
+ priority: Annotated[
+ Optional[int],
+ Field(
+ ge=RUN_PRIOTIRY_MIN,
+ le=RUN_PRIOTIRY_MAX,
+ description=(
+ f"The priority of the run, an integer between `{RUN_PRIOTIRY_MIN}` and `{RUN_PRIOTIRY_MAX}`."
+ " `dstack` tries to provision runs with higher priority first."
+ f" Defaults to `{RUN_PRIORITY_DEFAULT}`"
+ ),
+ ),
+ ] = None
volumes: Annotated[
List[Union[MountPoint, str]], Field(description="The volumes mount points")
] = []
+ # deprecated since 0.18.31; task, service -- no effect; dev-environment -- executed right before `init`
+ setup: CommandsList = []
@validator("python", pre=True, always=True)
def convert_python(cls, v, values) -> Optional[PythonVersion]:
diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
index efa8600a43..7dbb0ef3c9 100644
--- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
+++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
@@ -93,11 +93,20 @@ async def _process_next_submitted_job():
async with lock:
res = await session.execute(
select(JobModel)
+ .join(JobModel.run)
.where(
JobModel.status == JobStatus.SUBMITTED,
JobModel.id.not_in(lockset),
)
- .order_by(JobModel.last_processed_at.asc())
+ # Jobs are processed in FIFO order sorted by priority globally,
+ # thus runs from different projects can "overtake" each other by using higher priorities.
+ # That's not a big problem as long as projects do not compete for the same compute resources.
+ # Jobs with lower priorities from other projects will be processed without major lag
+ # as long as new higher priority runs are not constantly submitted.
+ # TODO: Consider processing jobs from different projects fairly/round-robin
+ # Fully fair processing can be tricky to implement via the current DB queue as
+ # there can be many projects and we are limited by the max DB connections.
+ .order_by(RunModel.priority.desc(), JobModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
)
diff --git a/src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py b/src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py
new file mode 100644
index 0000000000..4c51278534
--- /dev/null
+++ b/src/dstack/_internal/server/migrations/versions/bca2fdf130bf_add_runmodel_priority.py
@@ -0,0 +1,34 @@
+"""Add RunModel.priority
+
+Revision ID: bca2fdf130bf
+Revises: 20166748b60c
+Create Date: 2025-05-14 15:24:21.269775
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "bca2fdf130bf"
+down_revision = "20166748b60c"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ with op.batch_alter_table("runs", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("priority", sa.Integer(), nullable=True))
+ batch_op.execute("UPDATE runs SET priority = 0")
+ with op.batch_alter_table("runs", schema=None) as batch_op:
+ batch_op.alter_column("priority", nullable=False)
+ # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ with op.batch_alter_table("runs", schema=None) as batch_op:
+ batch_op.drop_column("priority")
+
+ # ### end Alembic commands ###
diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py
index 322f2163bb..161e242bcb 100644
--- a/src/dstack/_internal/server/models.py
+++ b/src/dstack/_internal/server/models.py
@@ -348,6 +348,7 @@ class RunModel(BaseModel):
resubmission_attempt: Mapped[int] = mapped_column(Integer, default=0)
run_spec: Mapped[str] = mapped_column(Text)
service_spec: Mapped[Optional[str]] = mapped_column(Text)
+ priority: Mapped[int] = mapped_column(Integer, default=0)
jobs: Mapped[List["JobModel"]] = relationship(
back_populates="run", lazy="selectin", order_by="[JobModel.replica_num, JobModel.job_num]"
diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py
index d2df9eda44..4f8d9b6941 100644
--- a/src/dstack/_internal/server/services/runs.py
+++ b/src/dstack/_internal/server/services/runs.py
@@ -16,7 +16,7 @@
ServerClientError,
)
from dstack._internal.core.models.common import ApplyAction
-from dstack._internal.core.models.configurations import AnyRunConfiguration
+from dstack._internal.core.models.configurations import RUN_PRIORITY_DEFAULT, AnyRunConfiguration
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
@@ -434,7 +434,12 @@ async def apply_plan(
# FIXME: potentially long write transaction
# Avoid getting run_model after update
await session.execute(
- update(RunModel).where(RunModel.id == current_resource.id).values(run_spec=run_spec.json())
+ update(RunModel)
+ .where(RunModel.id == current_resource.id)
+ .values(
+ run_spec=run_spec.json(),
+ priority=run_spec.configuration.priority,
+ )
)
run = await get_run_by_name(
session=session,
@@ -495,6 +500,7 @@ async def submit_run(
status=RunStatus.SUBMITTED,
run_spec=run_spec.json(),
last_processed_at=submitted_at,
+ priority=run_spec.configuration.priority,
)
session.add(run_model)
@@ -852,6 +858,13 @@ def _get_job_submission_cost(job_submission: JobSubmission) -> float:
def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
+ # This function may set defaults for null run_spec values,
+ # although most defaults are resolved when building job_spec
+ # so that we can keep both the original user-supplied value (null in run_spec)
+ # and the default in job_spec.
+ # If a property is stored in job_spec - resolve the default there.
+ # Server defaults are preferable over client defaults so that
+ # the defaults depend on the server version, not the client version.
if run_spec.run_name is not None:
validate_dstack_resource_name(run_spec.run_name)
for mount_point in run_spec.configuration.volumes:
@@ -875,11 +888,14 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
raise ServerClientError(
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s"
)
+ if run_spec.configuration.priority is None:
+ run_spec.configuration.priority = RUN_PRIORITY_DEFAULT
set_resources_defaults(run_spec.configuration.resources)
_UPDATABLE_SPEC_FIELDS = ["repo_code_hash", "configuration"]
-_CONF_TYPE_TO_UPDATABLE_FIELDS = {
+_CONF_UPDATABLE_FIELDS = ["priority"]
+_TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS = {
"dev-environment": ["inactivity_duration"],
# Most service fields can be updated via replica redeployment.
# TODO: Allow updating other fields when rolling deployment is supported.
@@ -915,12 +931,9 @@ def _check_can_update_configuration(
raise ServerClientError(
f"Configuration type changed from {current.type} to {new.type}, cannot update"
)
- updatable_fields = _CONF_TYPE_TO_UPDATABLE_FIELDS.get(new.type)
- if updatable_fields is None:
- raise ServerClientError(
- f"Can only update {', '.join(_CONF_TYPE_TO_UPDATABLE_FIELDS)} configurations."
- f" Not {new.type}"
- )
+ updatable_fields = _CONF_UPDATABLE_FIELDS + _TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS.get(
+ new.type, []
+ )
diff = diff_models(current, new)
changed_fields = list(diff.keys())
for key in changed_fields:
diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py
index 6d6f37e9f5..d50ac01158 100644
--- a/src/dstack/_internal/server/testing/common.py
+++ b/src/dstack/_internal/server/testing/common.py
@@ -262,6 +262,7 @@ async def create_run(
run_spec: Optional[RunSpec] = None,
run_id: Optional[UUID] = None,
deleted: bool = False,
+ priority: int = 0,
) -> RunModel:
if run_spec is None:
run_spec = get_run_spec(
@@ -282,6 +283,7 @@ async def create_run(
run_spec=run_spec.json(),
last_processed_at=submitted_at,
jobs=[],
+ priority=priority,
)
session.add(run)
await session.commit()
diff --git a/src/dstack/api/server/_runs.py b/src/dstack/api/server/_runs.py
index bed56f29da..d7fd2f4d0c 100644
--- a/src/dstack/api/server/_runs.py
+++ b/src/dstack/api/server/_runs.py
@@ -186,6 +186,8 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[Dict]:
configuration_excludes["rate_limits"] = True
if configuration.shell is None:
configuration_excludes["shell"] = True
+ if configuration.priority is None:
+ configuration_excludes["priority"] = True
if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py
index 24cb19daab..e23d787f5e 100644
--- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py
+++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py
@@ -634,3 +634,63 @@ async def test_creates_new_instance_in_existing_fleet(self, test_db, session: As
assert job.instance is not None
assert job.instance.instance_num == 1
assert job.instance.fleet_id == fleet.id
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
+ async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSession):
+ project = await create_project(session)
+ user = await create_user(session)
+ repo = await create_repo(
+ session=session,
+ project_id=project.id,
+ )
+ instance = await create_instance(
+ session=session,
+ project=project,
+ status=InstanceStatus.IDLE,
+ )
+ run1 = await create_run(
+ session=session,
+ project=project,
+ repo=repo,
+ user=user,
+ priority=10,
+ )
+ job1 = await create_job(
+ session=session,
+ run=run1,
+ instance_assigned=True,
+ instance=instance,
+ )
+ run2 = await create_run(
+ session=session,
+ project=project,
+ repo=repo,
+ user=user,
+ priority=0,
+ )
+ job2 = await create_job(
+ session=session, run=run2, instance_assigned=True, instance=instance
+ )
+ run3 = await create_run(
+ session=session,
+ project=project,
+ repo=repo,
+ user=user,
+ priority=100,
+ )
+ job3 = await create_job(
+ session=session,
+ run=run3,
+ instance_assigned=True,
+ instance=instance,
+ )
+ await process_submitted_jobs()
+ await session.refresh(job3)
+ assert job3.status == JobStatus.PROVISIONING
+ await process_submitted_jobs()
+ await session.refresh(job1)
+ assert job1.status == JobStatus.PROVISIONING
+ await process_submitted_jobs()
+ await session.refresh(job2)
+ assert job2.status == JobStatus.PROVISIONING
diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py
index 00141c9a0b..c20ef77cb4 100644
--- a/src/tests/_internal/server/routers/test_runs.py
+++ b/src/tests/_internal/server/routers/test_runs.py
@@ -124,6 +124,7 @@ def get_dev_env_run_plan_dict(
"reservation": None,
"fleets": None,
"tags": None,
+ "priority": 0,
},
"configuration_path": "dstack.yaml",
"profile": {
@@ -284,6 +285,7 @@ def get_dev_env_run_dict(
"reservation": None,
"fleets": None,
"tags": None,
+ "priority": 0,
},
"configuration_path": "dstack.yaml",
"profile": {