Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions diracx-core/src/diracx/core/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,60 @@ def b16_to_b64(hex_string: str) -> str:

async def s3_bulk_delete_with_retry(
    s3_client, bucket: str, objects: list[S3Object]
) -> set[str]:
    """Delete objects from S3 in chunks of 1000, retrying failures.

    The objects are split into chunks which are deleted concurrently; each
    chunk is retried independently by ``_s3_delete_chunk_with_retry``.

    Args:
        s3_client: Client exposing an awaitable ``delete_objects`` method.
        bucket: Name of the bucket to delete from.
        objects: Objects to delete, each a mapping with a ``"Key"`` entry.

    Returns:
        Set of keys that failed to delete after all retries.

    """
    max_chunk_size = 1000
    chunks = [
        objects[i : i + max_chunk_size] for i in range(0, len(objects), max_chunk_size)
    ]
    tasks = [_s3_delete_chunk_with_retry(s3_client, bucket, chunk) for chunk in chunks]
    # An empty ``objects`` list yields no tasks, so gather() returns [].
    results = await asyncio.gather(*tasks)
    failed_keys: set[str] = set()
    for result in results:
        failed_keys.update(result)
    return failed_keys


async def _s3_delete_chunk_with_retry(
    s3_client, bucket: str, objects: list[S3Object]
) -> set[str]:
    """Try to delete a chunk of S3 objects, retrying partial failures.

    Up to 5 attempts are made. The wait between attempts starts at 1 second
    and doubles after every failed attempt. A ``ClientError`` retries the
    whole remaining set; a response containing ``Errors`` retries only the
    keys reported there.

    Args:
        s3_client: Client exposing an awaitable ``delete_objects`` method.
        bucket: Name of the bucket to delete from.
        objects: Objects to delete, each a mapping with a ``"Key"`` entry.

    Returns:
        Set of keys that failed to delete after all retries.

    """
    max_attempts = 5
    delay = 1.0
    remaining = objects
    for attempt in range(1, max_attempts + 1):
        try:
            response = await s3_client.delete_objects(
                Bucket=bucket,
                Delete={"Objects": remaining, "Quiet": True},
            )
        except ClientError:
            # The whole request failed: everything still pending is affected.
            if attempt == max_attempts:
                return {obj["Key"] for obj in remaining}
            await asyncio.sleep(delay)
            delay *= 2
            continue

        errors = response.get("Errors", [])
        if not errors:
            return set()

        # Retry only the keys that failed
        failed_keys = {e["Key"] for e in errors}
        if attempt == max_attempts:
            return failed_keys
        remaining = [obj for obj in remaining if obj["Key"] in failed_keys]
        await asyncio.sleep(delay)
        delay *= 2

    # Not reached in practice (the last attempt always returns above); kept
    # so that every code path has an explicit return value.
    return set()
31 changes: 30 additions & 1 deletion diracx-core/src/diracx/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,43 @@ class SandboxStoreSettings(ServiceSettingsBase):
This name is used within DIRAC to refer to this sandbox storage
endpoint in job descriptions and file catalogs.
"""

s3_max_pool_connections: int = 50
"""Maximum number of connections in the S3 client connection pool.

Higher values allow more parallel S3 requests (e.g. during bulk sandbox
deletion).
"""

clean_batch_size: int = 50_000
"""Number of sandbox candidates to select per batch during cleaning.

Each batch runs SELECT → S3 delete → DB delete sequentially.
"""

clean_delete_chunk_size: int = 1000
"""Number of sandbox DB rows to delete per chunk during cleaning.

Smaller chunks mean shorter transactions and less lock contention.
"""

clean_max_concurrent_db_deletes: int = 10
"""Maximum number of concurrent DB delete chunks during cleaning.

