Skip to content

Commit 6fd1f1b

Browse files
authored
Fix no offers retry for scheduled runs (#3759)
1 parent 4062911 commit 6fd1f1b

File tree

4 files changed

+114
-5
lines changed

4 files changed

+114
-5
lines changed

src/dstack/_internal/server/background/pipeline_tasks/runs/active.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -288,10 +288,11 @@ async def _should_retry_job(
288288
job_model: JobModel,
289289
) -> Optional[timedelta]:
290290
"""
291-
Checks if the job should be retried.
292-
Returns the current duration of retrying if retry is enabled.
293-
Retrying duration is calculated as the time since `last_processed_at`
294-
of the latest provisioned submission.
291+
Checks if the job should be retried and returns the elapsed retry duration.
292+
293+
For `no-capacity`, retry is limited by the age of the current run (or, for scheduled runs, by the time since the current trigger time). Once the
294+
job has already been provisioned, retry is limited by the time since the latest
295+
provisioned submission for that job.
295296
"""
296297
job_spec = get_job_spec(job_model)
297298
if job_spec.retry is None:
@@ -309,7 +310,13 @@ async def _should_retry_job(
309310
and last_provisioned is None
310311
and RetryEvent.NO_CAPACITY in job_spec.retry.on_events
311312
):
312-
return get_current_datetime() - run_model.submitted_at
313+
retry_started_at = run_model.submitted_at
314+
if run_model.next_triggered_at is not None:
315+
# Scheduled runs keep `next_triggered_at` pointing to the current trigger time while
316+
# retrying. Retryable failures go back to PENDING directly, and the terminating worker
317+
# advances `next_triggered_at` only when the current execution is over.
318+
retry_started_at = run_model.next_triggered_at
319+
return get_current_datetime() - retry_started_at
313320

314321
if (
315322
job_model.termination_reason is not None

src/dstack/_internal/server/background/pipeline_tasks/runs/terminating.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class TerminatingRunUpdateMap(ItemUpdateMap, total=False):
2929
status: RunStatus
3030
next_triggered_at: Optional[datetime]
3131
fleet_id: Optional[uuid.UUID]
32+
resubmission_attempt: int
3233

3334

3435
class TerminatingRunJobUpdateMap(ItemUpdateMap, total=False):
@@ -134,6 +135,7 @@ def _get_run_update_map(run_model: models.RunModel) -> TerminatingRunUpdateMap:
134135
status=RunStatus.PENDING,
135136
next_triggered_at=_get_next_triggered_at(run_spec),
136137
fleet_id=None,
138+
resubmission_attempt=0,
137139
)
138140
return TerminatingRunUpdateMap(status=termination_reason.to_status())
139141

src/tests/_internal/server/background/pipeline_tasks/test_runs/test_active.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
Profile,
1818
ProfileRetry,
1919
RetryEvent,
20+
Schedule,
2021
StopCriteria,
2122
)
2223
from dstack._internal.core.models.resources import Range
@@ -295,6 +296,104 @@ async def test_retries_no_capacity_replica_and_keeps_service_running(
295296
assert retried_job.status == JobStatus.SUBMITTED
296297
assert len(jobs) == 3
297298

299+
async def test_retries_scheduled_run_no_capacity_from_trigger_time(
300+
self, test_db, session: AsyncSession, worker: RunWorker
301+
) -> None:
302+
project = await create_project(session=session)
303+
user = await create_user(session=session)
304+
repo = await create_repo(session=session, project_id=project.id)
305+
run_spec = get_run_spec(
306+
repo_id=repo.name,
307+
profile=Profile(
308+
name="default",
309+
retry=ProfileRetry(duration=3600, on_events=[RetryEvent.NO_CAPACITY]),
310+
),
311+
configuration=TaskConfiguration(
312+
commands=["echo hello"],
313+
schedule=Schedule(cron="15 * * * *"),
314+
),
315+
)
316+
trigger_time = get_current_datetime() - timedelta(minutes=5)
317+
run = await create_run(
318+
session=session,
319+
project=project,
320+
repo=repo,
321+
user=user,
322+
run_spec=run_spec,
323+
status=RunStatus.SUBMITTED,
324+
submitted_at=get_current_datetime() - timedelta(hours=2),
325+
next_triggered_at=trigger_time,
326+
resubmission_attempt=0,
327+
)
328+
await create_job(
329+
session=session,
330+
run=run,
331+
status=JobStatus.FAILED,
332+
termination_reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
333+
)
334+
lock_run(run)
335+
await session.commit()
336+
337+
with patch(
338+
"dstack._internal.server.background.pipeline_tasks.runs.active.get_current_datetime",
339+
return_value=trigger_time + timedelta(minutes=10),
340+
):
341+
await worker.process(run_to_pipeline_item(run))
342+
343+
await session.refresh(run)
344+
assert run.status == RunStatus.PENDING
345+
assert run.resubmission_attempt == 1
346+
assert run.lock_token is None
347+
348+
async def test_terminates_scheduled_run_when_no_capacity_retry_exceeded_from_trigger_time(
349+
self, test_db, session: AsyncSession, worker: RunWorker
350+
) -> None:
351+
project = await create_project(session=session)
352+
user = await create_user(session=session)
353+
repo = await create_repo(session=session, project_id=project.id)
354+
run_spec = get_run_spec(
355+
repo_id=repo.name,
356+
profile=Profile(
357+
name="default",
358+
retry=ProfileRetry(duration=600, on_events=[RetryEvent.NO_CAPACITY]),
359+
),
360+
configuration=TaskConfiguration(
361+
commands=["echo hello"],
362+
schedule=Schedule(cron="15 * * * *"),
363+
),
364+
)
365+
trigger_time = get_current_datetime() - timedelta(minutes=20)
366+
run = await create_run(
367+
session=session,
368+
project=project,
369+
repo=repo,
370+
user=user,
371+
run_spec=run_spec,
372+
status=RunStatus.SUBMITTED,
373+
submitted_at=get_current_datetime() - timedelta(hours=2),
374+
next_triggered_at=trigger_time,
375+
resubmission_attempt=0,
376+
)
377+
await create_job(
378+
session=session,
379+
run=run,
380+
status=JobStatus.FAILED,
381+
termination_reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
382+
)
383+
lock_run(run)
384+
await session.commit()
385+
386+
with patch(
387+
"dstack._internal.server.background.pipeline_tasks.runs.active.get_current_datetime",
388+
return_value=trigger_time + timedelta(minutes=20),
389+
):
390+
await worker.process(run_to_pipeline_item(run))
391+
392+
await session.refresh(run)
393+
assert run.status == RunStatus.TERMINATING
394+
assert run.termination_reason == RunTerminationReason.RETRY_LIMIT_EXCEEDED
395+
assert run.lock_token is None
396+
298397
async def test_retrying_multinode_replica_terminates_active_sibling_jobs(
299398
self, test_db, session: AsyncSession, worker: RunWorker
300399
) -> None:

src/tests/_internal/server/background/pipeline_tasks/test_runs/test_termination.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ async def test_reschedules_scheduled_run_and_clears_fleet(
221221
await session.refresh(run)
222222
assert run.status == RunStatus.PENDING
223223
assert run.next_triggered_at == datetime(2023, 1, 2, 3, 15, tzinfo=timezone.utc)
224+
assert run.resubmission_attempt == 0
224225
assert run.fleet_id is None
225226
assert run.lock_token is None
226227
assert run.lock_expires_at is None

0 commit comments

Comments
 (0)