From 648203bcd430d278713affb75b1802de6d55f1c0 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Tue, 17 Feb 2026 05:53:26 +0100 Subject: [PATCH 01/15] fix: optimize sandbox cleaning query performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the single OR-based deletion query into two targeted queries: 1. Assigned but orphaned sandboxes — fast with Location key + NOT EXISTS (~4.7s on prod vs 140s before) 2. Unassigned sandboxes older than 15 days — benefits from new idx_assigned_lastaccesstime composite index Additional changes: - Add composite index on (Assigned, LastAccessTime) for the unassigned query - Fetch distinct SEName values upfront to enable the Location unique key - Replace non-sargable days_since(LastAccessTime) with a pre-computed threshold (LastAccessTime < NOW() - 15 days) - Increase default batch_size from 500 to 5000 - Chunk S3 bulk deletes into groups of 1000 (S3 API limit) --- diracx-core/src/diracx/core/s3.py | 10 +++ .../src/diracx/db/sql/sandbox_metadata/db.py | 79 +++++++++++++------ .../diracx/db/sql/sandbox_metadata/schema.py | 1 + .../src/diracx/logic/jobs/sandboxes.py | 2 +- 4 files changed, 67 insertions(+), 25 deletions(-) diff --git a/diracx-core/src/diracx/core/s3.py b/diracx-core/src/diracx/core/s3.py index 5b3f88e99..150579798 100644 --- a/diracx-core/src/diracx/core/s3.py +++ b/diracx-core/src/diracx/core/s3.py @@ -89,6 +89,16 @@ def b16_to_b64(hex_string: str) -> str: async def s3_bulk_delete_with_retry( s3_client, bucket: str, objects: list[S3Object] +) -> None: + # S3 delete_objects supports at most 1000 keys per call + max_chunk_size = 1000 + for i in range(0, len(objects), max_chunk_size): + chunk = objects[i : i + max_chunk_size] + await _s3_delete_chunk_with_retry(s3_client, bucket, chunk) + + +async def _s3_delete_chunk_with_retry( + s3_client, bucket: str, objects: list[S3Object] ) -> None: max_attempts = 5 delay = 1.0 diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index dcbababc2..799b77074 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -10,7 +10,6 @@ exists, insert, literal, - or_, select, update, ) @@ -24,7 +23,7 @@ from diracx.core.models.auth import UserInfo from diracx.core.models.sandbox import SandboxInfo, SandboxType from diracx.db.sql.utils.base import BaseSQLDB -from diracx.db.sql.utils.functions import days_since, utcnow +from diracx.db.sql.utils.functions import substract_date, utcnow from .schema import Base as SandboxMetadataDBBase from .schema import SandBoxes, SBEntityMapping, SBOwners @@ -216,6 +215,10 @@ async def select_sandboxes_for_deletion( Uses FOR UPDATE SKIP LOCKED on MySQL to allow concurrent workers to process different sandboxes in parallel without conflicts. + Runs two separate queries so MySQL can use indexes effectively: + 1. Assigned but orphaned sandboxes (fast with Location key + NOT EXISTS) + 2. Unassigned sandboxes older than 15 days (uses idx_assigned_lastaccesstime) + Args: batch_size: Maximum number of sandboxes to select. @@ -224,33 +227,61 @@ async def select_sandboxes_for_deletion( On MySQL, the rows remain locked until the transaction commits/rollbacks. """ - conditions = [ - # If it has assigned to a job but is no longer mapped it can be removed - and_( - SandBoxes.Assigned, - ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), - ), - # If the sandbox is still unassigned after 15 days, remove it - and_(~SandBoxes.Assigned, days_since(SandBoxes.LastAccessTime) >= 15), - ] - # Sandboxes which are not on S3 will be handled by legacy DIRAC - condition = and_(SandBoxes.SEPFN.like("/S3/%"), or_(*conditions)) + is_mysql = self.conn.dialect.name == "mysql" + if not is_mysql and self.conn.dialect.name != "sqlite": + raise NotImplementedError( + f"Unsupported database dialect: {self.conn.dialect.name}" + ) + + # Fetch distinct SEName values so the queries can use the Location + # unique key (SEName, SEPFN) instead of doing a full table scan. + se_names_result = await self.conn.execute(select(SandBoxes.SEName).distinct()) + se_names = [row.SEName for row in se_names_result] + if not se_names: + return [], [] - select_stmt = ( - select(SandBoxes.SBId, SandBoxes.SEPFN).where(condition).limit(batch_size) + s3_condition = and_( + SandBoxes.SEName.in_(se_names), + SandBoxes.SEPFN.like("/S3/%"), ) - # FOR UPDATE SKIP LOCKED is only supported on MySQL - # SQLite is used for testing and doesn't support row locking - if self.conn.dialect.name == "mysql": - select_stmt = select_stmt.with_for_update(skip_locked=True) - elif self.conn.dialect.name != "sqlite": - raise NotImplementedError( - f"Unsupported database dialect: {self.conn.dialect.name}" + # Query 1: Assigned but orphaned sandboxes (fast with Location key) + orphaned_condition = and_( + SandBoxes.Assigned, + ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), + s3_condition, + ) + stmt1 = ( + select(SandBoxes.SBId, SandBoxes.SEPFN) + .where(orphaned_condition) + .limit(batch_size) + ) + if is_mysql: + stmt1 = stmt1.with_for_update(skip_locked=True) + + result = await self.conn.execute(stmt1) + rows = list(result.all()) + + # Query 2: Unassigned sandboxes older than 15 days (sargable, + # benefits from idx_assigned_lastaccesstime index) + remaining = batch_size - len(rows) + if remaining > 0: + threshold = substract_date(days=15) + unassigned_condition = and_( + ~SandBoxes.Assigned, + SandBoxes.LastAccessTime < threshold, + s3_condition, + ) + stmt2 = ( + select(SandBoxes.SBId, SandBoxes.SEPFN) + .where(unassigned_condition) + .limit(remaining) ) + if is_mysql: + stmt2 = stmt2.with_for_update(skip_locked=True) - result = await self.conn.execute(select_stmt) - rows = result.all() + result = await self.conn.execute(stmt2) + rows.extend(result.all()) sb_ids = [row.SBId for row in rows] pfns = [row.SEPFN for row in rows] diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py index 473966ed6..77f51765f 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py @@ -46,6 +46,7 @@ class SandBoxes(Base): __table_args__ = ( PrimaryKeyConstraint("SBId"), Index("OwnerId", "OwnerId"), + Index("idx_assigned_lastaccesstime", "Assigned", "LastAccessTime"), UniqueConstraint("SEName", "SEPFN", name="Location"), ) diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index f18be50ae..2fe406bad 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -201,7 +201,7 @@ async def clean_sandboxes( sandbox_metadata_db: SandboxMetadataDB, settings: SandboxStoreSettings, *, - batch_size: int = 500, + batch_size: int = 5000, max_workers: int = 10, ) -> int: """Delete sandboxes that are not assigned to any job. From 18beaf566f3df265978bd0e1a568dc4846f1168d Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Wed, 18 Feb 2026 05:49:09 +0100 Subject: [PATCH 02/15] fix: split workers across both deletion query types Half the workers prefer orphaned sandboxes first, half prefer unassigned first. This ensures both query types make progress concurrently instead of one starving the other. --- .../src/diracx/db/sql/sandbox_metadata/db.py | 58 +++++++++---------- .../src/diracx/logic/jobs/sandboxes.py | 11 +++- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index 799b77074..5f842d16b 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -208,7 +208,7 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None: await self.conn.execute(unassign_stmt) async def select_sandboxes_for_deletion( - self, *, batch_size: int = 500 + self, *, batch_size: int = 500, prefer_unassigned: bool = False ) -> tuple[list[int], list[str]]: """Select and lock a batch of sandboxes for deletion. @@ -216,11 +216,15 @@ async def select_sandboxes_for_deletion( process different sandboxes in parallel without conflicts. Runs two separate queries so MySQL can use indexes effectively: - 1. Assigned but orphaned sandboxes (fast with Location key + NOT EXISTS) - 2. Unassigned sandboxes older than 15 days (uses idx_assigned_lastaccesstime) + - Assigned but orphaned sandboxes (fast with Location key + NOT EXISTS) + - Unassigned sandboxes older than 15 days (uses idx_assigned_lastaccesstime) + + The ``prefer_unassigned`` flag controls which query runs first, allowing + callers to spread workers across both query types. Args: batch_size: Maximum number of sandboxes to select. + prefer_unassigned: If True, run the unassigned query first. Returns: Tuple of (sb_ids, pfns) for the selected sandboxes. @@ -245,42 +249,38 @@ async def select_sandboxes_for_deletion( SandBoxes.SEPFN.like("/S3/%"), ) - # Query 1: Assigned but orphaned sandboxes (fast with Location key) + # Orphaned: assigned but no longer mapped to any entity orphaned_condition = and_( SandBoxes.Assigned, ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), s3_condition, ) - stmt1 = ( - select(SandBoxes.SBId, SandBoxes.SEPFN) - .where(orphaned_condition) - .limit(batch_size) + # Unassigned: never assigned and older than 15 days (sargable) + threshold = substract_date(days=15) + unassigned_condition = and_( + ~SandBoxes.Assigned, + SandBoxes.LastAccessTime < threshold, + s3_condition, ) - if is_mysql: - stmt1 = stmt1.with_for_update(skip_locked=True) - - result = await self.conn.execute(stmt1) - rows = list(result.all()) - - # Query 2: Unassigned sandboxes older than 15 days (sargable, - # benefits from idx_assigned_lastaccesstime index) - remaining = batch_size - len(rows) - if remaining > 0: - threshold = substract_date(days=15) - unassigned_condition = and_( - ~SandBoxes.Assigned, - SandBoxes.LastAccessTime < threshold, - s3_condition, - ) - stmt2 = ( + + if prefer_unassigned: + conditions = [unassigned_condition, orphaned_condition] + else: + conditions = [orphaned_condition, unassigned_condition] + + rows: list = [] + for condition in conditions: + remaining = batch_size - len(rows) + if remaining <= 0: + break + stmt = ( select(SandBoxes.SBId, SandBoxes.SEPFN) - .where(unassigned_condition) + .where(condition) .limit(remaining) ) if is_mysql: - stmt2 = stmt2.with_for_update(skip_locked=True) - - result = await self.conn.execute(stmt2) + stmt = stmt.with_for_update(skip_locked=True) + result = await self.conn.execute(stmt) rows.extend(result.all()) sb_ids = [row.SBId for row in rows] diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index 2fe406bad..ea8a16f58 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -230,14 +230,15 @@ async def clean_sandboxes( "SQLite does not support parallel workers (no SKIP LOCKED support)" ) - async def worker() -> int: + async def worker(prefer_unassigned: bool) -> int: """Process batches until no more work is available.""" worker_deleted = 0 while True: async with sandbox_metadata_db: # Select and lock a batch of sandboxes sb_ids, pfns = await sandbox_metadata_db.select_sandboxes_for_deletion( - batch_size=batch_size + batch_size=batch_size, + prefer_unassigned=prefer_unassigned, ) if not pfns: @@ -263,6 +264,10 @@ async def worker() -> int: return worker_deleted async with asyncio.TaskGroup() as tg: - tasks = [tg.create_task(worker()) for _ in range(max_workers)] + # Split workers: half prefer orphaned sandboxes, half prefer unassigned + tasks = [ + tg.create_task(worker(prefer_unassigned=i % 2 == 1)) + for i in range(max_workers) + ] return sum(task.result() for task in tasks) From 00880c13a71c193274d08c5f05081b1d817d01a8 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Wed, 18 Feb 2026 06:02:53 +0100 Subject: [PATCH 03/15] fix: use cursor-based pagination for sandbox deletion queries Track the last SBId seen per worker and add SBId > cursor to each query. This avoids re-scanning millions of already-checked rows on every batch, turning O(total_rows) scans into O(batch_size) seeks. --- .../src/diracx/db/sql/sandbox_metadata/db.py | 23 +++++++++++++++---- .../src/diracx/logic/jobs/sandboxes.py | 8 ++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index 5f842d16b..27301ea8c 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -208,13 +208,20 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None: await self.conn.execute(unassign_stmt) async def select_sandboxes_for_deletion( - self, *, batch_size: int = 500, prefer_unassigned: bool = False - ) -> tuple[list[int], list[str]]: + self, + *, + batch_size: int = 500, + prefer_unassigned: bool = False, + cursor: int = 0, + ) -> tuple[list[int], list[str], int]: """Select and lock a batch of sandboxes for deletion. Uses FOR UPDATE SKIP LOCKED on MySQL to allow concurrent workers to process different sandboxes in parallel without conflicts. + Uses cursor-based pagination (SBId > cursor) to avoid re-scanning + rows that were already checked in previous batches. + Runs two separate queries so MySQL can use indexes effectively: - Assigned but orphaned sandboxes (fast with Location key + NOT EXISTS) - Unassigned sandboxes older than 15 days (uses idx_assigned_lastaccesstime) @@ -225,9 +232,11 @@ async def select_sandboxes_for_deletion( Args: batch_size: Maximum number of sandboxes to select. prefer_unassigned: If True, run the unassigned query first. + cursor: Only consider sandboxes with SBId > cursor. Returns: - Tuple of (sb_ids, pfns) for the selected sandboxes. + Tuple of (sb_ids, pfns, new_cursor) for the selected sandboxes. + new_cursor is the maximum SBId seen, to pass as cursor next time. On MySQL, the rows remain locked until the transaction commits/rollbacks. """ @@ -242,7 +251,7 @@ async def select_sandboxes_for_deletion( se_names_result = await self.conn.execute(select(SandBoxes.SEName).distinct()) se_names = [row.SEName for row in se_names_result] if not se_names: - return [], [] + return [], [], cursor s3_condition = and_( SandBoxes.SEName.in_(se_names), @@ -251,6 +260,7 @@ async def select_sandboxes_for_deletion( # Orphaned: assigned but no longer mapped to any entity orphaned_condition = and_( + SandBoxes.SBId > cursor, SandBoxes.Assigned, ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), s3_condition, @@ -258,6 +268,7 @@ async def select_sandboxes_for_deletion( # Unassigned: never assigned and older than 15 days (sargable) threshold = substract_date(days=15) unassigned_condition = and_( + SandBoxes.SBId > cursor, ~SandBoxes.Assigned, SandBoxes.LastAccessTime < threshold, s3_condition, @@ -276,6 +287,7 @@ async def select_sandboxes_for_deletion( stmt = ( select(SandBoxes.SBId, SandBoxes.SEPFN) .where(condition) + .order_by(SandBoxes.SBId) .limit(remaining) ) if is_mysql: @@ -285,8 +297,9 @@ async def select_sandboxes_for_deletion( sb_ids = [row.SBId for row in rows] pfns = [row.SEPFN for row in rows] + new_cursor = max(sb_ids) if sb_ids else cursor - return sb_ids, pfns + return sb_ids, pfns, new_cursor async def delete_sandboxes(self, sb_ids: list[int]) -> int: """Delete sandboxes by their IDs. diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index ea8a16f58..cf1738459 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -233,12 +233,18 @@ async def clean_sandboxes( async def worker(prefer_unassigned: bool) -> int: """Process batches until no more work is available.""" worker_deleted = 0 + cursor = 0 while True: async with sandbox_metadata_db: # Select and lock a batch of sandboxes - sb_ids, pfns = await sandbox_metadata_db.select_sandboxes_for_deletion( + ( + sb_ids, + pfns, + cursor, + ) = await sandbox_metadata_db.select_sandboxes_for_deletion( batch_size=batch_size, prefer_unassigned=prefer_unassigned, + cursor=cursor, ) if not pfns: From 30050c41bfa4cf570bbec62b13ab884ab81e9468 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Thu, 19 Feb 2026 23:26:44 +0100 Subject: [PATCH 04/15] fix: simplify to single worker with parallel S3 deletions Replace multi-worker approach with a single sequential query loop and larger batches (50k). S3 delete chunks are now sent concurrently via TaskGroup. This eliminates lock contention between workers while keeping S3 throughput high. --- diracx-core/src/diracx/core/s3.py | 12 ++- .../src/diracx/db/sql/sandbox_metadata/db.py | 57 ++++------- .../src/diracx/logic/jobs/sandboxes.py | 99 +++++++------------ diracx-logic/tests/jobs/test_sandboxes.py | 4 +- 4 files changed, 67 insertions(+), 105 deletions(-) diff --git a/diracx-core/src/diracx/core/s3.py b/diracx-core/src/diracx/core/s3.py index 150579798..7e5abd5fc 100644 --- a/diracx-core/src/diracx/core/s3.py +++ b/diracx-core/src/diracx/core/s3.py @@ -90,11 +90,15 @@ def b16_to_b64(hex_string: str) -> str: async def s3_bulk_delete_with_retry( s3_client, bucket: str, objects: list[S3Object] ) -> None: - # S3 delete_objects supports at most 1000 keys per call + # S3 delete_objects supports at most 1000 keys per call. + # Chunks are sent concurrently for throughput. max_chunk_size = 1000 - for i in range(0, len(objects), max_chunk_size): - chunk = objects[i : i + max_chunk_size] - await _s3_delete_chunk_with_retry(s3_client, bucket, chunk) + chunks = [ + objects[i : i + max_chunk_size] for i in range(0, len(objects), max_chunk_size) + ] + async with asyncio.TaskGroup() as tg: + for chunk in chunks: + tg.create_task(_s3_delete_chunk_with_retry(s3_client, bucket, chunk)) async def _s3_delete_chunk_with_retry( diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index 27301ea8c..2ca001685 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -211,41 +211,26 @@ async def select_sandboxes_for_deletion( self, *, batch_size: int = 500, - prefer_unassigned: bool = False, cursor: int = 0, ) -> tuple[list[int], list[str], int]: - """Select and lock a batch of sandboxes for deletion. - - Uses FOR UPDATE SKIP LOCKED on MySQL to allow concurrent workers to - process different sandboxes in parallel without conflicts. + """Select a batch of sandboxes for deletion. Uses cursor-based pagination (SBId > cursor) to avoid re-scanning rows that were already checked in previous batches. Runs two separate queries so MySQL can use indexes effectively: - - Assigned but orphaned sandboxes (fast with Location key + NOT EXISTS) - - Unassigned sandboxes older than 15 days (uses idx_assigned_lastaccesstime) - - The ``prefer_unassigned`` flag controls which query runs first, allowing - callers to spread workers across both query types. + - Orphaned assigned sandboxes (NOT EXISTS on entity mapping) + - Unassigned sandboxes older than 15 days (sargable comparison) Args: batch_size: Maximum number of sandboxes to select. - prefer_unassigned: If True, run the unassigned query first. cursor: Only consider sandboxes with SBId > cursor. Returns: Tuple of (sb_ids, pfns, new_cursor) for the selected sandboxes. new_cursor is the maximum SBId seen, to pass as cursor next time. - On MySQL, the rows remain locked until the transaction commits/rollbacks. """ - is_mysql = self.conn.dialect.name == "mysql" - if not is_mysql and self.conn.dialect.name != "sqlite": - raise NotImplementedError( - f"Unsupported database dialect: {self.conn.dialect.name}" - ) - # Fetch distinct SEName values so the queries can use the Location # unique key (SEName, SEPFN) instead of doing a full table scan. se_names_result = await self.conn.execute(select(SandBoxes.SEName).distinct()) @@ -258,26 +243,24 @@ async def select_sandboxes_for_deletion( SandBoxes.SEPFN.like("/S3/%"), ) - # Orphaned: assigned but no longer mapped to any entity - orphaned_condition = and_( - SandBoxes.SBId > cursor, - SandBoxes.Assigned, - ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), - s3_condition, - ) - # Unassigned: never assigned and older than 15 days (sargable) - threshold = substract_date(days=15) - unassigned_condition = and_( - SandBoxes.SBId > cursor, - ~SandBoxes.Assigned, - SandBoxes.LastAccessTime < threshold, - s3_condition, - ) + conditions = [ + # Orphaned: assigned but no longer mapped to any entity + and_( + SandBoxes.SBId > cursor, + SandBoxes.Assigned, + ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), + s3_condition, + ), + # Unassigned: never assigned and older than 15 days (sargable) + and_( + SandBoxes.SBId > cursor, + ~SandBoxes.Assigned, + SandBoxes.LastAccessTime < substract_date(days=15), + s3_condition, + ), + ] - if prefer_unassigned: - conditions = [unassigned_condition, orphaned_condition] - else: - conditions = [orphaned_condition, unassigned_condition] + is_mysql = self.conn.dialect.name == "mysql" rows: list = [] for condition in conditions: diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index cf1738459..2b8957f1c 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import logging from typing import TYPE_CHECKING, Any, Literal @@ -201,79 +200,55 @@ async def clean_sandboxes( sandbox_metadata_db: SandboxMetadataDB, settings: SandboxStoreSettings, *, - batch_size: int = 5000, - max_workers: int = 10, + batch_size: int = 50000, ) -> int: """Delete sandboxes that are not assigned to any job. - Uses SELECT FOR UPDATE SKIP LOCKED to allow multiple workers to run - in parallel without conflicts. Each batch: - 1. Selects and locks rows - 2. Deletes from S3 + Each batch: + 1. Selects rows using cursor-based pagination + 2. Deletes from S3 (chunks sent concurrently) 3. Deletes from DB Args: sandbox_metadata_db: Database connection (not yet entered). settings: Sandbox store settings with S3 client. batch_size: Number of sandboxes to process per batch. - max_workers: Maximum number of concurrent workers processing batches. Returns: Total number of sandboxes deleted. """ - # Check if parallel workers are supported - async with sandbox_metadata_db: - dialect = sandbox_metadata_db.conn.dialect.name - if max_workers > 1 and dialect == "sqlite": - raise NotImplementedError( - "SQLite does not support parallel workers (no SKIP LOCKED support)" - ) + total_deleted = 0 + cursor = 0 + while True: + async with sandbox_metadata_db: + ( + sb_ids, + pfns, + cursor, + ) = await sandbox_metadata_db.select_sandboxes_for_deletion( + batch_size=batch_size, + cursor=cursor, + ) + + if not pfns: + break + + # Delete from S3 first (chunks sent concurrently) + objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] + if logger.isEnabledFor(logging.DEBUG): + for pfn in pfns: + logger.debug("Deleting sandbox %s from S3", pfn) + + await s3_bulk_delete_with_retry( + settings.s3_client, settings.bucket_name, objects + ) + logger.info( + "Deleted %d sandboxes from %s", len(objects), settings.bucket_name + ) + + # Then delete from DB + await sandbox_metadata_db.delete_sandboxes(sb_ids) + total_deleted += len(sb_ids) - async def worker(prefer_unassigned: bool) -> int: - """Process batches until no more work is available.""" - worker_deleted = 0 - cursor = 0 - while True: - async with sandbox_metadata_db: - # Select and lock a batch of sandboxes - ( - sb_ids, - pfns, - cursor, - ) = await sandbox_metadata_db.select_sandboxes_for_deletion( - batch_size=batch_size, - prefer_unassigned=prefer_unassigned, - cursor=cursor, - ) - - if not pfns: - break - - # Delete from S3 first (while rows are locked) - objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] - if logger.isEnabledFor(logging.DEBUG): - for pfn in pfns: - logger.debug("Deleting sandbox %s from S3", pfn) - - await s3_bulk_delete_with_retry( - settings.s3_client, settings.bucket_name, objects - ) - logger.info( - "Deleted %d sandboxes from %s", len(objects), settings.bucket_name - ) - - # Then delete from DB - await sandbox_metadata_db.delete_sandboxes(sb_ids) - worker_deleted += len(sb_ids) - - return worker_deleted - - async with asyncio.TaskGroup() as tg: - # Split workers: half prefer orphaned sandboxes, half prefer unassigned - tasks = [ - tg.create_task(worker(prefer_unassigned=i % 2 == 1)) - for i in range(max_workers) - ] - - return sum(task.result() for task in tasks) + return total_deleted diff --git a/diracx-logic/tests/jobs/test_sandboxes.py b/diracx-logic/tests/jobs/test_sandboxes.py index f0c2231ee..6dc59197d 100644 --- a/diracx-logic/tests/jobs/test_sandboxes.py +++ b/diracx-logic/tests/jobs/test_sandboxes.py @@ -118,7 +118,7 @@ async def test_upload_and_clean( assert response.content == data # There should be no sandboxes to remove - await clean_sandboxes(sandbox_metadata_db, sandbox_settings, max_workers=1) + await clean_sandboxes(sandbox_metadata_db, sandbox_settings) # Try to download the sandbox async with sandbox_metadata_db: @@ -139,7 +139,7 @@ async def test_upload_and_clean( ) # Now the sandbox should be removed - await clean_sandboxes(sandbox_metadata_db, sandbox_settings, max_workers=1) + await clean_sandboxes(sandbox_metadata_db, sandbox_settings) # Check that the sandbox was actually removed from the bucket with pytest.raises(botocore.exceptions.ClientError, match="Not Found"): From 426c9415dd63fb8183971ecd054b7719b9fe59e9 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 06:27:12 +0100 Subject: [PATCH 05/15] fix: separate row selection from locking to avoid mass locking Split the SELECT FOR UPDATE into two steps: 1. SELECT without locking to find candidate SBIds (scans many rows) 2. SELECT FOR UPDATE SKIP LOCKED WHERE SBId IN (...) to lock only the exact matching rows by primary key Previously FOR UPDATE locked every row examined during the scan (82M rows for a 50k result), causing 143MB of lock memory and 12+ minute DELETE times. Now only the returned rows are locked. --- .../src/diracx/db/sql/sandbox_metadata/db.py | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index 2ca001685..e0397de7d 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -260,27 +260,38 @@ async def select_sandboxes_for_deletion( ), ] - is_mysql = self.conn.dialect.name == "mysql" - - rows: list = [] + # Step 1: Find candidate SBIds without locking (avoids locking + # every row examined during the scan). + candidate_ids: list[int] = [] for condition in conditions: - remaining = batch_size - len(rows) + remaining = batch_size - len(candidate_ids) if remaining <= 0: break stmt = ( - select(SandBoxes.SBId, SandBoxes.SEPFN) + select(SandBoxes.SBId) .where(condition) .order_by(SandBoxes.SBId) .limit(remaining) ) - if is_mysql: - stmt = stmt.with_for_update(skip_locked=True) result = await self.conn.execute(stmt) - rows.extend(result.all()) + candidate_ids.extend(row.SBId for row in result) + + if not candidate_ids: + return [], [], cursor + + # Step 2: Lock only the exact rows by primary key. SKIP LOCKED + # ensures concurrent runs don't process the same rows. + lock_stmt = select(SandBoxes.SBId, SandBoxes.SEPFN).where( + SandBoxes.SBId.in_(candidate_ids) + ) + if self.conn.dialect.name == "mysql": + lock_stmt = lock_stmt.with_for_update(skip_locked=True) + result = await self.conn.execute(lock_stmt) + rows = result.all() sb_ids = [row.SBId for row in rows] pfns = [row.SEPFN for row in rows] - new_cursor = max(sb_ids) if sb_ids else cursor + new_cursor = max(candidate_ids) return sb_ids, pfns, new_cursor From 46ca24afdf6896e8756edd7658871d95fece53df Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 10:06:46 +0100 Subject: [PATCH 06/15] fix: remove row locking from sandbox cleanup to avoid lock memory bloat Split the single long transaction into three phases: SELECT candidates (short transaction, no locks), S3 delete (no transaction), and DB DELETE (short transaction). This eliminates the SELECT FOR UPDATE that locked every row examined during the primary key scan. --- .../src/diracx/db/sql/sandbox_metadata/db.py | 30 ++++++------------ .../src/diracx/logic/jobs/sandboxes.py | 31 ++++++++++--------- 2 files changed, 25 insertions(+), 36 deletions(-) diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index e0397de7d..621543c3f 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -260,39 +260,27 @@ async def select_sandboxes_for_deletion( ), ] - # Step 1: Find candidate SBIds without locking (avoids locking - # every row examined during the scan). - candidate_ids: list[int] = [] + sb_ids: list[int] = [] + pfns: list[str] = [] for condition in conditions: - remaining = batch_size - len(candidate_ids) + remaining = batch_size - len(sb_ids) if remaining <= 0: break stmt = ( - select(SandBoxes.SBId) + select(SandBoxes.SBId, SandBoxes.SEPFN) .where(condition) .order_by(SandBoxes.SBId) .limit(remaining) ) result = await self.conn.execute(stmt) - candidate_ids.extend(row.SBId for row in result) + for row in result: + sb_ids.append(row.SBId) + pfns.append(row.SEPFN) - if not candidate_ids: + if not sb_ids: return [], [], cursor - # Step 2: Lock only the exact rows by primary key. SKIP LOCKED - # ensures concurrent runs don't process the same rows. - lock_stmt = select(SandBoxes.SBId, SandBoxes.SEPFN).where( - SandBoxes.SBId.in_(candidate_ids) - ) - if self.conn.dialect.name == "mysql": - lock_stmt = lock_stmt.with_for_update(skip_locked=True) - result = await self.conn.execute(lock_stmt) - rows = result.all() - - sb_ids = [row.SBId for row in rows] - pfns = [row.SEPFN for row in rows] - new_cursor = max(candidate_ids) - + new_cursor = max(sb_ids) return sb_ids, pfns, new_cursor async def delete_sandboxes(self, sb_ids: list[int]) -> int: diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index 2b8957f1c..1a624cdb6 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -221,6 +221,7 @@ async def clean_sandboxes( total_deleted = 0 cursor = 0 while True: + # Phase 1: SELECT candidates (short transaction, no locks) async with sandbox_metadata_db: ( sb_ids, @@ -231,24 +232,24 @@ async def clean_sandboxes( cursor=cursor, ) - if not pfns: - break + if not pfns: + break - # Delete from S3 first (chunks sent concurrently) - objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] - if logger.isEnabledFor(logging.DEBUG): - for pfn in pfns: - logger.debug("Deleting sandbox %s from S3", pfn) + # Phase 2: Delete from S3 (no transaction — prevents dark data + # since S3 is cleaned first) + objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] + if logger.isEnabledFor(logging.DEBUG): + for pfn in pfns: + logger.debug("Deleting sandbox %s from S3", pfn) - await s3_bulk_delete_with_retry( - settings.s3_client, settings.bucket_name, objects - ) - logger.info( - "Deleted %d sandboxes from %s", len(objects), settings.bucket_name - ) + await s3_bulk_delete_with_retry( + settings.s3_client, settings.bucket_name, objects + ) + logger.info("Deleted %d sandboxes from %s", len(objects), settings.bucket_name) - # Then delete from DB + # Phase 3: Delete from DB (short transaction) + async with sandbox_metadata_db: await sandbox_metadata_db.delete_sandboxes(sb_ids) - total_deleted += len(sb_ids) + total_deleted += len(sb_ids) return total_deleted From cee338601dd89a4e35e85d3adaf27715c5cba7c3 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 10:18:06 +0100 Subject: [PATCH 07/15] fix: chunk DB deletes into 1000-row batches with concurrent execution A single DELETE of 50k rows caused MySQL to lock 30M rows during the index scan. Split into 1000-row chunks running up to 10 concurrently via asyncio.Semaphore, keeping each transaction short. --- .../src/diracx/logic/jobs/sandboxes.py | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index 1a624cdb6..d33a76270 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging from typing import TYPE_CHECKING, Any, Literal @@ -247,9 +248,21 @@ async def clean_sandboxes( ) logger.info("Deleted %d sandboxes from %s", len(objects), settings.bucket_name) - # Phase 3: Delete from DB (short transaction) - async with sandbox_metadata_db: - await sandbox_metadata_db.delete_sandboxes(sb_ids) - total_deleted += len(sb_ids) + # Phase 3: Delete from DB in small chunks (each chunk is a short + # transaction to avoid locking millions of rows in a single DELETE). + # Up to 10 chunks run concurrently via a semaphore. + delete_chunk_size = 1000 + sem = asyncio.Semaphore(10) + + async def _delete_chunk(chunk: list[int], _sem: asyncio.Semaphore = sem) -> int: + async with _sem, sandbox_metadata_db: + return await sandbox_metadata_db.delete_sandboxes(chunk) + + tasks = [ + _delete_chunk(sb_ids[i : i + delete_chunk_size]) + for i in range(0, len(sb_ids), delete_chunk_size) + ] + results = await asyncio.gather(*tasks) + total_deleted += sum(results) return total_deleted From e6015194ac3c8739357646419a2b05f2796fb021 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 10:33:18 +0100 Subject: [PATCH 08/15] refactor: add per-phase timing to sandbox cleanup logging Log duration of each phase (SELECT, S3 delete, DB delete) per batch so bottlenecks are immediately visible. Demote per-chunk DB delete log to DEBUG to reduce noise. --- .../src/diracx/db/sql/sandbox_metadata/db.py | 2 +- .../src/diracx/logic/jobs/sandboxes.py | 38 ++++++++++++++++--- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index 621543c3f..3945ca257 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -298,5 +298,5 @@ async def delete_sandboxes(self, sb_ids: list[int]) -> int: delete_stmt = delete(SandBoxes).where(SandBoxes.SBId.in_(sb_ids)) result = await self.conn.execute(delete_stmt) - logger.info("Deleted %d expired/unassigned sandboxes", result.rowcount) + logger.debug("Deleted %d expired/unassigned sandboxes", result.rowcount) return result.rowcount diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index d33a76270..dc66ad086 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -2,6 +2,7 @@ import asyncio import logging +import time from typing import TYPE_CHECKING, Any, Literal from diracx.core.exceptions import SandboxAlreadyInsertedError, SandboxNotFoundError @@ -221,8 +222,12 @@ async def clean_sandboxes( """ total_deleted = 0 cursor = 0 + batch_num = 0 while True: + batch_num += 1 + # Phase 1: SELECT candidates (short transaction, no locks) + t0 = time.monotonic() async with sandbox_metadata_db: ( sb_ids, @@ -232,25 +237,37 @@ async def clean_sandboxes( batch_size=batch_size, cursor=cursor, ) + select_duration = time.monotonic() - t0 if not pfns: + logger.info( + "Batch %d: no candidates found (%.1fs)", batch_num, select_duration + ) break + logger.info( + "Batch %d: selected %d candidates (%.1fs)", + batch_num, + len(pfns), + select_duration, + ) + # Phase 2: Delete from S3 (no transaction — prevents dark data # since S3 is cleaned first) + t0 = time.monotonic() objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] - if logger.isEnabledFor(logging.DEBUG): - for pfn in pfns: - logger.debug("Deleting sandbox %s from S3", pfn) - await s3_bulk_delete_with_retry( settings.s3_client, settings.bucket_name, objects ) - logger.info("Deleted %d sandboxes from %s", len(objects), settings.bucket_name) + s3_duration = time.monotonic() - t0 + logger.info( + "Batch %d: deleted %d from S3 (%.1fs)", batch_num, len(objects), s3_duration + ) # Phase 3: Delete from DB in small chunks (each chunk is a short # transaction to avoid locking millions of rows in a single DELETE). # Up to 10 chunks run concurrently via a semaphore. + t0 = time.monotonic() delete_chunk_size = 1000 sem = asyncio.Semaphore(10) @@ -263,6 +280,15 @@ async def _delete_chunk(chunk: list[int], _sem: asyncio.Semaphore = sem) -> int: for i in range(0, len(sb_ids), delete_chunk_size) ] results = await asyncio.gather(*tasks) - total_deleted += sum(results) + deleted = sum(results) + db_duration = time.monotonic() - t0 + total_deleted += deleted + logger.info( + "Batch %d: deleted %d from DB (%.1fs, total so far: %d)", + batch_num, + deleted, + db_duration, + total_deleted, + ) return total_deleted From fa203bcdf25ea179dae333b95fa38656b80ba05f Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 11:05:17 +0100 Subject: [PATCH 09/15] fix: increase S3 connection pool to 50 for parallel bulk deletes The default max_pool_connections=10 serialized 50 concurrent delete requests into 5 rounds, adding ~4s latency per round against Ceph. --- diracx-core/src/diracx/core/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diracx-core/src/diracx/core/settings.py b/diracx-core/src/diracx/core/settings.py index dd0681fbc..2de069341 100644 --- a/diracx-core/src/diracx/core/settings.py +++ b/diracx-core/src/diracx/core/settings.py @@ -276,7 +276,7 @@ async def lifetime_function(self) -> AsyncIterator[None]: async with get_session().create_client( "s3", **self.s3_client_kwargs, - config=Config(signature_version="v4"), + config=Config(signature_version="v4", max_pool_connections=50), ) as self._client: # type: ignore if not await s3_bucket_exists(self._client, self.bucket_name): if not self.auto_create_bucket: From 3cc63537e83f25b6811c479844f9108cddedf3a3 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 11:42:33 +0100 Subject: [PATCH 10/15] fix: make S3 connection pool size configurable Add s3_max_pool_connections setting (default 50) to SandboxStoreSettings, configurable via DIRACX_SANDBOX_STORE_S3_MAX_POOL_CONNECTIONS. --- diracx-core/src/diracx/core/settings.py | 13 ++++++++++++- docs/admin/reference/env-variables.md | 9 +++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/diracx-core/src/diracx/core/settings.py b/diracx-core/src/diracx/core/settings.py index 2de069341..eee504235 100644 --- a/diracx-core/src/diracx/core/settings.py +++ b/diracx-core/src/diracx/core/settings.py @@ -269,6 +269,14 @@ class SandboxStoreSettings(ServiceSettingsBase): This name is used within DIRAC to refer to this sandbox storage endpoint in job descriptions and file catalogs. """ + + s3_max_pool_connections: int = 50 + """Maximum number of connections in the S3 client connection pool. + + Higher values allow more parallel S3 requests (e.g. during bulk sandbox + deletion). Default: 50. + """ + _client: S3Client = PrivateAttr() @contextlib.asynccontextmanager @@ -276,7 +284,10 @@ async def lifetime_function(self) -> AsyncIterator[None]: async with get_session().create_client( "s3", **self.s3_client_kwargs, - config=Config(signature_version="v4", max_pool_connections=50), + config=Config( + signature_version="v4", + max_pool_connections=self.s3_max_pool_connections, + ), ) as self._client: # type: ignore if not await s3_bucket_exists(self._client, self.bucket_name): if not self.auto_create_bucket: diff --git a/docs/admin/reference/env-variables.md b/docs/admin/reference/env-variables.md index 5694af7fc..35e13caed 100644 --- a/docs/admin/reference/env-variables.md +++ b/docs/admin/reference/env-variables.md @@ -148,6 +148,15 @@ Logical name of the Storage Element for the sandbox store. This name is used within DIRAC to refer to this sandbox storage endpoint in job descriptions and file catalogs. +### `DIRACX_SANDBOX_STORE_S3_MAX_POOL_CONNECTIONS` + +*Optional*, default value: `50` + +Maximum number of connections in the S3 client connection pool. + +Higher values allow more parallel S3 requests (e.g. during bulk sandbox +deletion). Default: 50. + ## OTELSettings Settings for the Open Telemetry Configuration. From 55f303b354726fdac46fa94523b6301e8e97c004 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 15:57:44 +0100 Subject: [PATCH 11/15] fix: stop iterating when batch returns fewer than batch_size candidates Avoids redundant SELECT queries when the previous batch already exhausted all matching rows. --- diracx-logic/src/diracx/logic/jobs/sandboxes.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index dc66ad086..affa19a44 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -291,4 +291,8 @@ async def _delete_chunk(chunk: list[int], _sem: asyncio.Semaphore = sem) -> int: total_deleted, ) + # If we got fewer than batch_size, there are no more candidates + if len(sb_ids) < batch_size: + break + return total_deleted From 83790e0a74ed56df9e4ebcbb60b85633839502d1 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 16:48:06 +0100 Subject: [PATCH 12/15] fix: handle S3 partial delete failures and pass se_name directly s3_bulk_delete_with_retry now returns failed keys instead of raising, and _s3_delete_chunk_with_retry retries only the failed keys. The caller skips DB deletion for sandboxes whose S3 objects were not removed, preventing dark data. Also removes the per-batch SELECT DISTINCT SEName query by passing settings.se_name directly, and updates the stale docstring. --- diracx-core/src/diracx/core/s3.py | 52 ++++++++++++++----- .../src/diracx/db/sql/sandbox_metadata/db.py | 23 ++++---- .../src/diracx/logic/jobs/sandboxes.py | 30 ++++++++++- 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/diracx-core/src/diracx/core/s3.py b/diracx-core/src/diracx/core/s3.py index 7e5abd5fc..afc0c9d20 100644 --- a/diracx-core/src/diracx/core/s3.py +++ b/diracx-core/src/diracx/core/s3.py @@ -89,34 +89,60 @@ def b16_to_b64(hex_string: str) -> str: async def s3_bulk_delete_with_retry( s3_client, bucket: str, objects: list[S3Object] -) -> None: - # S3 delete_objects supports at most 1000 keys per call. - # Chunks are sent concurrently for throughput. +) -> set[str]: + """Delete objects from S3 in chunks of 1000, retrying failures. + + Returns: + Set of keys that failed to delete after all retries. + + """ max_chunk_size = 1000 chunks = [ objects[i : i + max_chunk_size] for i in range(0, len(objects), max_chunk_size) ] - async with asyncio.TaskGroup() as tg: - for chunk in chunks: - tg.create_task(_s3_delete_chunk_with_retry(s3_client, bucket, chunk)) + tasks = [_s3_delete_chunk_with_retry(s3_client, bucket, chunk) for chunk in chunks] + results = await asyncio.gather(*tasks) + failed_keys: set[str] = set() + for result in results: + failed_keys.update(result) + return failed_keys async def _s3_delete_chunk_with_retry( s3_client, bucket: str, objects: list[S3Object] -) -> None: +) -> set[str]: + """Try to delete a chunk of S3 objects, retrying partial failures. + + Returns: + Set of keys that failed to delete after all retries. + + """ max_attempts = 5 delay = 1.0 + remaining = objects for attempt in range(1, max_attempts + 1): try: response = await s3_client.delete_objects( Bucket=bucket, - Delete={"Objects": objects, "Quiet": True}, + Delete={"Objects": remaining, "Quiet": True}, ) - if "Errors" in response and response["Errors"]: - raise RuntimeError(f"S3 deletion error: {response['Errors']}") - return - except (ClientError, RuntimeError) as e: + except ClientError: if attempt == max_attempts: - raise RuntimeError(f"Failed to delete objects in {bucket}") from e + return {obj["Key"] for obj in remaining} await asyncio.sleep(delay) delay *= 2 + continue + + errors = response.get("Errors", []) + if not errors: + return set() + + # Retry only the keys that failed + failed_keys = {e["Key"] for e in errors} + if attempt == max_attempts: + return failed_keys + remaining = [obj for obj in remaining if obj["Key"] in failed_keys] + await asyncio.sleep(delay) + delay *= 2 + + return set() diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index 3945ca257..d81a92916 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -210,19 +210,21 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None: async def select_sandboxes_for_deletion( self, *, + se_name: str, batch_size: int = 500, cursor: int = 0, ) -> tuple[list[int], list[str], int]: - """Select a batch of sandboxes for deletion. + """Select a batch of sandboxes eligible for deletion. - Uses cursor-based pagination (SBId > cursor) to avoid re-scanning - rows that were already checked in previous batches. + Uses cursor-based pagination (SBId > cursor) and runs two queries: + - Orphaned: assigned but no entity mapping (NOT EXISTS) + - Unassigned: not assigned and older than 15 days - Runs two separate queries so MySQL can use indexes effectively: - - Orphaned assigned sandboxes (NOT EXISTS on entity mapping) - - Unassigned sandboxes older than 15 days (sargable comparison) + No row locking is used — the caller is responsible for crash safety + by deleting from S3 before removing DB rows. Args: + se_name: Storage element name to filter on. batch_size: Maximum number of sandboxes to select. cursor: Only consider sandboxes with SBId > cursor. @@ -231,15 +233,8 @@ async def select_sandboxes_for_deletion( new_cursor is the maximum SBId seen, to pass as cursor next time. """ - # Fetch distinct SEName values so the queries can use the Location - # unique key (SEName, SEPFN) instead of doing a full table scan. - se_names_result = await self.conn.execute(select(SandBoxes.SEName).distinct()) - se_names = [row.SEName for row in se_names_result] - if not se_names: - return [], [], cursor - s3_condition = and_( - SandBoxes.SEName.in_(se_names), + SandBoxes.SEName == se_name, SandBoxes.SEPFN.like("/S3/%"), ) diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index affa19a44..9dd617fef 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -234,6 +234,7 @@ async def clean_sandboxes( pfns, cursor, ) = await sandbox_metadata_db.select_sandboxes_for_deletion( + se_name=settings.se_name, batch_size=batch_size, cursor=cursor, ) @@ -256,14 +257,39 @@ async def clean_sandboxes( # since S3 is cleaned first) t0 = time.monotonic() objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] - await s3_bulk_delete_with_retry( + failed_keys = await s3_bulk_delete_with_retry( settings.s3_client, settings.bucket_name, objects ) s3_duration = time.monotonic() - t0 + + if failed_keys: + # Only delete DB rows for sandboxes whose S3 objects were + # actually removed — the rest will be retried next run. + logger.warning( + "Batch %d: %d S3 deletions failed, skipping their DB rows", + batch_num, + len(failed_keys), + ) + filtered = [ + (sid, pfn) + for sid, pfn in zip(sb_ids, pfns) + if pfn_to_key(pfn) not in failed_keys + ] + if filtered: + sb_ids, pfns = map(list, zip(*filtered)) + else: + sb_ids, pfns = [], [] + logger.info( - "Batch %d: deleted %d from S3 (%.1fs)", batch_num, len(objects), s3_duration + "Batch %d: deleted %d from S3 (%.1fs)", + batch_num, + len(objects) - len(failed_keys), + s3_duration, ) + if not sb_ids: + continue + # Phase 3: Delete from DB in small chunks (each chunk is a short # transaction to avoid locking millions of rows in a single DELETE). # Up to 10 chunks run concurrently via a semaphore. From 0086d70f5cfa1126d5d1668c0a9bdb8461f701be Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 20 Feb 2026 16:53:10 +0100 Subject: [PATCH 13/15] fix: remove idx_assigned_lastaccesstime index from schema This index is not needed for the current cleanup queries. --- diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py | 1 - 1 file changed, 1 deletion(-) diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py index 77f51765f..473966ed6 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/schema.py @@ -46,7 +46,6 @@ class SandBoxes(Base): __table_args__ = ( PrimaryKeyConstraint("SBId"), Index("OwnerId", "OwnerId"), - Index("idx_assigned_lastaccesstime", "Assigned", "LastAccessTime"), UniqueConstraint("SEName", "SEPFN", name="Location"), ) From 94e4f50bb2b98a808d12a8b825139f5eb770ce02 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Wed, 25 Feb 2026 14:41:06 +0100 Subject: [PATCH 14/15] fix: make sandbox cleaning parameters configurable via settings Move hardcoded batch_size, delete_chunk_size, and semaphore concurrency values into SandboxStoreSettings so they can be tuned per deployment. --- diracx-core/src/diracx/core/settings.py | 18 ++++++++++++++ .../src/diracx/logic/jobs/sandboxes.py | 8 +++---- docs/admin/reference/env-variables.md | 24 +++++++++++++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/diracx-core/src/diracx/core/settings.py b/diracx-core/src/diracx/core/settings.py index eee504235..16bd9e1e7 100644 --- a/diracx-core/src/diracx/core/settings.py +++ b/diracx-core/src/diracx/core/settings.py @@ -277,6 +277,24 @@ class SandboxStoreSettings(ServiceSettingsBase): deletion). Default: 50. """ + clean_batch_size: int = 50_000 + """Number of sandbox candidates to select per batch during cleaning. + + Each batch runs SELECT → S3 delete → DB delete sequentially. Default: 50000. + """ + + clean_delete_chunk_size: int = 1000 + """Number of sandbox DB rows to delete per chunk during cleaning. + + Smaller chunks mean shorter transactions and less lock contention. Default: 1000. + """ + + clean_max_concurrent_db_deletes: int = 10 + """Maximum number of concurrent DB delete chunks during cleaning. + + Controls parallelism of database DELETE operations. Default: 10. + """ + _client: S3Client = PrivateAttr() @contextlib.asynccontextmanager diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index 9dd617fef..853cd9236 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -201,8 +201,6 @@ async def insert_sandbox( async def clean_sandboxes( sandbox_metadata_db: SandboxMetadataDB, settings: SandboxStoreSettings, - *, - batch_size: int = 50000, ) -> int: """Delete sandboxes that are not assigned to any job. @@ -214,12 +212,12 @@ async def clean_sandboxes( Args: sandbox_metadata_db: Database connection (not yet entered). settings: Sandbox store settings with S3 client. - batch_size: Number of sandboxes to process per batch. Returns: Total number of sandboxes deleted. """ + batch_size = settings.clean_batch_size total_deleted = 0 cursor = 0 batch_num = 0 @@ -294,8 +292,8 @@ async def clean_sandboxes( # transaction to avoid locking millions of rows in a single DELETE). # Up to 10 chunks run concurrently via a semaphore. t0 = time.monotonic() - delete_chunk_size = 1000 - sem = asyncio.Semaphore(10) + delete_chunk_size = settings.clean_delete_chunk_size + sem = asyncio.Semaphore(settings.clean_max_concurrent_db_deletes) async def _delete_chunk(chunk: list[int], _sem: asyncio.Semaphore = sem) -> int: async with _sem, sandbox_metadata_db: diff --git a/docs/admin/reference/env-variables.md b/docs/admin/reference/env-variables.md index 35e13caed..88762aa86 100644 --- a/docs/admin/reference/env-variables.md +++ b/docs/admin/reference/env-variables.md @@ -157,6 +157,30 @@ Maximum number of connections in the S3 client connection pool. Higher values allow more parallel S3 requests (e.g. during bulk sandbox deletion). Default: 50. +### `DIRACX_SANDBOX_STORE_CLEAN_BATCH_SIZE` + +*Optional*, default value: `50000` + +Number of sandbox candidates to select per batch during cleaning. + +Each batch runs SELECT → S3 delete → DB delete sequentially. Default: 50000. + +### `DIRACX_SANDBOX_STORE_CLEAN_DELETE_CHUNK_SIZE` + +*Optional*, default value: `1000` + +Number of sandbox DB rows to delete per chunk during cleaning. + +Smaller chunks mean shorter transactions and less lock contention. Default: 1000. + +### `DIRACX_SANDBOX_STORE_CLEAN_MAX_CONCURRENT_DB_DELETES` + +*Optional*, default value: `10` + +Maximum number of concurrent DB delete chunks during cleaning. + +Controls parallelism of database DELETE operations. Default: 10. + ## OTELSettings Settings for the Open Telemetry Configuration. From e64067d7517316764d881060c6bd288c3eee203f Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Thu, 26 Feb 2026 15:40:41 +0100 Subject: [PATCH 15/15] fix: remove redundant default values from settings docstrings The default values were duplicated in the docstring prose when they are already visible from the field definitions and the generated docs. --- diracx-core/src/diracx/core/settings.py | 8 ++++---- docs/admin/reference/env-variables.md | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/diracx-core/src/diracx/core/settings.py b/diracx-core/src/diracx/core/settings.py index 16bd9e1e7..6e97dcfbe 100644 --- a/diracx-core/src/diracx/core/settings.py +++ b/diracx-core/src/diracx/core/settings.py @@ -274,25 +274,25 @@ class SandboxStoreSettings(ServiceSettingsBase): """Maximum number of connections in the S3 client connection pool. Higher values allow more parallel S3 requests (e.g. during bulk sandbox - deletion). Default: 50. + deletion). """ clean_batch_size: int = 50_000 """Number of sandbox candidates to select per batch during cleaning. - Each batch runs SELECT → S3 delete → DB delete sequentially. Default: 50000. + Each batch runs SELECT → S3 delete → DB delete sequentially. """ clean_delete_chunk_size: int = 1000 """Number of sandbox DB rows to delete per chunk during cleaning. - Smaller chunks mean shorter transactions and less lock contention. Default: 1000. + Smaller chunks mean shorter transactions and less lock contention. """ clean_max_concurrent_db_deletes: int = 10 """Maximum number of concurrent DB delete chunks during cleaning. - Controls parallelism of database DELETE operations. Default: 10. + Controls parallelism of database DELETE operations. """ _client: S3Client = PrivateAttr() diff --git a/docs/admin/reference/env-variables.md b/docs/admin/reference/env-variables.md index 88762aa86..1770414e0 100644 --- a/docs/admin/reference/env-variables.md +++ b/docs/admin/reference/env-variables.md @@ -155,7 +155,7 @@ endpoint in job descriptions and file catalogs. Maximum number of connections in the S3 client connection pool. Higher values allow more parallel S3 requests (e.g. during bulk sandbox -deletion). Default: 50. +deletion). ### `DIRACX_SANDBOX_STORE_CLEAN_BATCH_SIZE` @@ -163,7 +163,7 @@ deletion). Default: 50. Number of sandbox candidates to select per batch during cleaning. -Each batch runs SELECT → S3 delete → DB delete sequentially. Default: 50000. +Each batch runs SELECT → S3 delete → DB delete sequentially. ### `DIRACX_SANDBOX_STORE_CLEAN_DELETE_CHUNK_SIZE` @@ -171,7 +171,7 @@ Each batch runs SELECT → S3 delete → DB delete sequentially. Default: 50000. Number of sandbox DB rows to delete per chunk during cleaning. -Smaller chunks mean shorter transactions and less lock contention. Default: 1000. +Smaller chunks mean shorter transactions and less lock contention. ### `DIRACX_SANDBOX_STORE_CLEAN_MAX_CONCURRENT_DB_DELETES` @@ -179,7 +179,7 @@ Smaller chunks mean shorter transactions and less lock contention. Default: 1000 Maximum number of concurrent DB delete chunks during cleaning. -Controls parallelism of database DELETE operations. Default: 10. +Controls parallelism of database DELETE operations. ## OTELSettings