Controls parallelism of database DELETE operations.
"""

_client: S3Client = PrivateAttr()

@contextlib.asynccontextmanager
async def lifetime_function(self) -> AsyncIterator[None]:
async with get_session().create_client(
"s3",
**self.s3_client_kwargs,
config=Config(signature_version="v4"),
config=Config(
signature_version="v4",
max_pool_connections=self.s3_max_pool_connections,
),
) as self._client: # type: ignore
if not await s3_bucket_exists(self._client, self.bucket_name):
if not self.auto_create_bucket:
Expand Down
85 changes: 53 additions & 32 deletions diracx-db/src/diracx/db/sql/sandbox_metadata/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
exists,
insert,
literal,
or_,
select,
update,
)
Expand All @@ -24,7 +23,7 @@
from diracx.core.models.auth import UserInfo
from diracx.core.models.sandbox import SandboxInfo, SandboxType
from diracx.db.sql.utils.base import BaseSQLDB
from diracx.db.sql.utils.functions import days_since, utcnow
from diracx.db.sql.utils.functions import substract_date, utcnow

from .schema import Base as SandboxMetadataDBBase
from .schema import SandBoxes, SBEntityMapping, SBOwners
Expand Down Expand Up @@ -209,53 +208,75 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
await self.conn.execute(unassign_stmt)

async def select_sandboxes_for_deletion(
    self,
    *,
    se_name: str,
    batch_size: int = 500,
    cursor: int = 0,
) -> tuple[list[int], list[str], int]:
    """Select a batch of sandboxes eligible for deletion.

    Uses cursor-based pagination (SBId > cursor) and runs two queries:
    - Orphaned: assigned but no entity mapping (NOT EXISTS)
    - Unassigned: not assigned and older than 15 days

    The queries run in that order; the second one only fills whatever is
    left of ``batch_size`` after the first.

    No row locking is used — the caller is responsible for crash safety
    by deleting from S3 before removing DB rows.

    Args:
        se_name: Storage element name to filter on.
        batch_size: Maximum number of sandboxes to select.
        cursor: Only consider sandboxes with SBId > cursor.

    Returns:
        Tuple of (sb_ids, pfns, new_cursor) for the selected sandboxes.
        new_cursor is the maximum SBId seen, to pass as cursor next time.
        If nothing is selected, the input cursor is returned unchanged.

    """
    # Only sandboxes on the requested storage element whose PFN starts
    # with "/S3/" are candidates.
    s3_condition = and_(
        SandBoxes.SEName == se_name,
        SandBoxes.SEPFN.like("/S3/%"),
    )

    conditions = [
        # Orphaned: assigned but no longer mapped to any entity
        and_(
            SandBoxes.SBId > cursor,
            SandBoxes.Assigned,
            ~exists().where(SBEntityMapping.SBId == SandBoxes.SBId),
            s3_condition,
        ),
        # Unassigned: never assigned and older than 15 days (sargable)
        and_(
            SandBoxes.SBId > cursor,
            ~SandBoxes.Assigned,
            SandBoxes.LastAccessTime < substract_date(days=15),
            s3_condition,
        ),
    ]

    sb_ids: list[int] = []
    pfns: list[str] = []
    for condition in conditions:
        remaining = batch_size - len(sb_ids)
        if remaining <= 0:
            break
        stmt = (
            select(SandBoxes.SBId, SandBoxes.SEPFN)
            .where(condition)
            .order_by(SandBoxes.SBId)
            .limit(remaining)
        )
        result = await self.conn.execute(stmt)
        for row in result:
            sb_ids.append(row.SBId)
            pfns.append(row.SEPFN)

    if not sb_ids:
        return [], [], cursor

    # NOTE(review): the cursor is the maximum over BOTH queries. If one
    # query is cut short by its limit (or skipped because the batch is
    # already full), rows of that category with an SBId below the new
    # cursor are not returned later in the same pass — confirm that the
    # caller eventually restarts from cursor 0.
    new_cursor = max(sb_ids)
    return sb_ids, pfns, new_cursor

async def delete_sandboxes(self, sb_ids: list[int]) -> int:
"""Delete sandboxes by their IDs.
Expand All @@ -272,5 +293,5 @@ async def delete_sandboxes(self, sb_ids: list[int]) -> int:

delete_stmt = delete(SandBoxes).where(SandBoxes.SBId.in_(sb_ids))
result = await self.conn.execute(delete_stmt)
logger.info("Deleted %d expired/unassigned sandboxes", result.rowcount)
logger.debug("Deleted %d expired/unassigned sandboxes", result.rowcount)
return result.rowcount
Loading
Loading