Skip to content

Commit 96b9286

Browse files
committed
Refactor process_idle_volumes
1 parent a546d97 commit 96b9286

3 files changed

Lines changed: 85 additions & 81 deletions

File tree

src/dstack/_internal/server/background/tasks/process_idle_volumes.py

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
from typing import List
33

4-
from sqlalchemy import select, update
4+
from sqlalchemy import select
55
from sqlalchemy.ext.asyncio import AsyncSession
66
from sqlalchemy.orm import joinedload
77

@@ -37,15 +37,14 @@ async def process_idle_volumes():
3737
)
3838
.order_by(VolumeModel.last_processed_at.asc())
3939
.limit(10)
40-
.with_for_update(skip_locked=True)
40+
.with_for_update(skip_locked=True, key_share=True)
4141
)
4242
volume_ids = list(res.scalars().all())
4343
if not volume_ids:
4444
return
4545
for volume_id in volume_ids:
4646
lockset.add(volume_id)
4747

48-
# Refetch volumes with proper relationship loading to avoid MissingGreenlet
4948
res = await session.execute(
5049
select(VolumeModel)
5150
.where(VolumeModel.id.in_(volume_ids))
@@ -54,89 +53,65 @@ async def process_idle_volumes():
5453
.options(joinedload(VolumeModel.attachments))
5554
.execution_options(populate_existing=True)
5655
)
57-
volumes = list(res.unique().scalars().all())
58-
56+
volume_models = list(res.unique().scalars().all())
5957
try:
60-
to_delete = []
61-
for volume in volumes:
62-
if _should_delete_volume(volume):
63-
to_delete.append(volume)
64-
65-
if to_delete:
66-
await _delete_idle_volumes(session, to_delete)
67-
58+
volumes_to_delete = [v for v in volume_models if _should_delete_volume(v)]
59+
if not volumes_to_delete:
60+
return
61+
await _delete_idle_volumes(session, volumes_to_delete)
6862
finally:
6963
lockset.difference_update(volume_ids)
7064

7165

7266
def _should_delete_volume(volume: VolumeModel) -> bool:
73-
config = get_volume_configuration(volume)
74-
75-
if not config.auto_cleanup_duration:
67+
if volume.attachments:
7668
return False
7769

78-
if isinstance(config.auto_cleanup_duration, int) and config.auto_cleanup_duration < 0:
70+
config = get_volume_configuration(volume)
71+
if not config.auto_cleanup_duration:
7972
return False
8073

8174
duration_seconds = parse_duration(config.auto_cleanup_duration)
8275
if not duration_seconds or duration_seconds <= 0:
8376
return False
8477

85-
if volume.attachments:
86-
return False
87-
8878
idle_time = _get_idle_time(volume)
8979
threshold = datetime.timedelta(seconds=duration_seconds)
90-
91-
if idle_time > threshold:
92-
logger.info(
93-
"Deleting idle volume %s (idle %.1fh)", volume.name, idle_time.total_seconds() / 3600
94-
)
95-
return True
96-
97-
return False
80+
return idle_time > threshold
9881

9982

10083
def _get_idle_time(volume: VolumeModel) -> datetime.timedelta:
10184
last_used = volume.last_job_processed_at or volume.created_at
10285
last_used_utc = last_used.replace(tzinfo=datetime.timezone.utc)
103-
now = get_current_datetime()
104-
105-
idle_time = now - last_used_utc
86+
idle_time = get_current_datetime() - last_used_utc
10687
return max(idle_time, datetime.timedelta(0))
10788

10889

10990
async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]):
110-
"""Delete idle volumes from cloud providers and mark as deleted in database."""
91+
# Note: Multiple volumes are deleted in the same transaction,
92+
# so long deletion of one volume may block processing other volumes.
11193
for volume_model in volumes:
94+
logger.info("Deleting idle volume %s", volume_model.name)
11295
try:
113-
await _delete_volume_from_cloud(session, volume_model)
114-
except Exception:
115-
logger.exception("Error when deleting volume %s from cloud", volume_model.name)
116-
try:
117-
await session.execute(
118-
update(VolumeModel)
119-
.where(VolumeModel.id == volume_model.id)
120-
.values(
121-
deleted=True,
122-
deleted_at=get_current_datetime(),
123-
)
124-
)
125-
logger.info("Deleted idle volume %s", volume_model.name)
96+
await _delete_idle_volume(session, volume_model)
12697
except Exception:
127-
logger.exception("Failed to mark volume %s as deleted in database", volume_model.name)
98+
logger.exception("Error when deleting idle volume %s", volume_model.name)
99+
100+
volume_model.deleted = True
101+
volume_model.deleted_at = get_current_datetime()
102+
103+
logger.info("Deleted idle volume %s", volume_model.name)
128104

129105
await session.commit()
130106

131107

132-
async def _delete_volume_from_cloud(session: AsyncSession, volume_model: VolumeModel):
133-
"""Delete volume from cloud provider. Based on volumes.py:_delete_volume"""
108+
async def _delete_idle_volume(session: AsyncSession, volume_model: VolumeModel):
134109
volume = volume_model_to_volume(volume_model)
135110

