Skip to content

Commit 8a8e4b2

Browse files
committed
fix: handle S3 partial delete failures and pass se_name directly
s3_bulk_delete_with_retry now returns failed keys instead of raising, and _s3_delete_chunk_with_retry retries only the failed keys. The caller skips DB deletion for sandboxes whose S3 objects were not removed, preventing dark data. Also removes the per-batch SELECT DISTINCT SEName query by passing settings.se_name directly, and updates the stale docstring.
1 parent 92bf936 commit 8a8e4b2

3 files changed

Lines changed: 76 additions & 29 deletions

File tree

  • diracx-core/src/diracx/core
  • diracx-db/src/diracx/db/sql/sandbox_metadata
  • diracx-logic/src/diracx/logic/jobs

diracx-core/src/diracx/core/s3.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,34 +89,60 @@ def b16_to_b64(hex_string: str) -> str:
8989

9090
async def s3_bulk_delete_with_retry(
9191
s3_client, bucket: str, objects: list[S3Object]
92-
) -> None:
93-
# S3 delete_objects supports at most 1000 keys per call.
94-
# Chunks are sent concurrently for throughput.
92+
) -> set[str]:
93+
"""Delete objects from S3 in chunks of 1000, retrying failures.
94+
95+
Returns:
96+
Set of keys that failed to delete after all retries.
97+
98+
"""
9599
max_chunk_size = 1000
96100
chunks = [
97101
objects[i : i + max_chunk_size] for i in range(0, len(objects), max_chunk_size)
98102
]
99-
async with asyncio.TaskGroup() as tg:
100-
for chunk in chunks:
101-
tg.create_task(_s3_delete_chunk_with_retry(s3_client, bucket, chunk))
103+
tasks = [_s3_delete_chunk_with_retry(s3_client, bucket, chunk) for chunk in chunks]
104+
results = await asyncio.gather(*tasks)
105+
failed_keys: set[str] = set()
106+
for result in results:
107+
failed_keys.update(result)
108+
return failed_keys
102109

103110

104111
async def _s3_delete_chunk_with_retry(
105112
s3_client, bucket: str, objects: list[S3Object]
106-
) -> None:
113+
) -> set[str]:
114+
"""Try to delete a chunk of S3 objects, retrying partial failures.
115+
116+
Returns:
117+
Set of keys that failed to delete after all retries.
118+
119+
"""
107120
max_attempts = 5
108121
delay = 1.0
122+
remaining = objects
109123
for attempt in range(1, max_attempts + 1):
110124
try:
111125
response = await s3_client.delete_objects(
112126
Bucket=bucket,
113-
Delete={"Objects": objects, "Quiet": True},
127+
Delete={"Objects": remaining, "Quiet": True},
114128
)
115-
if "Errors" in response and response["Errors"]:
116-
raise RuntimeError(f"S3 deletion error: {response['Errors']}")
117-
return
118-
except (ClientError, RuntimeError) as e:
129+
except ClientError:
119130
if attempt == max_attempts:
120-
raise RuntimeError(f"Failed to delete objects in {bucket}") from e
131+
return {obj["Key"] for obj in remaining}
121132
await asyncio.sleep(delay)
122133
delay *= 2
134+
continue
135+
136+
errors = response.get("Errors", [])
137+
if not errors:
138+
return set()
139+
140+
# Retry only the keys that failed
141+
failed_keys = {e["Key"] for e in errors}
142+
if attempt == max_attempts:
143+
return failed_keys
144+
remaining = [obj for obj in remaining if obj["Key"] in failed_keys]
145+
await asyncio.sleep(delay)
146+
delay *= 2
147+
148+
return set()

