3131 JobSubmission ,
3232 JobTerminationReason ,
3333 RunSpec ,
34+ RunTerminationReason ,
3435)
3536from dstack ._internal .core .models .volumes import Volume , VolumeMountPoint , VolumeStatus
3637from dstack ._internal .server import settings
@@ -349,6 +350,7 @@ async def process_terminating_job(
349350 if len (volume_models ) > 0 :
350351 logger .info ("Detaching volumes: %s" , [v .name for v in volume_models ])
351352 all_volumes_detached = await _detach_volumes_from_job_instance (
353+ session = session ,
352354 project = instance_model .project ,
353355 job_model = job_model ,
354356 jpd = jpd ,
@@ -432,6 +434,7 @@ async def process_volumes_detaching(
432434 )
433435 logger .info ("Detaching volumes: %s" , [v .name for v in volume_models ])
434436 all_volumes_detached = await _detach_volumes_from_job_instance (
437+ session = session ,
435438 project = instance_model .project ,
436439 job_model = job_model ,
437440 jpd = jpd ,
@@ -523,6 +526,7 @@ def group_jobs_by_replica_latest(jobs: List[JobModel]) -> Iterable[Tuple[int, Li
523526
524527
525528async def _detach_volumes_from_job_instance (
529+ session : AsyncSession ,
526530 project : ProjectModel ,
527531 job_model : JobModel ,
528532 jpd : JobProvisioningData ,
@@ -544,6 +548,7 @@ async def _detach_volumes_from_job_instance(
544548 detached_volumes = []
545549 for volume_model in volume_models :
546550 detached = await _detach_volume_from_job_instance (
551+ session = session ,
547552 backend = backend ,
548553 job_model = job_model ,
549554 jpd = jpd ,
@@ -566,6 +571,7 @@ async def _detach_volumes_from_job_instance(
566571
567572
568573async def _detach_volume_from_job_instance (
574+ session : AsyncSession ,
569575 backend : Backend ,
570576 job_model : JobModel ,
571577 jpd : JobProvisioningData ,
@@ -601,7 +607,10 @@ async def _detach_volume_from_job_instance(
601607 volume = volume ,
602608 provisioning_data = jpd ,
603609 )
604- if not detached and _should_force_detach_volume (job_model , job_spec .stop_duration ):
610+ run_termination_reason = await _get_run_termination_reason (session , job_model )
611+ if not detached and _should_force_detach_volume (
612+ job_model , run_termination_reason , job_spec .stop_duration
613+ ):
605614 logger .info (
606615 "Force detaching volume %s from %s" ,
607616 volume_model .name ,
@@ -633,13 +642,27 @@ async def _detach_volume_from_job_instance(
633642MIN_FORCE_DETACH_WAIT_PERIOD = timedelta (seconds = 60 )
634643
635644
636- def _should_force_detach_volume (job_model : JobModel , stop_duration : Optional [int ]) -> bool :
645+ async def _get_run_termination_reason (
646+ session : AsyncSession , job_model : JobModel
647+ ) -> Optional [RunTerminationReason ]:
648+ res = await session .execute (
649+ select (RunModel .termination_reason ).where (RunModel .id == job_model .run_id )
650+ )
651+ return res .scalar_one_or_none ()
652+
653+
654+ def _should_force_detach_volume (
655+ job_model : JobModel ,
656+ run_termination_reason : Optional [RunTerminationReason ],
657+ stop_duration : Optional [int ],
658+ ) -> bool :
637659 return (
638660 job_model .volumes_detached_at is not None
639661 and common .get_current_datetime ()
640662 > job_model .volumes_detached_at + MIN_FORCE_DETACH_WAIT_PERIOD
641663 and (
642664 job_model .termination_reason == JobTerminationReason .ABORTED_BY_USER
665+ or run_termination_reason == RunTerminationReason .ABORTED_BY_USER
643666 or stop_duration is not None
644667 and common .get_current_datetime ()
645668 > job_model .volumes_detached_at + timedelta (seconds = stop_duration )
0 commit comments