diff --git a/diracx-core/src/diracx/core/s3.py b/diracx-core/src/diracx/core/s3.py index 5b3f88e99..afc0c9d20 100644 --- a/diracx-core/src/diracx/core/s3.py +++ b/diracx-core/src/diracx/core/s3.py @@ -89,20 +89,60 @@ def b16_to_b64(hex_string: str) -> str: async def s3_bulk_delete_with_retry( s3_client, bucket: str, objects: list[S3Object] -) -> None: +) -> set[str]: + """Delete objects from S3 in chunks of 1000, retrying failures. + + Returns: + Set of keys that failed to delete after all retries. + + """ + max_chunk_size = 1000 + chunks = [ + objects[i : i + max_chunk_size] for i in range(0, len(objects), max_chunk_size) + ] + tasks = [_s3_delete_chunk_with_retry(s3_client, bucket, chunk) for chunk in chunks] + results = await asyncio.gather(*tasks) + failed_keys: set[str] = set() + for result in results: + failed_keys.update(result) + return failed_keys + + +async def _s3_delete_chunk_with_retry( + s3_client, bucket: str, objects: list[S3Object] +) -> set[str]: + """Try to delete a chunk of S3 objects, retrying partial failures. + + Returns: + Set of keys that failed to delete after all retries.
+ + """ max_attempts = 5 delay = 1.0 + remaining = objects for attempt in range(1, max_attempts + 1): try: response = await s3_client.delete_objects( Bucket=bucket, - Delete={"Objects": objects, "Quiet": True}, + Delete={"Objects": remaining, "Quiet": True}, ) - if "Errors" in response and response["Errors"]: - raise RuntimeError(f"S3 deletion error: {response['Errors']}") - return - except (ClientError, RuntimeError) as e: + except ClientError: if attempt == max_attempts: - raise RuntimeError(f"Failed to delete objects in {bucket}") from e + return {obj["Key"] for obj in remaining} await asyncio.sleep(delay) delay *= 2 + continue + + errors = response.get("Errors", []) + if not errors: + return set() + + # Retry only the keys that failed + failed_keys = {e["Key"] for e in errors} + if attempt == max_attempts: + return failed_keys + remaining = [obj for obj in remaining if obj["Key"] in failed_keys] + await asyncio.sleep(delay) + delay *= 2 + + return set() diff --git a/diracx-core/src/diracx/core/settings.py b/diracx-core/src/diracx/core/settings.py index dd0681fbc..6e97dcfbe 100644 --- a/diracx-core/src/diracx/core/settings.py +++ b/diracx-core/src/diracx/core/settings.py @@ -269,6 +269,32 @@ class SandboxStoreSettings(ServiceSettingsBase): This name is used within DIRAC to refer to this sandbox storage endpoint in job descriptions and file catalogs. """ + + s3_max_pool_connections: int = 50 + """Maximum number of connections in the S3 client connection pool. + + Higher values allow more parallel S3 requests (e.g. during bulk sandbox + deletion). + """ + + clean_batch_size: int = 50_000 + """Number of sandbox candidates to select per batch during cleaning. + + Each batch runs SELECT → S3 delete → DB delete sequentially. + """ + + clean_delete_chunk_size: int = 1000 + """Number of sandbox DB rows to delete per chunk during cleaning. + + Smaller chunks mean shorter transactions and less lock contention.
+ """ + + clean_max_concurrent_db_deletes: int = 10 + """Maximum number of concurrent DB delete chunks during cleaning. + + Controls parallelism of database DELETE operations. + """ + _client: S3Client = PrivateAttr() @contextlib.asynccontextmanager @@ -276,7 +302,10 @@ async def lifetime_function(self) -> AsyncIterator[None]: async with get_session().create_client( "s3", **self.s3_client_kwargs, - config=Config(signature_version="v4"), + config=Config( + signature_version="v4", + max_pool_connections=self.s3_max_pool_connections, + ), ) as self._client: # type: ignore if not await s3_bucket_exists(self._client, self.bucket_name): if not self.auto_create_bucket: diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index dcbababc2..d81a92916 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -10,7 +10,6 @@ exists, insert, literal, - or_, select, update, ) @@ -24,7 +23,7 @@ from diracx.core.models.auth import UserInfo from diracx.core.models.sandbox import SandboxInfo, SandboxType from diracx.db.sql.utils.base import BaseSQLDB -from diracx.db.sql.utils.functions import days_since, utcnow +from diracx.db.sql.utils.functions import substract_date, utcnow from .schema import Base as SandboxMetadataDBBase from .schema import SandBoxes, SBEntityMapping, SBOwners @@ -209,53 +208,75 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None: await self.conn.execute(unassign_stmt) async def select_sandboxes_for_deletion( - self, *, batch_size: int = 500 - ) -> tuple[list[int], list[str]]: - """Select and lock a batch of sandboxes for deletion. + self, + *, + se_name: str, + batch_size: int = 500, + cursor: int = 0, + ) -> tuple[list[int], list[str], int]: + """Select a batch of sandboxes eligible for deletion.
- Uses FOR UPDATE SKIP LOCKED on MySQL to allow concurrent workers to - process different sandboxes in parallel without conflicts. + Uses cursor-based pagination (SBId > cursor) and runs two queries: + - Orphaned: assigned but no entity mapping (NOT EXISTS) + - Unassigned: not assigned and older than 15 days + + No row locking is used — the caller is responsible for crash safety + by deleting from S3 before removing DB rows. Args: + se_name: Storage element name to filter on. batch_size: Maximum number of sandboxes to select. + cursor: Only consider sandboxes with SBId > cursor. Returns: - Tuple of (sb_ids, pfns) for the selected sandboxes. - On MySQL, the rows remain locked until the transaction commits/rollbacks. + Tuple of (sb_ids, pfns, new_cursor) for the selected sandboxes. + new_cursor is the maximum SBId seen, to pass as cursor next time. """ + s3_condition = and_( + SandBoxes.SEName == se_name, + SandBoxes.SEPFN.like("/S3/%"), + ) + conditions = [ - # If it has assigned to a job but is no longer mapped it can be removed + # Orphaned: assigned but no longer mapped to any entity and_( + SandBoxes.SBId > cursor, SandBoxes.Assigned, ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId), + s3_condition, + ), + # Unassigned: never assigned and older than 15 days (sargable) + and_( + SandBoxes.SBId > cursor, + ~SandBoxes.Assigned, + SandBoxes.LastAccessTime < substract_date(days=15), + s3_condition, ), - # If the sandbox is still unassigned after 15 days, remove it - and_(~SandBoxes.Assigned, days_since(SandBoxes.LastAccessTime) >= 15), ] - # Sandboxes which are not on S3 will be handled by legacy DIRAC - condition = and_(SandBoxes.SEPFN.like("/S3/%"), or_(*conditions)) - - select_stmt = ( - select(SandBoxes.SBId, SandBoxes.SEPFN).where(condition).limit(batch_size) - ) - # FOR UPDATE SKIP LOCKED is only supported on MySQL - # SQLite is used for testing and doesn't support row locking - if self.conn.dialect.name == "mysql": - select_stmt =
select_stmt.with_for_update(skip_locked=True) - elif self.conn.dialect.name != "sqlite": - raise NotImplementedError( - f"Unsupported database dialect: {self.conn.dialect.name}" + sb_ids: list[int] = [] + pfns: list[str] = [] + for condition in conditions: + remaining = batch_size - len(sb_ids) + if remaining <= 0: + break + stmt = ( + select(SandBoxes.SBId, SandBoxes.SEPFN) + .where(condition) + .order_by(SandBoxes.SBId) + .limit(remaining) ) + result = await self.conn.execute(stmt) + for row in result: + sb_ids.append(row.SBId) + pfns.append(row.SEPFN) - result = await self.conn.execute(select_stmt) - rows = result.all() - - sb_ids = [row.SBId for row in rows] - pfns = [row.SEPFN for row in rows] + if not sb_ids: + return [], [], cursor - return sb_ids, pfns + new_cursor = max(sb_ids) + return sb_ids, pfns, new_cursor async def delete_sandboxes(self, sb_ids: list[int]) -> int: """Delete sandboxes by their IDs. @@ -272,5 +293,5 @@ async def delete_sandboxes(self, sb_ids: list[int]) -> int: delete_stmt = delete(SandBoxes).where(SandBoxes.SBId.in_(sb_ids)) result = await self.conn.execute(delete_stmt) - logger.info("Deleted %d expired/unassigned sandboxes", result.rowcount) + logger.debug("Deleted %d expired/unassigned sandboxes", result.rowcount) return result.rowcount diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index f18be50ae..853cd9236 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -2,6 +2,7 @@ import asyncio import logging +import time from typing import TYPE_CHECKING, Any, Literal from diracx.core.exceptions import SandboxAlreadyInsertedError, SandboxNotFoundError @@ -200,69 +201,122 @@ async def insert_sandbox( async def clean_sandboxes( sandbox_metadata_db: SandboxMetadataDB, settings: SandboxStoreSettings, - *, - batch_size: int = 500, - max_workers: int = 10, ) -> int: """Delete sandboxes that are not assigned to any
job. - Uses SELECT FOR UPDATE SKIP LOCKED to allow multiple workers to run - in parallel without conflicts. Each batch: - 1. Selects and locks rows - 2. Deletes from S3 + Each batch: + 1. Selects rows using cursor-based pagination + 2. Deletes from S3 (chunks sent concurrently) 3. Deletes from DB Args: sandbox_metadata_db: Database connection (not yet entered). settings: Sandbox store settings with S3 client. - batch_size: Number of sandboxes to process per batch. - max_workers: Maximum number of concurrent workers processing batches. Returns: Total number of sandboxes deleted. """ - # Check if parallel workers are supported - async with sandbox_metadata_db: - dialect = sandbox_metadata_db.conn.dialect.name - if max_workers > 1 and dialect == "sqlite": - raise NotImplementedError( - "SQLite does not support parallel workers (no SKIP LOCKED support)" + batch_size = settings.clean_batch_size + total_deleted = 0 + cursor = 0 + batch_num = 0 + while True: + batch_num += 1 + + # Phase 1: SELECT candidates (short transaction, no locks) + t0 = time.monotonic() + async with sandbox_metadata_db: + ( + sb_ids, + pfns, + cursor, + ) = await sandbox_metadata_db.select_sandboxes_for_deletion( + se_name=settings.se_name, + batch_size=batch_size, + cursor=cursor, + ) + select_duration = time.monotonic() - t0 + + if not pfns: + logger.info( + "Batch %d: no candidates found (%.1fs)", batch_num, select_duration + ) + break + + logger.info( + "Batch %d: selected %d candidates (%.1fs)", + batch_num, + len(pfns), + select_duration, + ) + + # Phase 2: Delete from S3 (no transaction — prevents dark data + # since S3 is cleaned first) + t0 = time.monotonic() + objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] + failed_keys = await s3_bulk_delete_with_retry( + settings.s3_client, settings.bucket_name, objects ) + s3_duration = time.monotonic() - t0 + + if failed_keys: + # Only delete DB rows for sandboxes whose S3 objects were + # actually removed — the rest will be
+ retried next run. + logger.warning( + "Batch %d: %d S3 deletions failed, skipping their DB rows", + batch_num, + len(failed_keys), + ) + filtered = [ + (sid, pfn) + for sid, pfn in zip(sb_ids, pfns) + if pfn_to_key(pfn) not in failed_keys + ] + if filtered: + sb_ids, pfns = map(list, zip(*filtered)) + else: + sb_ids, pfns = [], [] + + logger.info( + "Batch %d: deleted %d from S3 (%.1fs)", + batch_num, + len(objects) - len(failed_keys), + s3_duration, + ) + + if not sb_ids: + continue + + # Phase 3: Delete from DB in small chunks (each chunk is a short + # transaction to avoid locking millions of rows in a single DELETE). + # At most settings.clean_max_concurrent_db_deletes chunks run concurrently. + t0 = time.monotonic() + delete_chunk_size = settings.clean_delete_chunk_size + sem = asyncio.Semaphore(settings.clean_max_concurrent_db_deletes) + + async def _delete_chunk(chunk: list[int], _sem: asyncio.Semaphore = sem) -> int: + async with _sem, sandbox_metadata_db: + return await sandbox_metadata_db.delete_sandboxes(chunk) + + tasks = [ + _delete_chunk(sb_ids[i : i + delete_chunk_size]) + for i in range(0, len(sb_ids), delete_chunk_size) + ] + results = await asyncio.gather(*tasks) + deleted = sum(results) + db_duration = time.monotonic() - t0 + total_deleted += deleted + logger.info( + "Batch %d: deleted %d from DB (%.1fs, total so far: %d)", + batch_num, + deleted, + db_duration, + total_deleted, + ) + + # If we selected fewer than batch_size, there are no more candidates + if len(objects) < batch_size: + break - async def worker() -> int: - """Process batches until no more work is available.""" - worker_deleted = 0 - while True: - async with sandbox_metadata_db: - # Select and lock a batch of sandboxes - sb_ids, pfns = await sandbox_metadata_db.select_sandboxes_for_deletion( - batch_size=batch_size - ) - - if not pfns: - break - - # Delete from S3 first (while rows are locked) - objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns] - if logger.isEnabledFor(logging.DEBUG): - for
pfn in pfns: - logger.debug("Deleting sandbox %s from S3", pfn) - - await s3_bulk_delete_with_retry( - settings.s3_client, settings.bucket_name, objects - ) - logger.info( - "Deleted %d sandboxes from %s", len(objects), settings.bucket_name - ) - - # Then delete from DB - await sandbox_metadata_db.delete_sandboxes(sb_ids) - worker_deleted += len(sb_ids) - - return worker_deleted - - async with asyncio.TaskGroup() as tg: - tasks = [tg.create_task(worker()) for _ in range(max_workers)] - - return sum(task.result() for task in tasks) + return total_deleted diff --git a/diracx-logic/tests/jobs/test_sandboxes.py b/diracx-logic/tests/jobs/test_sandboxes.py index f0c2231ee..6dc59197d 100644 --- a/diracx-logic/tests/jobs/test_sandboxes.py +++ b/diracx-logic/tests/jobs/test_sandboxes.py @@ -118,7 +118,7 @@ async def test_upload_and_clean( assert response.content == data # There should be no sandboxes to remove - await clean_sandboxes(sandbox_metadata_db, sandbox_settings, max_workers=1) + await clean_sandboxes(sandbox_metadata_db, sandbox_settings) # Try to download the sandbox async with sandbox_metadata_db: @@ -139,7 +139,7 @@ async def test_upload_and_clean( ) # Now the sandbox should be removed - await clean_sandboxes(sandbox_metadata_db, sandbox_settings, max_workers=1) + await clean_sandboxes(sandbox_metadata_db, sandbox_settings) # Check that the sandbox was actually removed from the bucket with pytest.raises(botocore.exceptions.ClientError, match="Not Found"): diff --git a/docs/admin/reference/env-variables.md b/docs/admin/reference/env-variables.md index 5694af7fc..1770414e0 100644 --- a/docs/admin/reference/env-variables.md +++ b/docs/admin/reference/env-variables.md @@ -148,6 +148,39 @@ Logical name of the Storage Element for the sandbox store. This name is used within DIRAC to refer to this sandbox storage endpoint in job descriptions and file catalogs.
+### `DIRACX_SANDBOX_STORE_S3_MAX_POOL_CONNECTIONS` + +*Optional*, default value: `50` + +Maximum number of connections in the S3 client connection pool. + +Higher values allow more parallel S3 requests (e.g. during bulk sandbox +deletion). + +### `DIRACX_SANDBOX_STORE_CLEAN_BATCH_SIZE` + +*Optional*, default value: `50000` + +Number of sandbox candidates to select per batch during cleaning. + +Each batch runs SELECT → S3 delete → DB delete sequentially. + +### `DIRACX_SANDBOX_STORE_CLEAN_DELETE_CHUNK_SIZE` + +*Optional*, default value: `1000` + +Number of sandbox DB rows to delete per chunk during cleaning. + +Smaller chunks mean shorter transactions and less lock contention. + +### `DIRACX_SANDBOX_STORE_CLEAN_MAX_CONCURRENT_DB_DELETES` + +*Optional*, default value: `10` + +Maximum number of concurrent DB delete chunks during cleaning. + +Controls parallelism of database DELETE operations. + ## OTELSettings Settings for the Open Telemetry Configuration.