diracx-db/src/diracx/db/sql/sandbox_metadata/db.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -210,19 +210,21 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
210210
async def select_sandboxes_for_deletion(
211211
self,
212212
*,
213+
se_name: str,
213214
batch_size: int = 500,
214215
cursor: int = 0,
215216
) -> tuple[list[int], list[str], int]:
216-
"""Select a batch of sandboxes for deletion.
217+
"""Select a batch of sandboxes eligible for deletion.
217218
218-
Uses cursor-based pagination (SBId > cursor) to avoid re-scanning
219-
rows that were already checked in previous batches.
219+
Uses cursor-based pagination (SBId > cursor) and runs two queries:
220+
- Orphaned: assigned but no entity mapping (NOT EXISTS)
221+
- Unassigned: not assigned and older than 15 days
220222
221-
Runs two separate queries so MySQL can use indexes effectively:
222-
- Orphaned assigned sandboxes (NOT EXISTS on entity mapping)
223-
- Unassigned sandboxes older than 15 days (sargable comparison)
223+
No row locking is used — the caller is responsible for crash safety
224+
by deleting from S3 before removing DB rows.
224225
225226
Args:
227+
se_name: Storage element name to filter on.
226228
batch_size: Maximum number of sandboxes to select.
227229
cursor: Only consider sandboxes with SBId > cursor.
228230
@@ -231,15 +233,8 @@ async def select_sandboxes_for_deletion(
231233
new_cursor is the maximum SBId seen, to pass as cursor next time.
232234
233235
"""
234-
# Fetch distinct SEName values so the queries can use the Location
235-
# unique key (SEName, SEPFN) instead of doing a full table scan.
236-
se_names_result = await self.conn.execute(select(SandBoxes.SEName).distinct())
237-
se_names = [row.SEName for row in se_names_result]
238-
if not se_names:
239-
return [], [], cursor
240-
241236
s3_condition = and_(
242-
SandBoxes.SEName.in_(se_names),
237+
SandBoxes.SEName == se_name,
243238
SandBoxes.SEPFN.like("/S3/%"),
244239
)
245240

diracx-logic/src/diracx/logic/jobs/sandboxes.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ async def clean_sandboxes(
234234
pfns,
235235
cursor,
236236
) = await sandbox_metadata_db.select_sandboxes_for_deletion(
237+
se_name=settings.se_name,
237238
batch_size=batch_size,
238239
cursor=cursor,
239240
)
@@ -256,14 +257,39 @@ async def clean_sandboxes(
256257
# since S3 is cleaned first)
257258
t0 = time.monotonic()
258259
objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns]
259-
await s3_bulk_delete_with_retry(
260+
failed_keys = await s3_bulk_delete_with_retry(
260261
settings.s3_client, settings.bucket_name, objects
261262
)
262263
s3_duration = time.monotonic() - t0
264+
265+
if failed_keys:
266+
# Only delete DB rows for sandboxes whose S3 objects were
267+
# actually removed — the rest will be retried next run.
268+
logger.warning(
269+
"Batch %d: %d S3 deletions failed, skipping their DB rows",
270+
batch_num,
271+
len(failed_keys),
272+
)
273+
filtered = [
274+
(sid, pfn)
275+
for sid, pfn in zip(sb_ids, pfns)
276+
if pfn_to_key(pfn) not in failed_keys
277+
]
278+
if filtered:
279+
sb_ids, pfns = map(list, zip(*filtered))
280+
else:
281+
sb_ids, pfns = [], []
282+
263283
logger.info(
264-
"Batch %d: deleted %d from S3 (%.1fs)", batch_num, len(objects), s3_duration
284+
"Batch %d: deleted %d from S3 (%.1fs)",
285+
batch_num,
286+
len(objects) - len(failed_keys),
287+
s3_duration,
265288
)
266289

290+
if not sb_ids:
291+
continue
292+
267293
# Phase 3: Delete from DB in small chunks (each chunk is a short
268294
# transaction to avoid locking millions of rows in a single DELETE).
269295
# Up to 10 chunks run concurrently via a semaphore.

0 commit comments

Comments
 (0)