Commit 9dd0b39

fix: optimize sandbox cleaning performance (#796)
* fix: optimize sandbox cleaning query performance

  Split the single OR-based deletion query into two targeted queries:

  1. Assigned but orphaned sandboxes: fast with Location key + NOT EXISTS (~4.7s on prod vs 140s before)
  2. Unassigned sandboxes older than 15 days: benefits from new idx_assigned_lastaccesstime composite index

  Additional changes:

  - Add composite index on (Assigned, LastAccessTime) for the unassigned query
  - Fetch distinct SEName values upfront to enable the Location unique key
  - Replace non-sargable days_since(LastAccessTime) with a pre-computed threshold (LastAccessTime < NOW() - 15 days)
  - Increase default batch_size from 500 to 5000
  - Chunk S3 bulk deletes into groups of 1000 (S3 API limit)

* fix: split workers across both deletion query types

  Half the workers prefer orphaned sandboxes first, half prefer unassigned first. This ensures both query types make progress concurrently instead of one starving the other.

* fix: use cursor-based pagination for sandbox deletion queries

  Track the last SBId seen per worker and add SBId > cursor to each query. This avoids re-scanning millions of already-checked rows on every batch, turning O(total_rows) scans into O(batch_size) seeks.

* fix: simplify to single worker with parallel S3 deletions

  Replace multi-worker approach with a single sequential query loop and larger batches (50k). S3 delete chunks are now sent concurrently via TaskGroup. This eliminates lock contention between workers while keeping S3 throughput high.

* fix: separate row selection from locking to avoid mass locking

  Split the SELECT FOR UPDATE into two steps:

  1. SELECT without locking to find candidate SBIds (scans many rows)
  2. SELECT FOR UPDATE SKIP LOCKED WHERE SBId IN (...) to lock only the exact matching rows by primary key

  Previously FOR UPDATE locked every row examined during the scan (82M rows for a 50k result), causing 143MB of lock memory and 12+ minute DELETE times. Now only the returned rows are locked.

* fix: remove row locking from sandbox cleanup to avoid lock memory bloat

  Split the single long transaction into three phases: SELECT candidates (short transaction, no locks), S3 delete (no transaction), and DB DELETE (short transaction). This eliminates the SELECT FOR UPDATE that locked every row examined during the primary key scan.

* fix: chunk DB deletes into 1000-row batches with concurrent execution

  A single DELETE of 50k rows caused MySQL to lock 30M rows during the index scan. Split into 1000-row chunks running up to 10 concurrently via asyncio.Semaphore, keeping each transaction short.

* refactor: add per-phase timing to sandbox cleanup logging

  Log duration of each phase (SELECT, S3 delete, DB delete) per batch so bottlenecks are immediately visible. Demote per-chunk DB delete log to DEBUG to reduce noise.

* fix: increase S3 connection pool to 50 for parallel bulk deletes

  The default max_pool_connections=10 serialized 50 concurrent delete requests into 5 rounds, adding ~4s latency per round against Ceph.

* fix: make S3 connection pool size configurable

  Add s3_max_pool_connections setting (default 50) to SandboxStoreSettings, configurable via DIRACX_SANDBOX_STORE_S3_MAX_POOL_CONNECTIONS.

* fix: stop iterating when batch returns fewer than batch_size candidates

  Avoids redundant SELECT queries when the previous batch already exhausted all matching rows.

* fix: handle S3 partial delete failures and pass se_name directly

  s3_bulk_delete_with_retry now returns failed keys instead of raising, and _s3_delete_chunk_with_retry retries only the failed keys. The caller skips DB deletion for sandboxes whose S3 objects were not removed, preventing dark data. Also removes the per-batch SELECT DISTINCT SEName query by passing settings.se_name directly, and updates the stale docstring.

* fix: remove idx_assigned_lastaccesstime index from schema

  This index is not needed for the current cleanup queries.

* fix: make sandbox cleaning parameters configurable via settings

  Move hardcoded batch_size, delete_chunk_size, and semaphore concurrency values into SandboxStoreSettings so they can be tuned per deployment.

* fix: remove redundant default values from settings docstrings

  The default values were duplicated in the docstring prose when they are already visible from the field definitions and the generated docs.
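
The chunked, concurrency-limited DB delete described in the commit message lives in a file not shown in this excerpt. The following is only a minimal sketch of the general pattern, assuming a hypothetical `delete_chunk` callback that deletes one list of IDs in its own short transaction; the helper name `delete_in_chunks` and its parameters are illustrative and are not taken from the commit.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def delete_in_chunks(
    sb_ids: list[int],
    delete_chunk: Callable[[list[int]], Awaitable[int]],  # hypothetical callback
    *,
    chunk_size: int = 1000,
    max_concurrent: int = 10,
) -> int:
    """Delete IDs in fixed-size chunks, running at most max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(chunk: list[int]) -> int:
        # The semaphore caps how many chunk deletions are in flight.
        async with semaphore:
            return await delete_chunk(chunk)

    # Slice the ID list into consecutive chunks of at most chunk_size items.
    chunks = [sb_ids[i : i + chunk_size] for i in range(0, len(sb_ids), chunk_size)]
    deleted = await asyncio.gather(*(run_one(chunk) for chunk in chunks))
    return sum(deleted)
```

The defaults here mirror the `clean_delete_chunk_size` and `clean_max_concurrent_db_deletes` values from the settings diff; whether the real caller is structured this way is an assumption.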
1 parent c94dad5 commit 9dd0b39

6 files changed

Lines changed: 270 additions & 93 deletions

diracx-core/src/diracx/core/s3.py

Lines changed: 47 additions & 7 deletions
@@ -89,20 +89,60 @@ def b16_to_b64(hex_string: str) -> str:
 
 async def s3_bulk_delete_with_retry(
     s3_client, bucket: str, objects: list[S3Object]
-) -> None:
+) -> set[str]:
+    """Delete objects from S3 in chunks of 1000, retrying failures.
+
+    Returns:
+        Set of keys that failed to delete after all retries.
+
+    """
+    max_chunk_size = 1000
+    chunks = [
+        objects[i : i + max_chunk_size] for i in range(0, len(objects), max_chunk_size)
+    ]
+    tasks = [_s3_delete_chunk_with_retry(s3_client, bucket, chunk) for chunk in chunks]
+    results = await asyncio.gather(*tasks)
+    failed_keys: set[str] = set()
+    for result in results:
+        failed_keys.update(result)
+    return failed_keys
+
+
+async def _s3_delete_chunk_with_retry(
+    s3_client, bucket: str, objects: list[S3Object]
+) -> set[str]:
+    """Try to delete a chunk of S3 objects, retrying partial failures.
+
+    Returns:
+        Set of keys that failed to delete after all retries.
+
+    """
     max_attempts = 5
     delay = 1.0
+    remaining = objects
     for attempt in range(1, max_attempts + 1):
         try:
             response = await s3_client.delete_objects(
                 Bucket=bucket,
-                Delete={"Objects": objects, "Quiet": True},
+                Delete={"Objects": remaining, "Quiet": True},
             )
-            if "Errors" in response and response["Errors"]:
-                raise RuntimeError(f"S3 deletion error: {response['Errors']}")
-            return
-        except (ClientError, RuntimeError) as e:
+        except ClientError:
             if attempt == max_attempts:
-                raise RuntimeError(f"Failed to delete objects in {bucket}") from e
+                return {obj["Key"] for obj in remaining}
             await asyncio.sleep(delay)
             delay *= 2
+            continue
+
+        errors = response.get("Errors", [])
+        if not errors:
+            return set()
+
+        # Retry only the keys that failed
+        failed_keys = {e["Key"] for e in errors}
+        if attempt == max_attempts:
+            return failed_keys
+        remaining = [obj for obj in remaining if obj["Key"] in failed_keys]
+        await asyncio.sleep(delay)
+        delay *= 2
+
+    return set()

diracx-core/src/diracx/core/settings.py

Lines changed: 30 additions & 1 deletion
@@ -269,14 +269,43 @@ class SandboxStoreSettings(ServiceSettingsBase):
     This name is used within DIRAC to refer to this sandbox storage
     endpoint in job descriptions and file catalogs.
     """
+
+    s3_max_pool_connections: int = 50
+    """Maximum number of connections in the S3 client connection pool.
+
+    Higher values allow more parallel S3 requests (e.g. during bulk sandbox
+    deletion).
+    """
+
+    clean_batch_size: int = 50_000
+    """Number of sandbox candidates to select per batch during cleaning.
+
+    Each batch runs SELECT → S3 delete → DB delete sequentially.
+    """
+
+    clean_delete_chunk_size: int = 1000
+    """Number of sandbox DB rows to delete per chunk during cleaning.
+
+    Smaller chunks mean shorter transactions and less lock contention.
+    """
+
+    clean_max_concurrent_db_deletes: int = 10
+    """Maximum number of concurrent DB delete chunks during cleaning.
+
+    Controls parallelism of database DELETE operations.
+    """
+
     _client: S3Client = PrivateAttr()
 
     @contextlib.asynccontextmanager
     async def lifetime_function(self) -> AsyncIterator[None]:
         async with get_session().create_client(
             "s3",
             **self.s3_client_kwargs,
-            config=Config(signature_version="v4"),
+            config=Config(
+                signature_version="v4",
+                max_pool_connections=self.s3_max_pool_connections,
+            ),
         ) as self._client:  # type: ignore
             if not await s3_bucket_exists(self._client, self.bucket_name):
                 if not self.auto_create_bucket:
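
The default of 50 connections follows from the arithmetic in the commit message: a 50,000-row batch split into 1000-key S3 requests yields 50 concurrent requests, which a pool of 10 can only serve in 5 waves. The sketch below reproduces that estimate under a deliberately simplified model in which requests complete in equal-length waves; the function names are illustrative, and the 4-second figure is the approximate per-round latency the commit message reports against Ceph.

```python
import math


def s3_request_count(batch_size: int, keys_per_request: int = 1000) -> int:
    """Number of bulk-delete requests needed for one cleaning batch."""
    return math.ceil(batch_size / keys_per_request)


def estimated_rounds(concurrent_requests: int, pool_size: int) -> int:
    """Waves needed when only pool_size requests can be in flight at once."""
    return math.ceil(concurrent_requests / pool_size)


requests = s3_request_count(50_000)            # one request per 1000 keys
rounds_before = estimated_rounds(requests, 10)  # pool size before this commit
rounds_after = estimated_rounds(requests, 50)   # new default pool size
# Rough added latency, using the ~4 s per round quoted in the commit message.
latency_before = rounds_before * 4
latency_after = rounds_after * 4
```

If `clean_batch_size` is raised for a deployment, the same calculation suggests raising `s3_max_pool_connections` in proportion, although real request latencies will not be as uniform as this model assumes.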

diracx-db/src/diracx/db/sql/sandbox_metadata/db.py

Lines changed: 53 additions & 32 deletions
@@ -10,7 +10,6 @@
     exists,
     insert,
     literal,
-    or_,
     select,
     update,
 )
@@ -24,7 +23,7 @@
 from diracx.core.models.auth import UserInfo
 from diracx.core.models.sandbox import SandboxInfo, SandboxType
 from diracx.db.sql.utils.base import BaseSQLDB
-from diracx.db.sql.utils.functions import days_since, utcnow
+from diracx.db.sql.utils.functions import substract_date, utcnow
 
 from .schema import Base as SandboxMetadataDBBase
 from .schema import SandBoxes, SBEntityMapping, SBOwners
@@ -209,53 +208,75 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
         await self.conn.execute(unassign_stmt)
 
     async def select_sandboxes_for_deletion(
-        self, *, batch_size: int = 500
-    ) -> tuple[list[int], list[str]]:
-        """Select and lock a batch of sandboxes for deletion.
+        self,
+        *,
+        se_name: str,
+        batch_size: int = 500,
+        cursor: int = 0,
+    ) -> tuple[list[int], list[str], int]:
+        """Select a batch of sandboxes eligible for deletion.
 
-        Uses FOR UPDATE SKIP LOCKED on MySQL to allow concurrent workers to
-        process different sandboxes in parallel without conflicts.
+        Uses cursor-based pagination (SBId > cursor) and runs two queries:
+        - Orphaned: assigned but no entity mapping (NOT EXISTS)
+        - Unassigned: not assigned and older than 15 days
+
+        No row locking is used — the caller is responsible for crash safety
+        by deleting from S3 before removing DB rows.
 
         Args:
+            se_name: Storage element name to filter on.
             batch_size: Maximum number of sandboxes to select.
+            cursor: Only consider sandboxes with SBId > cursor.
 
         Returns:
-            Tuple of (sb_ids, pfns) for the selected sandboxes.
-            On MySQL, the rows remain locked until the transaction commits/rollbacks.
+            Tuple of (sb_ids, pfns, new_cursor) for the selected sandboxes.
+            new_cursor is the maximum SBId seen, to pass as cursor next time.
 
         """
+        s3_condition = and_(
+            SandBoxes.SEName == se_name,
+            SandBoxes.SEPFN.like("/S3/%"),
+        )
+
         conditions = [
-            # If it has assigned to a job but is no longer mapped it can be removed
+            # Orphaned: assigned but no longer mapped to any entity
             and_(
+                SandBoxes.SBId > cursor,
                 SandBoxes.Assigned,
                 ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId),
+                s3_condition,
+            ),
+            # Unassigned: never assigned and older than 15 days (sargable)
+            and_(
+                SandBoxes.SBId > cursor,
+                ~SandBoxes.Assigned,
+                SandBoxes.LastAccessTime < substract_date(days=15),
+                s3_condition,
             ),
-            # If the sandbox is still unassigned after 15 days, remove it
-            and_(~SandBoxes.Assigned, days_since(SandBoxes.LastAccessTime) >= 15),
         ]
-        # Sandboxes which are not on S3 will be handled by legacy DIRAC
-        condition = and_(SandBoxes.SEPFN.like("/S3/%"), or_(*conditions))
-
-        select_stmt = (
-            select(SandBoxes.SBId, SandBoxes.SEPFN).where(condition).limit(batch_size)
-        )
 
-        # FOR UPDATE SKIP LOCKED is only supported on MySQL
-        # SQLite is used for testing and doesn't support row locking
-        if self.conn.dialect.name == "mysql":
-            select_stmt = select_stmt.with_for_update(skip_locked=True)
-        elif self.conn.dialect.name != "sqlite":
-            raise NotImplementedError(
-                f"Unsupported database dialect: {self.conn.dialect.name}"
+        sb_ids: list[int] = []
+        pfns: list[str] = []
+        for condition in conditions:
+            remaining = batch_size - len(sb_ids)
+            if remaining <= 0:
+                break
+            stmt = (
+                select(SandBoxes.SBId, SandBoxes.SEPFN)
+                .where(condition)
+                .order_by(SandBoxes.SBId)
+                .limit(remaining)
             )
+            result = await self.conn.execute(stmt)
+            for row in result:
+                sb_ids.append(row.SBId)
+                pfns.append(row.SEPFN)
 
-        result = await self.conn.execute(select_stmt)
-        rows = result.all()
-
-        sb_ids = [row.SBId for row in rows]
-        pfns = [row.SEPFN for row in rows]
+        if not sb_ids:
+            return [], [], cursor
 
-        return sb_ids, pfns
+        new_cursor = max(sb_ids)
+        return sb_ids, pfns, new_cursor
 
     async def delete_sandboxes(self, sb_ids: list[int]) -> int:
         """Delete sandboxes by their IDs.
@@ -272,5 +293,5 @@ async def delete_sandboxes(self, sb_ids: list[int]) -> int:
 
         delete_stmt = delete(SandBoxes).where(SandBoxes.SBId.in_(sb_ids))
         result = await self.conn.execute(delete_stmt)
-        logger.info("Deleted %d expired/unassigned sandboxes", result.rowcount)
+        logger.debug("Deleted %d expired/unassigned sandboxes", result.rowcount)
         return result.rowcount
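
The cursor argument above implements keyset pagination: each query seeks past the last ID already examined instead of re-scanning from the start. The following self-contained sketch shows the same idea with the standard-library `sqlite3` module and a single query; the table `sandboxes`, its `id` column, and the `iter_batches` helper are hypothetical stand-ins, and the early exit on a short batch mirrors the "stop iterating" change described in the commit message.

```python
import sqlite3
from collections.abc import Iterator


def iter_batches(conn: sqlite3.Connection, batch_size: int) -> Iterator[list[int]]:
    """Yield batches of IDs using keyset (cursor-based) pagination."""
    cursor = 0
    while True:
        rows = conn.execute(
            # Seek past the cursor on the primary key, then read in order.
            "SELECT id FROM sandboxes WHERE id > ? ORDER BY id LIMIT ?",
            (cursor, batch_size),
        ).fetchall()
        if not rows:
            return
        ids = [row[0] for row in rows]
        yield ids
        if len(ids) < batch_size:
            return  # a short batch means no further rows can match
        cursor = ids[-1]  # rows are ordered, so the last ID is the maximum


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sandboxes (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO sandboxes (id) VALUES (?)", [(i,) for i in range(1, 8)])
batches = list(iter_batches(conn, batch_size=3))
```

With seven rows and a batch size of three, the loop issues three queries and stops after the final one-row batch without a fourth, empty query.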
