Skip to content

Commit dc54c67

Browse files
committed
Support multiple cron expressions
1 parent 3eb81df commit dc54c67

3 files changed

Lines changed: 70 additions & 16 deletions

File tree

src/dstack/_internal/core/models/profiles.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,20 +170,34 @@ def validate_time_window(cls, v: Union[int, str]) -> int:
170170

171171
class Schedule(CoreModel):
172172
cron: Annotated[
173-
Optional[str],
173+
Union[List[str], str],
174174
Field(
175175
description=(
176-
"The UTC time when the run needs to be started specified using POSIX cron syntax"
176+
"A cron expression or a list of cron expressions specifying the UTC time when the run needs to be started"
177177
)
178178
),
179-
] = None
179+
]
180180

181181
@validator("cron")
182-
def _validate_cron(cls, v) -> Optional[str]:
183-
if v is None:
184-
return None
185-
validate_cron(v)
186-
return v
182+
def _validate_cron(cls, v: Union[List[str], str]) -> List[str]:
183+
if isinstance(v, str):
184+
values = [v]
185+
else:
186+
values = v
187+
if len(values) == 0:
188+
raise ValueError("At least one cron expression must be specified")
189+
for value in values:
190+
validate_cron(value)
191+
return values
192+
193+
@property
194+
def crons(self) -> List[str]:
195+
"""
196+
Access `cron` attribute as a list.
197+
"""
198+
if isinstance(self.cron, str):
199+
return [self.cron]
200+
return self.cron
187201

188202

189203
class ProfileParams(CoreModel):

src/dstack/_internal/server/services/runs.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,11 +1253,14 @@ def _remove_job_spec_sensitive_info(spec: JobSpec):
12531253
def _get_next_triggered_at(run_spec: RunSpec) -> Optional[datetime]:
12541254
if run_spec.merged_profile.schedule is None:
12551255
return None
1256-
cron_trigger = CronTrigger.from_crontab(
1257-
run_spec.merged_profile.schedule.cron,
1258-
timezone=timezone.utc,
1259-
)
1260-
return cron_trigger.get_next_fire_time(
1261-
previous_fire_time=None,
1262-
now=common_utils.get_current_datetime(),
1263-
)
1256+
now = common_utils.get_current_datetime()
1257+
fire_times = []
1258+
for cron in run_spec.merged_profile.schedule.crons:
1259+
cron_trigger = CronTrigger.from_crontab(cron, timezone=timezone.utc)
1260+
fire_times.append(
1261+
cron_trigger.get_next_fire_time(
1262+
previous_fire_time=None,
1263+
now=now,
1264+
)
1265+
)
1266+
return min(fire_times)

src/tests/_internal/server/routers/test_runs.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import pytest
99
from fastapi.testclient import TestClient
10+
from freezegun import freeze_time
1011
from httpx import AsyncClient
1112
from sqlalchemy import select
1213
from sqlalchemy.ext.asyncio import AsyncSession
@@ -28,6 +29,7 @@
2829
InstanceType,
2930
Resources,
3031
)
32+
from dstack._internal.core.models.profiles import Schedule
3133
from dstack._internal.core.models.resources import Range
3234
from dstack._internal.core.models.runs import (
3335
ApplyRunPlanInput,
@@ -1370,6 +1372,41 @@ async def test_updates_run(self, test_db, session: AsyncSession, client: AsyncCl
13701372
assert run.run_spec.configuration.replicas == Range(min=1, max=1)
13711373
assert updated_run.run_spec.configuration.replicas == Range(min=2, max=2)
13721374

1375+
@pytest.mark.asyncio
1376+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
1377+
async def test_creates_pending_run_if_run_is_scheduled(
1378+
self, test_db, session: AsyncSession, client: AsyncClient
1379+
):
1380+
user = await create_user(session=session, global_role=GlobalRole.USER)
1381+
project = await create_project(session=session, owner=user)
1382+
await add_project_member(
1383+
session=session, project=project, user=user, project_role=ProjectRole.USER
1384+
)
1385+
repo = await create_repo(session=session, project_id=project.id)
1386+
run_spec = get_run_spec(
1387+
run_name="test-run",
1388+
repo_id=repo.name,
1389+
)
1390+
run_spec.configuration.schedule = Schedule(cron=["5 * * * *", "10 * * * *"])
1391+
with freeze_time(datetime(2023, 1, 2, 3, 9, tzinfo=timezone.utc)):
1392+
response = await client.post(
1393+
f"/api/project/{project.name}/runs/apply",
1394+
headers=get_auth_headers(user.token),
1395+
json={
1396+
"plan": {
1397+
"run_spec": json.loads(run_spec.json()),
1398+
"current_resource": None,
1399+
},
1400+
"force": False,
1401+
},
1402+
)
1403+
assert response.status_code == 200, response.json()
1404+
res = await session.execute(select(RunModel))
1405+
run = res.scalar()
1406+
assert run is not None
1407+
assert run.status == RunStatus.PENDING
1408+
assert run.next_triggered_at == datetime(2023, 1, 2, 3, 10)
1409+
13731410

13741411
class TestSubmitRun:
13751412
@pytest.mark.asyncio

0 commit comments

Comments
 (0)