136-
if volume.external:
137-
return
138-
139111
if volume.provisioning_data is None:
112+
logger.error(
113+
f"Failed to delete volume {volume_model.name}. volume.provisioning_data is None."
114+
)
140115
return
141116

142117
if volume.provisioning_data.backend is None:

src/dstack/_internal/server/testing/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ async def create_volume(
742742
status: VolumeStatus = VolumeStatus.SUBMITTED,
743743
created_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc),
744744
last_processed_at: Optional[datetime] = None,
745+
last_job_processed_at: Optional[datetime] = None,
745746
configuration: Optional[VolumeConfiguration] = None,
746747
volume_provisioning_data: Optional[VolumeProvisioningData] = None,
747748
deleted_at: Optional[datetime] = None,
@@ -759,6 +760,7 @@ async def create_volume(
759760
status=status,
760761
created_at=created_at,
761762
last_processed_at=last_processed_at,
763+
last_job_processed_at=last_job_processed_at,
762764
configuration=configuration.json(),
763765
volume_provisioning_data=volume_provisioning_data.json()
764766
if volume_provisioning_data

src/tests/_internal/server/background/tasks/test_process_idle_volumes.py

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime
2+
from unittest.mock import Mock, patch
23

34
import pytest
45
from sqlalchemy.ext.asyncio import AsyncSession
@@ -12,6 +13,7 @@
1213
)
1314
from dstack._internal.server.models import VolumeAttachmentModel
1415
from dstack._internal.server.testing.common import (
16+
ComputeMockSpec,
1517
create_instance,
1618
create_project,
1719
create_user,
@@ -24,6 +26,61 @@
2426
@pytest.mark.asyncio
2527
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
2628
class TestProcessIdleVolumes:
29+
async def test_deletes_idle_volumes(self, test_db, session: AsyncSession):
30+
project = await create_project(session=session)
31+
user = await create_user(session=session)
32+
33+
config1 = get_volume_configuration(
34+
name="test-volume",
35+
auto_cleanup_duration="1h",
36+
)
37+
config2 = get_volume_configuration(
38+
name="test-volume",
39+
auto_cleanup_duration="3h",
40+
)
41+
volume1 = await create_volume(
42+
session=session,
43+
project=project,
44+
user=user,
45+
status=VolumeStatus.ACTIVE,
46+
backend=BackendType.AWS,
47+
configuration=config1,
48+
volume_provisioning_data=get_volume_provisioning_data(),
49+
last_job_processed_at=datetime.datetime.now(datetime.timezone.utc)
50+
- datetime.timedelta(hours=2),
51+
)
52+
volume2 = await create_volume(
53+
session=session,
54+
project=project,
55+
user=user,
56+
status=VolumeStatus.ACTIVE,
57+
backend=BackendType.AWS,
58+
configuration=config2,
59+
volume_provisioning_data=get_volume_provisioning_data(),
60+
last_job_processed_at=datetime.datetime.now(datetime.timezone.utc)
61+
- datetime.timedelta(hours=2),
62+
)
63+
await session.commit()
64+
65+
with patch(
66+
"dstack._internal.server.services.backends.get_project_backend_by_type_or_error"
67+
) as m:
68+
aws_mock = Mock()
69+
m.return_value = aws_mock
70+
aws_mock.compute.return_value = Mock(spec=ComputeMockSpec)
71+
await process_idle_volumes()
72+
73+
await session.refresh(volume1)
74+
await session.refresh(volume2)
75+
assert volume1.deleted
76+
assert volume1.deleted_at is not None
77+
assert not volume2.deleted
78+
assert volume2.deleted_at is None
79+
80+
81+
@pytest.mark.asyncio
82+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
83+
class TestShouldDeleteVolume:
2784
async def test_no_idle_duration(self, test_db, session: AsyncSession):
2885
project = await create_project(session=session)
2986
user = await create_user(session=session)
@@ -131,33 +188,3 @@ async def test_never_used_volume(self, test_db, session: AsyncSession):
131188
volume.last_job_processed_at = None
132189
idle_time = _get_idle_time(volume)
133190
assert idle_time.total_seconds() >= 7000
134-
135-
async def test_integration(self, test_db, session: AsyncSession):
136-
project = await create_project(session=session)
137-
user = await create_user(session=session)
138-
139-
config = get_volume_configuration(name="test-volume")
140-
config.auto_cleanup_duration = "1h"
141-
142-
volume = await create_volume(
143-
session=session,
144-
project=project,
145-
user=user,
146-
status=VolumeStatus.ACTIVE,
147-
backend=BackendType.AWS,
148-
configuration=config,
149-
volume_provisioning_data=get_volume_provisioning_data(),
150-
)
151-
152-
volume.last_job_processed_at = (
153-
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
154-
).replace(tzinfo=None)
155-
await session.commit()
156-
157-
# Run the background task
158-
await process_idle_volumes()
159-
160-
# Refresh the volume to see if it was marked as deleted
161-
await session.refresh(volume)
162-
assert volume.deleted is True
163-
assert volume.deleted_at is not None

0 commit comments

Comments
 (0)