|
| 1 | +from __future__ import annotations |
| 2 | + |
1 | 3 | import os |
2 | | -import uuid |
| 4 | +import secrets |
| 5 | +import string |
3 | 6 | from datetime import datetime, timedelta |
4 | 7 | from sqlmodel import Session, select |
5 | | -from app.config import UPLOAD_DIR, DELETE_AFTER_HOURS |
| 8 | +from app.config import UPLOAD_DIR, DELETE_AFTER_HOURS, FILE_ID_LENGTH |
6 | 9 | from app.models import File |
7 | 10 |
|
8 | 11 | os.makedirs(UPLOAD_DIR, exist_ok=True) |
9 | 12 |
|
| 13 | +_SLUG_ALPHABET = string.ascii_letters + string.digits |
| 14 | +_MAX_SLUG_ATTEMPTS = 5 |
| 15 | + |
| 16 | + |
| 17 | +def _generate_file_id(length: int = FILE_ID_LENGTH) -> str: |
| 18 | + return "".join(secrets.choice(_SLUG_ALPHABET) for _ in range(length)) |
| 19 | + |
| 20 | + |
| 21 | +def _reserve_path(ext: str) -> tuple[str, str]: |
| 22 | + for _ in range(_MAX_SLUG_ATTEMPTS): |
| 23 | + file_id = _generate_file_id() |
| 24 | + stored_name = f"{file_id}{ext}" |
| 25 | + path = os.path.join(UPLOAD_DIR, stored_name) |
| 26 | + if not os.path.exists(path): |
| 27 | + return stored_name, path |
| 28 | + raise RuntimeError("Unable to allocate a unique file slug") |
| 29 | + |
| 30 | + |
10 | 31 | def save_file(file_bytes: bytes, original_name: str, content_type: str) -> str: |
11 | 32 | ext = os.path.splitext(original_name)[1] or ".bin" |
12 | | - file_id = str(uuid.uuid4()) |
13 | | - stored_name = f"{file_id}{ext}" |
14 | | - path = os.path.join(UPLOAD_DIR, stored_name) |
| 33 | + stored_name, path = _reserve_path(ext) |
15 | 34 | with open(path, "wb") as f: |
16 | 35 | f.write(file_bytes) |
17 | 36 | return stored_name, len(file_bytes) |
|
0 commit comments