✨(storage) implement tiered storage #486
📝 Walkthrough
Adds S3-backed tiered blob storage with AES-GCM encryption, schema/migration, GC with reservations, offload/verify/restore tooling, API/MDA refactors, admin updates, Redis-only coalescer, importer Range reads, settings/env/docs/CI updates, Celery beat, and extensive tests.
Changes
Tiered Blob Storage, Encryption, GC, and Integrations
Sequence Diagram(s)
sequenceDiagram
participant Client
participant API
participant DB
participant GC as GC/Reservations
participant Tiered as TieredStorageService
participant S3 as Object Storage
Client->>API: POST /blob/upload
API->>GC: upload_and_reserve_blob(mailbox, content)
GC->>DB: Create Blob + MailboxBlob (atomic)
API-->>Client: blobId,size,sha256
Note over API,Tiered: Later (hourly offload)
API->>Tiered: offload_one_blob(id)
Tiered->>S3: PUT blobs/{key_id}/{shard}/{sha}
Tiered->>DB: Update location/encryption, clear raw_content
Client->>API: GET /blob/{id}
API->>DB: user_can_access(user, blob.id)
alt Offloaded
API->>Tiered: download_blob(id)
Tiered->>S3: GET object
Tiered-->>API: decrypted bytes
else In Postgres
API->>DB: read raw_content and decrypt
end
API-->>Client: content
Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 466-495: The current flow writes the newly encrypted object via
self.service.storage.save(storage_key, ...) before updating
blob.encryption_key_id inside transaction.atomic(), risking storage/DB
inconsistency if the DB update fails; instead, write the new encrypted bytes to
a temporary object (e.g. derive a temp key from storage_key and new_key_id)
using self.service.storage.save(temp_key, ContentFile(encrypted)), then perform
the DB update inside transaction.atomic() (update blob.encryption_key_id and
save), and only after the transaction succeeds atomically remove/rename the temp
object to the final storage_key (or copy temp→final and delete temp) so storage
and DB remain consistent; reference symbols: self.service.storage.save,
storage_key, temp_key (create), self.service.encrypt, blob.encryption_key_id,
transaction.atomic.
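The write-to-temp, update-DB, then promote ordering described in this finding can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the project's code: `InMemoryStorage`, `rotate_object`, the `update_db` callback and the `.rotate-{new_key_id}` temp-key suffix are all hypothetical names, and the dict-backed storage stands in for the real storage backend and `transaction.atomic()` block.

```python
from typing import Callable, Dict


class InMemoryStorage:
    """Hypothetical stand-in for an object store; not the Django storage API."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def save(self, key: str, data: bytes) -> None:
        # Overwrite-on-save is an assumption of this sketch.
        self.objects[key] = data

    def read(self, key: str) -> bytes:
        return self.objects[key]

    def delete(self, key: str) -> None:
        self.objects.pop(key, None)


def rotate_object(
    storage: InMemoryStorage,
    storage_key: str,
    new_key_id: int,
    encrypted: bytes,
    update_db: Callable[[int], None],
) -> None:
    """Re-encrypt an object without overwriting it before the DB update succeeds."""
    # 1. Write the new ciphertext next to the final object, never over it.
    temp_key = f"{storage_key}.rotate-{new_key_id}"
    storage.save(temp_key, encrypted)
    try:
        # 2. Stand-in for the transactional update of blob.encryption_key_id.
        update_db(new_key_id)
    except Exception:
        # The DB still points at the old key, so drop the temp object and re-raise.
        storage.delete(temp_key)
        raise
    # 3. Only after the DB update succeeded: promote temp to final, then clean up.
    storage.save(storage_key, storage.read(temp_key))
    storage.delete(temp_key)
```

If the DB update raises, the final object still holds the old ciphertext and matches the old key id. The sketch does not close the short window between the DB update and the promotion, during which the row already names the new key while the final object is still the old ciphertext.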
In `@src/backend/core/services/tiered_storage.py`:
- Around line 31-43: In __init__, the enabled gate currently checks for an
OPTIONS.endpoint_url which wrongly disables valid S3 setups; instead set
self.enabled based on presence of the "message-blobs" storage config itself
(e.g. check that settings.STORAGES contains a non-empty "message-blobs" entry).
Update the assignment to self.enabled to use
settings.STORAGES.get("message-blobs") (or "message-blobs" in settings.STORAGES
and truthy) rather than digging for OPTIONS.endpoint_url so AWS S3 configs
without endpoint_url remain enabled.
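A minimal sketch of the suggested gate, assuming the storage configuration is a plain mapping shaped like Django's `STORAGES` setting. The helper name `tiered_storage_enabled` and the sample configs are hypothetical; the real check lives in `TieredStorageService.__init__`.

```python
from typing import Any, Mapping


def tiered_storage_enabled(storages: Mapping[str, Any]) -> bool:
    """Enable tiered storage whenever a non-empty "message-blobs" entry exists.

    The gate deliberately does not look at OPTIONS["endpoint_url"], so an AWS S3
    configuration that relies on the default endpoint stays enabled.
    """
    return bool(storages.get("message-blobs"))


# Hypothetical configs for illustration only.
aws_config = {
    "message-blobs": {
        "BACKEND": "storages.backends.s3.S3Storage",
        "OPTIONS": {"bucket_name": "msg-blobs"},  # no endpoint_url on purpose
    }
}
unconfigured = {"default": {"BACKEND": "django.core.files.storage.FileSystemStorage"}}
```

With these inputs, `tiered_storage_enabled(aws_config)` is true and `tiered_storage_enabled(unconfigured)` is false.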
🧹 Nitpick comments (3)
src/backend/core/services/tiered_storage_tasks.py (1)
68-133: Consider adding retry for transient failures.
The task handles lock contention gracefully by returning "locked" status, but transient failures (network issues, temporary S3 unavailability) at line 131 are logged and returned as errors without retry. The periodic `offload_blobs_task` will eventually re-queue these blobs, but adding explicit retry behavior for transient exceptions (e.g., `ConnectionError`, `Timeout`) could improve reliability.
💡 Optional: Add retry for transient failures
-@celery_app.task(bind=True)
+@celery_app.task(bind=True, autoretry_for=(ConnectionError, TimeoutError), retry_backoff=True, max_retries=3)
 def offload_single_blob_task(self, blob_id: str) -> Dict[str, Any]:
src/backend/core/models.py (1)
1536-1557: Enforce storage_location/raw_content invariants at the DB layer.
With `raw_content` now nullable, inconsistent states (e.g., OBJECT_STORAGE + non-null content) become possible and will surface as runtime errors in `get_content`. A check constraint makes the invariant explicit and avoids silent drift. This will require a migration.
♻️ Proposed constraint
 constraints = [
     models.CheckConstraint(
         check=(
             models.Q(mailbox__isnull=False) | models.Q(maildomain__isnull=False)
         ),
         name="blob_has_owner",
     ),
+    models.CheckConstraint(
+        check=(
+            models.Q(
+                storage_location=BlobStorageLocationChoices.POSTGRES,
+                raw_content__isnull=False,
+            )
+            | models.Q(
+                storage_location=BlobStorageLocationChoices.OBJECT_STORAGE,
+                raw_content__isnull=True,
+            )
+        ),
+        name="blob_storage_location_matches_content",
+    ),
 ]
As per coding guidelines, enforce data integrity with model constraints.
Also applies to: 1583-1589
src/backend/core/services/tiered_storage.py (1)
244-281: Guard against orphan-delete races and capture delete errors.
There’s a TOCTOU window between the reference count (lines 259-263) and deletion (lines 274-275); a concurrent offload could add a reference after the count and still have its object deleted. Consider an advisory lock keyed by SHA256 or a transactional guard around the check+delete.
Also, capture the storage deletion exception to Sentry so cleanup failures are observable.
♻️ Suggested Sentry capture
 from cryptography.fernet import Fernet
+from sentry_sdk import capture_exception
@@
-        except Exception as e:  # pylint: disable=broad-except
-            logger.warning("Failed to delete blob from storage %s: %s", key, e)
+        except Exception as exc:  # pylint: disable=broad-except
+            capture_exception(exc)
+            logger.warning("Failed to delete blob from storage %s: %s", key, exc)
             return False
As per coding guidelines, capture and report exceptions to Sentry.
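One way to realize the "advisory lock keyed by SHA256" idea is to fold the digest into the signed 64-bit integer that PostgreSQL's `pg_advisory_xact_lock(bigint)` accepts. The sketch below only derives that key; the helper name `advisory_lock_key` is hypothetical, and how the lock would be taken around the count-and-delete in `delete_if_orphaned` is left as an assumption.

```python
import hashlib


def advisory_lock_key(sha256_digest: bytes) -> int:
    """Map a 32-byte SHA-256 digest to a signed 64-bit advisory lock key.

    Uses the first 8 bytes of the digest, so distinct blobs can in principle
    share a lock key; that only costs extra serialization, not correctness.
    """
    if len(sha256_digest) != 32:
        raise ValueError("expected a 32-byte SHA-256 digest")
    return int.from_bytes(sha256_digest[:8], "big", signed=True)


# Example: derive the lock key for some blob content.
example_key = advisory_lock_key(hashlib.sha256(b"blob content").digest())
```

Both the offload path and the orphan-delete path would need to take the same lock inside their transaction for the guard to be effective; a lock taken by only one side does not close the race.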
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
compose.yaml
env.d/development/backend.defaults
src/backend/core/api/viewsets/config.py
src/backend/core/enums.py
src/backend/core/management/commands/verify_tiered_storage.py
src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
src/backend/core/models.py
src/backend/core/services/search/search.py
src/backend/core/services/tiered_storage.py
src/backend/core/services/tiered_storage_tasks.py
src/backend/core/signals.py
src/backend/core/tests/commands/__init__.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/tests/conftest.py
src/backend/core/tests/services/__init__.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/__init__.py
src/backend/core/tests/tasks/test_task_send_message.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/core/utils.py
src/backend/messages/celery_app.py
src/backend/messages/settings.py
🧰 Additional context used
📓 Path-based instructions (6)
src/backend/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
src/backend/**/*.py: Follow Django/PEP 8 style with a 100-character line limit
Use descriptive, snake_case names for variables and functions
Use Django ORM for database access; avoid raw SQL unless necessary for performance
Use Django’s built-in user model and authentication framework
Prefer try-except blocks to handle exceptions in business logic and views
Log expected and unexpected actions with appropriate log levels
Capture and report exceptions to Sentry; use capture_exception() for custom errors
Do not log sensitive information (tokens, passwords, financial/health data, PII)
Files:
src/backend/messages/celery_app.py
src/backend/core/tests/tasks/__init__.py
src/backend/core/api/viewsets/config.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/services/search/search.py
src/backend/core/management/commands/verify_tiered_storage.py
src/backend/core/utils.py
src/backend/core/signals.py
src/backend/core/services/tiered_storage_tasks.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/test_task_send_message.py
src/backend/messages/settings.py
src/backend/core/tests/conftest.py
src/backend/core/models.py
src/backend/core/tests/services/__init__.py
src/backend/core/enums.py
src/backend/core/services/tiered_storage.py
src/backend/core/tests/commands/__init__.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
src/backend/**/{tests.py,tests/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
src/backend/**/{tests.py,tests/**/*.py}: Use Django’s testing tools (pytest-django) to ensure code quality and reliability
Unit tests should focus on a single use case, keep assertions minimal, and cover all possible cases
Files:
src/backend/core/tests/tasks/__init__.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/test_task_send_message.py
src/backend/core/tests/conftest.py
src/backend/core/tests/services/__init__.py
src/backend/core/tests/commands/__init__.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/**/{settings.py,middleware.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
Use middleware judiciously for cross-cutting concerns (authentication, logging, caching)
Files:
src/backend/messages/settings.py
src/backend/**/settings.py
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
src/backend/**/settings.py: Leverage Django’s caching framework (e.g., Redis/Memcached) where appropriate
Use Django’s cache framework with a backend like Redis or Memcached to reduce DB load
Optimize static file handling using Django’s staticfiles pipeline (e.g., WhiteNoise)
Files:
src/backend/messages/settings.py
src/backend/**/{models.py,forms.py,views.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
Keep business logic in models and forms; keep views thin and focused on request handling
Files:
src/backend/core/models.py
src/backend/**/{models.py,migrations/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)
Implement database indexing and query optimization (Model Meta indexes, constraints)
Files:
src/backend/core/models.py
src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
🧠 Learnings (1)
📚 Learning: 2025-09-02T10:12:12.835Z
Learnt from: CR
Repo: suitenumerique/messages PR: 0
File: .cursor/rules/django-python.mdc:0-0
Timestamp: 2025-09-02T10:12:12.835Z
Learning: Applies to src/backend/**/{tests.py,tests/**/*.py} : Use Django’s testing tools (pytest-django) to ensure code quality and reliability
Applied to files:
src/backend/core/tests/conftest.py
src/backend/core/tests/commands/__init__.py
🧬 Code graph analysis (3)
src/backend/messages/settings.py (1)
src/backend/core/utils.py (1)
JSONValue(8-22)
src/backend/core/tests/conftest.py (1)
src/backend/core/services/tiered_storage.py (1)
storage(45-49)
src/backend/core/models.py (2)
src/backend/core/enums.py (2)
BlobStorageLocationChoices(61-65)
CompressionTypeChoices(54-58)
src/backend/core/services/tiered_storage.py (6)
TieredStorageService(28-296)
encrypt(68-91)
compute_storage_key(52-66)
decrypt(93-117)
download_blob(208-242)
delete_if_orphaned(244-280)
🪛 Ruff (0.14.11)
src/backend/core/tests/commands/test_verify_tiered_storage.py
31-31: import should be at the top-level of a file
(PLC0415)
180-180: import should be at the top-level of a file
(PLC0415)
208-208: import should be at the top-level of a file
(PLC0415)
275-275: import should be at the top-level of a file
(PLC0415)
357-357: import should be at the top-level of a file
(PLC0415)
397-397: import should be at the top-level of a file
(PLC0415)
415-415: import should be at the top-level of a file
(PLC0415)
442-442: import should be at the top-level of a file
(PLC0415)
470-470: import should be at the top-level of a file
(PLC0415)
506-506: import should be at the top-level of a file
(PLC0415)
552-552: import should be at the top-level of a file
(PLC0415)
596-596: import should be at the top-level of a file
(PLC0415)
630-630: import should be at the top-level of a file
(PLC0415)
src/backend/core/management/commands/verify_tiered_storage.py
280-280: import should be at the top-level of a file
(PLC0415)
282-282: import should be at the top-level of a file
(PLC0415)
src/backend/core/tests/services/test_tiered_storage.py
114-114: import should be at the top-level of a file
(PLC0415)
129-129: import should be at the top-level of a file
(PLC0415)
271-271: import should be at the top-level of a file
(PLC0415)
296-296: import should be at the top-level of a file
(PLC0415)
362-362: import should be at the top-level of a file
(PLC0415)
396-396: import should be at the top-level of a file
(PLC0415)
571-571: import should be at the top-level of a file
(PLC0415)
614-614: import should be at the top-level of a file
(PLC0415)
654-654: import should be at the top-level of a file
(PLC0415)
src/backend/core/tests/conftest.py
22-22: import should be at the top-level of a file
(PLC0415)
src/backend/core/models.py
1474-1474: import should be at the top-level of a file
(PLC0415)
1625-1625: import should be at the top-level of a file
(PLC0415)
1656-1656: import should be at the top-level of a file
(PLC0415)
src/backend/core/services/tiered_storage.py
132-132: import should be at the top-level of a file
(PLC0415)
149-149: import should be at the top-level of a file
(PLC0415)
257-257: import should be at the top-level of a file
(PLC0415)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: test-e2e
- GitHub Check: lint-back
- GitHub Check: build-front
- GitHub Check: test-front
- GitHub Check: check-api-state
- GitHub Check: lint-front
- GitHub Check: test-back
🔇 Additional comments (52)
src/backend/core/tests/tasks/__init__.py (1)
1-1: LGTM! Clean package initializer for the tasks test module.
src/backend/core/tests/commands/__init__.py (1)
1-1: LGTM! Appropriate package initializer for management command tests.
src/backend/core/tests/services/__init__.py (1)
1-1: LGTM! Appropriate package initializer for service tests.
src/backend/core/tests/tasks/test_task_send_message.py (3)
3-3: LGTM! Appropriate pylint disables for test file patterns: `no-value-for-parameter` for Celery task invocations without bound arguments, and `unused-argument` for pytest fixtures that establish state but aren't directly referenced.
45-45: Good hygiene. Using the `_mailbox` prefix correctly signals the variable is intentionally unused while still unpacking the fixture tuple.
99-99: Consistent with the pattern above.
src/backend/core/api/viewsets/config.py (1)
139-140: LGTM! Using `getattr(settings, setting, None)` ensures a consistent API response schema where all keys are always present, aligning with the OpenAPI specification that marks these fields as required. This is cleaner than conditional inclusion and provides predictable behavior for frontend consumers.
src/backend/core/utils.py (1)
14-22: LGTM! Returning `None` for empty/whitespace strings allows django-configurations to fall back to default values, which is appropriate for optional JSON configuration like encryption keys.
env.d/development/backend.defaults (1)
54-66: LGTM! The development defaults are well-documented with clear comments. Tiered storage is configured with a 3-day offload threshold and credentials follow the existing pattern for `msg-imports`.
compose.yaml (1)
84-84: LGTM! The `msg-blobs` bucket creation follows the existing pattern. Correctly omits the ILM expiration rules since blobs are intended for long-term storage unlike temporary imports.
src/backend/messages/celery_app.py (1)
48-52: LGTM! The new beat schedule entry follows the existing pattern and an hourly interval is appropriate for blob offloading. The `core.services.tiered_storage_tasks.offload_blobs_task` is properly defined with the `@celery_app.task(bind=True)` decorator and has comprehensive test coverage.
src/backend/core/services/search/search.py (1)
40-42: No action required; direct access to `settings.OPENSEARCH_INDEX_THREADS` is safe.
The setting is properly defined in `src/backend/messages/settings.py` as a `BooleanValue` with a default of `True`. It will always exist at runtime, and direct access does not risk `AttributeError`. This pattern is already used consistently throughout the codebase in multiple other files (`src/backend/core/signals.py`, `src/backend/core/services/search/tasks.py`).
Likely an incorrect or invalid review comment.
src/backend/core/enums.py (1)
61-66: LGTM! The new `BlobStorageLocationChoices` enum follows the existing patterns in this file with appropriate integer values and descriptive labels. Good placement in the logical order of the file.
src/backend/core/tests/conftest.py (1)
14-45: LGTM! The session-scoped fixture properly defers Django imports and handles missing storage configuration gracefully. The broad exception handling is appropriate here to prevent test setup failures when object storage isn't configured. The `pylint: disable` comment at line 3 correctly covers the in-function import pattern.
src/backend/core/services/tiered_storage_tasks.py (1)
26-65: LGTM! The task efficiently streams eligible blob IDs using `values_list` with `iterator()` to avoid memory pressure. The filtering criteria (age threshold + minimum size) appropriately limit the scope of each run.
src/backend/core/tests/commands/test_verify_tiered_storage.py (3)
1-7: LGTM! Comprehensive test coverage for the `verify_tiered_storage` management command. The tests appropriately cover disabled states, E2E verification modes, hash verification with corruption detection, and re-encryption workflows. The in-function imports are intentionally used for test isolation and the `pylint: disable` comment at line 7 correctly covers this pattern.
78-80: Good cleanup pattern. Consistent use of `try/finally` with existence checks ensures test isolation and prevents storage pollution across test runs.
468-534: Thorough E2E test for object storage re-encryption. This test covers the complete workflow: encrypting with old key, uploading to storage, rotating keys, re-encrypting, and verifying content integrity via download and decompression.
src/backend/core/signals.py (1)
51-52: The concern is unfounded. `OPENSEARCH_INDEX_THREADS` is always defined in settings with a default value (`True` in `src/backend/messages/settings.py:83-84` and `False` as a fallback at line 1090). Direct attribute access to `settings.OPENSEARCH_INDEX_THREADS` is safe and will not raise `AttributeError`. The change from `getattr()` to direct access is a valid simplification that removes unnecessary defensive programming.
Likely an incorrect or invalid review comment.
src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py (1)
12-27: Migration structure looks correct. The migration properly:
- Adds `storage_location` with `db_index=True` for efficient filtering
- Makes `raw_content` nullable to support object storage blobs
- Uses hardcoded choices in migration (correct Django practice)
One consideration: if key rotation queries will frequently filter by `encryption_key_id` (e.g., finding all blobs encrypted with a specific key), adding an index on that field could be beneficial. However, this can be deferred based on actual query patterns.
src/backend/core/tests/services/test_tiered_storage.py (5)
24-181: Comprehensive unit test coverage for encryption/decryption. The unit tests thoroughly cover:
- Storage key computation with different SHA256 prefixes
- Encryption passthrough when disabled (key_id=0)
- Proper error handling for invalid/missing keys and corrupted data
- Key rotation scenarios maintaining backward compatibility
Good separation of concerns with no DB or storage dependencies.
183-263: Good database-level test coverage. Tests properly validate:
- Default storage location behavior
- Content retrieval from PostgreSQL
- Error handling when content is missing
- SHA256-based storage key derivation
- Deduplication detection via `check_already_uploaded`
389-420: Critical regression test for double-encryption bug. This test (`test_offload_with_encryption_roundtrip`) is valuable as it explicitly guards against the double-encryption bug mentioned in the docstring. The test verifies that encrypted blobs can be offloaded and read back correctly, which is a common failure point.
494-562: Important deduplication behavior documented in test. The test correctly validates that when two blobs with identical content are encrypted with different keys, deduplication uses the first blob's encryption key_id. The inline comment at line 554 clarifies this is expected behavior until key rotation is complete.
565-667: Key rotation tests cover both storage locations. The tests properly validate the re-encryption workflow for:
- PostgreSQL-stored blobs (decrypt with old key, encrypt with new key, update in place)
- Object storage blobs (download, decrypt, re-encrypt, upload, update metadata)
Both tests verify content integrity after rotation, which is critical.
src/backend/messages/settings.py (2)
234-259: New storage configuration follows existing patterns. The `message-blobs` storage configuration mirrors the existing `message-imports` pattern. The default bucket name `msg-blobs` is provided.
One difference: `endpoint_url` has no default value, whereas some configurations might expect a default for local development. Verify this is intentional and that the development environment properly sets `STORAGE_MESSAGE_BLOBS_ENDPOINT_URL`.
376-398: Well-documented encryption and offload configuration. The configuration properly:
- Documents the key format and key_id=0 convention
- Uses appropriate types (`JSONValue` for dict, `PositiveIntegerValue` for IDs)
- Defaults to encryption disabled (key_id=0), requiring explicit opt-in
- Allows fine-grained control over offload timing and size thresholds
src/backend/core/tests/tasks/test_tiered_storage_tasks.py (5)
32-48: Disabled state test properly mocks settings. The test correctly mocks the storage configuration to verify the task gracefully handles the disabled state.
55-120: Good coverage of blob eligibility criteria. The tests properly validate:
- Age-based filtering using `TIERED_STORAGE_OFFLOAD_AFTER_DAYS`
- Size-based filtering using `TIERED_STORAGE_OFFLOAD_MIN_SIZE`
- The use of `Blob.objects.filter().update()` correctly bypasses any auto-update timestamps
The conditional assertion at line 119 (`if settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0`) adapts to the test environment settings, which is acceptable.
156-171: Consistent disabled state handling test. Follows the same mocking pattern as the batch task test.
242-264: Good error handling test with rollback verification. The test `test_handles_upload_error` properly verifies that:
- Upload errors are caught and reported
- The blob state is preserved (transaction rolled back)
- Storage location remains `POSTGRES` and `raw_content` is intact
This ensures data integrity during upload failures.
335-352: Idempotency test validates concurrent safety. The `test_concurrent_offload_idempotent` test verifies that repeated offload attempts for the same blob are handled gracefully, returning `already_offloaded` on subsequent calls. This is important for Celery task retry scenarios.
src/backend/core/management/commands/verify_tiered_storage.py (6)
24-88: Well-designed CLI interface with clear modes. The command provides:
- Multiple verification modes (`db-to-storage`, `storage-to-db`, `full`)
- Safety features (`--dry-run`, `--limit`)
- Key rotation capability (`--re-encrypt`)
- Proper early exit when storage is not configured
89-134: DB-to-storage verification handles scale well. The method:
- Uses `iterator(chunk_size=1000)` to avoid memory issues with large datasets
- Reports missing blobs to stderr (appropriate severity for potential data loss)
- Respects the `--limit` option for sampling
136-218: Storage-to-DB verification with orphan cleanup. The method properly:
- Validates storage path format before processing
- Detects orphans (objects without DB references)
- Optionally deletes orphans with `--fix`
- Optionally verifies hashes (wisely marked as "slow")
Progress is reported every 100 objects (line 201), which provides good feedback during long operations.
220-254: Storage listing handles multiple backends. The method:
- Prefers direct boto3 access with pagination for efficiency
- Falls back to Django's `listdir` for compatibility
- Raises clear error when listing is not supported
256-305: Hash verification covers the complete pipeline. The method correctly:
- Downloads encrypted content from storage
- Decrypts using the blob's `encryption_key_id`
- Decompresses based on `compression` type
- Computes and compares SHA256 hash
The broad exception handling is acceptable here since the goal is to report issues, not crash.
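The decrypt, decompress, hash, compare pipeline listed above can be sketched with the standard library only. This is an illustration, not the command's code: `verify_stored_blob` is a hypothetical name, the `decrypt` and `decompress` callables are injected, and `zlib` is swapped in for the project's zstd compression purely so the sketch runs without third-party packages.

```python
import hashlib
import zlib
from typing import Callable


def verify_stored_blob(
    stored: bytes,
    expected_sha256: bytes,
    decrypt: Callable[[bytes], bytes],
    decompress: Callable[[bytes], bytes],
) -> bool:
    """Return True when the stored bytes decode to content matching the hash."""
    try:
        content = decompress(decrypt(stored))
    except Exception:  # broad on purpose: report a mismatch, do not crash
        return False
    return hashlib.sha256(content).digest() == expected_sha256


# Usage with stand-ins: identity "decryption" (key_id=0 style passthrough)
# and zlib in place of zstd.
original = b"hello tiered storage"
digest = hashlib.sha256(original).digest()
ok = verify_stored_blob(zlib.compress(original), digest, lambda b: b, zlib.decompress)
```

The hash is compared against the uncompressed content, which matches the review's description that the SHA256 identifies the source bytes rather than the stored representation.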
307-402: Re-encryption workflow with proper validation. The method:
- Validates encryption configuration before starting
- Uses smaller chunk size (100) appropriate for the heavier re-encryption operation
- Supports `--dry-run` for safe preview
- Provides clear progress reporting and final summary
src/backend/core/models.py (6)
31-33: No review needed for this import change.
1472-1487: Encryption is integrated cleanly into blob creation.
Storing encrypted bytes and the key id alongside compression keeps retrieval consistent.
1595-1599: Conditional `size_compressed` update makes sense.
This avoids clobbering the stored size after offload clears `raw_content`.
1601-1609: No issues spotted.
1624-1643: Content retrieval flow reads cleanly.
The PostgreSQL vs object-storage branches and decryption/decompression steps are clear.
1645-1660: Deletion flow aligns with object-storage cleanup.
src/backend/core/services/tiered_storage.py (8)
1-24: No review needed for the module header/imports.
44-49: Lazy storage initialization looks good.
51-66: Storage key sharding is straightforward.
68-117: Encryption/decryption flow is consistent with key_id semantics.
Please double-check that configured key formats align with Fernet requirements.
119-155: DB-backed dedup lookups are clear.
208-243: Download path is clear and well-scoped.
282-296: Existence check is straightforward.
157-207: Return value is correctly handled in production code. The call site in `src/backend/core/services/tiered_storage_tasks.py` (line 113) properly captures the returned `key_id` and updates `blob.encryption_key_id` accordingly. Dedup-specific tests also confirm this pattern works correctly when the same content is uploaded with different encryption keys.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 449-455: When re-encrypting Postgres blobs in
verify_tiered_storage.py (using self.service.decrypt and self.service.encrypt)
ensure you persist the recomputed size_compressed: after setting
blob.raw_content = encrypted and blob.encryption_key_id = new_key_id, call
blob.save(update_fields=["raw_content", "encryption_key_id", "size_compressed"])
so the recalculated size_compressed is written to the DB (match the approach
used in 0007_blob_size_compressed.py).
- Around line 280-282: Move the local imports of pyzstd and
CompressionTypeChoices out of the function and place them at the top of the
module alongside the existing BlobStorageLocationChoices import so they are
module-level imports; update the import section to include "import pyzstd" and
"from core.enums import CompressionTypeChoices" and remove the in-function
imports where pyzstd and CompressionTypeChoices are currently referenced in
verify_tiered_storage logic.
In `@src/backend/core/signals.py`:
- Around line 109-119: The post_delete signal handler cleanup_blob_storage
currently calls TieredStorageService().delete_if_orphaned() immediately, which
can run inside a transaction that may roll back; wrap the deletion call in
transaction.on_commit to defer it until after successful commit: import
django.db.transaction if needed, capture the instance.sha256 (or
bytes(instance.sha256)) and any required storage_location check inside
cleanup_blob_storage, create a small closure or lambda that calls
TieredStorageService().delete_if_orphaned(...) and register it with
transaction.on_commit, and ensure the existing guard checks
(BlobStorageLocationChoices.OBJECT_STORAGE and service.enabled) are preserved
before scheduling the on_commit callback.
In `@src/backend/core/tests/services/test_tiered_storage.py`:
- Line 114: Several intentional local test imports (e.g. "from
cryptography.fernet import InvalidToken" and the other test-only imports flagged
at the comment's listed locations) are meant to remain inside test functions;
add a trailing "# noqa: PLC0415" to each of those local import lines instead of
moving them to module level so Ruff/Pylint stops reporting
import-outside-toplevel while keeping the imports local to their tests. Ensure
you update each flagged import line (the ones referenced in the review) by
appending the exact comment "# noqa: PLC0415".
🧹 Nitpick comments (2)
src/backend/core/tests/services/test_tiered_storage.py (1)
265-563: Consider skipping E2E tests when object storage isn’t configured.
The docstring says “when available,” but these tests hard-fail if storage isn’t configured. A `skipif` makes local/dev runs more resilient.
🔧 Suggested refactor
+# At module level
+_STORAGE_ENABLED = TieredStorageService().enabled
@@
-@pytest.mark.django_db
-class TestTieredStorageE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestTieredStorageE2E:
src/backend/core/tests/tasks/test_tiered_storage_tasks.py (1)
60-194: Consider skipping E2E task tests when object storage isn’t configured.
These E2E tests will fail in environments without MinIO. A `skipif` keeps dev/test runs resilient while still exercising coverage in CI.
🔧 Suggested refactor
+# At module level
+_STORAGE_ENABLED = TieredStorageService().enabled
@@
-@pytest.mark.django_db
-class TestOffloadBlobsTaskE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestOffloadBlobsTaskE2E:
@@
-@pytest.mark.django_db
-class TestOffloadSingleBlobTaskE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestOffloadSingleBlobTaskE2E:
Also applies to: 221-399
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 475-535: The code currently locks and updates only the single Blob
row but the storage object is shared by all Blobs with the same sha256; fix by
selecting and locking the entire cohort (all Blob rows that share the sha256 and
OBJECT_STORAGE) inside the transaction, verify none already have
encryption_key_id == target_key_id (or treat as already-promoted if they do),
then update every locked row's encryption_key_id to new_key_id before calling
transaction.on_commit(_promote_temp). Use a select_for_update() queryset
filtered by blob.sha256 (and storage type/field used for OBJECT_STORAGE) instead
of Blob.objects.select_for_update().get(...), and perform a bulk update or
iterate the locked rows to set and save encryption_key_id so all siblings remain
readable after promotion.
In `@src/backend/core/services/tiered_storage.py`:
- Around line 179-190: The dedup fast-path returns the existing
encryption_key_id without verifying the underlying object is present; change the
logic in the branch that uses check_already_uploaded(sha256_bytes) so it
computes the canonical storage key via compute_storage_key(sha256_bytes) and
verifies the object exists in the storage backend (e.g., storage.exists(key) or
trying to open/read the object) before calling
get_existing_key_id(sha256_bytes); if the object is missing or unreadable, fall
through to the upload path that uses blob.raw_content and storage.save(...) so
we don't convert the row to OBJECT_STORAGE pointing at nothing. Ensure you
reference check_already_uploaded, compute_storage_key, get_existing_key_id,
storage.exists/open, and storage.save in the change.
- Around line 47-61: compute_storage_key currently only uses the SHA of the
uncompressed payload, so blobs with identical source bytes but different
compression metadata collide; change compute_storage_key to include the storage
representation (e.g., compression) in the key or in the input used to derive the
key (either by accepting a compression/representation parameter and returning
something like "blobs/{compression}/{sha[:3]}/{sha}" or by computing the SHA
over the final compressed bytes), and update every call site (notably
upload_blob and the call sites around lines 187-190) to pass the representation
so deduplication is based on both content and representation.
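The dedup fast-path finding above (verify the object exists before reusing a sibling's key ID) could look roughly like the sketch below. It is a minimal, self-contained illustration and not the project's code: `InMemoryStorage`, `storage_key`, and `offload` are hypothetical stand-ins, and the key layout is an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class InMemoryStorage:
    """Hypothetical stand-in for the object-storage backend."""

    objects: Dict[str, bytes] = field(default_factory=dict)

    def exists(self, key: str) -> bool:
        return key in self.objects

    def save(self, key: str, data: bytes) -> None:
        self.objects[key] = data


def storage_key(sha256_hex: str) -> str:
    # Assumed layout; the real compute_storage_key may differ.
    return f"blobs/{sha256_hex[:3]}/{sha256_hex}"


def offload(
    storage: InMemoryStorage,
    sha256_hex: str,
    raw_content: bytes,
    known_key_id: Optional[int],
    active_key_id: int,
) -> Tuple[int, bool]:
    """Return (encryption_key_id, uploaded).

    known_key_id is what a sibling row claims; it is trusted only
    when the object is actually present in storage.
    """
    key = storage_key(sha256_hex)
    if known_key_id is not None and storage.exists(key):
        return known_key_id, False  # safe dedup: object is really there
    # Object missing (or no sibling): fall through to a real upload.
    storage.save(key, raw_content)
    return active_key_id, True
```

The point of the design is that the database row is never flipped to object storage on the strength of metadata alone; a missing object simply triggers a fresh upload.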
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 966620c0-2a4d-4de7-be72-51978a4576ac
📒 Files selected for processing (6)
src/backend/core/management/commands/verify_tiered_storage.py
src/backend/core/services/tiered_storage.py
src/backend/core/signals.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
0706ad8 to
672d2e6
Compare
Actionable comments posted: 8
♻️ Duplicate comments (1)
src/backend/core/services/tiered_storage.py (1)
74-88: ⚠️ Potential issue | 🔴 Critical
Include compression in the storage identity.
`sha256` is computed from the uncompressed payload, but the stored object is the compressed/encrypted representation. Two blobs with identical source bytes and different `compression` values will collide on the same key, dedup together in `upload_blob()`, and later one of them will read back bytes that do not match its own metadata. Key derivation/dedup needs to include the storage representation, or uploads need to canonicalize representation before writing.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 74 - 88, compute_storage_key currently derives the object key only from the source SHA256 (sha256_bytes) which ignores the chosen compression/encryption, causing different storage representations to collide; update key derivation to include the storage representation (e.g., incorporate the compression string/enum and any representation-specific metadata into the input used to derive the key) or alternatively ensure upload_blob canonicalizes the bytes (compress/encrypt) before computing the SHA used by compute_storage_key so keys reflect the actual stored bytes; refer to compute_storage_key and upload_blob when making the change so the key always encodes the compression parameter (or is computed from post-compression bytes).
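One way to read the suggestion above is to make the compression part of the key itself. The sketch below is an assumption-laden illustration, not the PR's `compute_storage_key`: the signature and the `blobs/{compression}/{shard}/{sha}` layout are taken from the shape proposed in the review, and the function here hashes the content itself for the sake of a self-contained example.

```python
import hashlib


def compute_storage_key(content: bytes, compression: str) -> str:
    """Derive an object key from content AND its stored representation.

    `compression` is a short tag such as "none" or "zstd" (hypothetical
    values); including it keeps two representations of the same source
    bytes from sharing one object.
    """
    sha = hashlib.sha256(content).hexdigest()
    return f"blobs/{compression}/{sha[:3]}/{sha}"
```

With this shape, identical bytes stored with the same compression still deduplicate, while a different compression tag yields a distinct object.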
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 332-356: The current loop limits raw queryset rows, but
OBJECT_STORAGE re-encrypts entire sha256 cohorts in
_re_encrypt_object_storage_blob, so change the logic in the command to build a
worklist of unique re-encryption work-units first (each work-unit represents
either a single Postgres row or a unique object-storage cohort identified by
sha256/object key), deduplicate those units, then apply the --limit to that
worklist before processing; update the code paths around Blob, queryset,
_re_encrypt_single_blob and _re_encrypt_object_storage_blob to consult the
prebuilt worklist (not the raw queryset) when counting, printing "Blobs to
re-encrypt", and iterating, and ensure current_key_id and OBJECT_STORAGE checks
are used to determine whether a row maps to a cohort work-unit.
In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`:
- Around line 13-17: The new migrations.AddField for model_name='blob' adding
name='encryption_key_id' should create an indexed column and also add a
composite index with storage_location to support key-rotation queries; update
the migration to set db_index=True on the SmallIntegerField for
encryption_key_id (the migrations.AddField entry) and add a migrations.AddIndex
entry that creates an index on ('encryption_key_id','storage_location') (or
equivalent Index/fields tuple) so queries filtering by encryption_key_id and
storage_location use an index.
In `@src/backend/core/models.py`:
- Around line 2189-2201: The new tiered-storage fields (storage_location,
created_at, size, sha256) need composite DB indexes to avoid broad scans; update
the Blob model by adding appropriate Meta.indexes entries (e.g., an index on
("storage_location","created_at","size") for offload scans and an index on
("sha256","storage_location") for dedup/orphan checks) and create a Django
migration that adds these same composite indexes to the database so the ORM and
DB stay in sync; reference the Blob class/Blob.Meta and the existing migration
pattern for adding indexes when implementing the change.
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Around line 78-91: The second Blob lookup inside the transaction/lock (the
select_for_update().get(id=blob_id) call in tiered_storage_tasks.py) can raise
Blob.DoesNotExist if the row was deleted after the initial sha256 lookup; wrap
that select_for_update().get(...) in a try/except catching Blob.DoesNotExist and
return {"status": "not_found", "blob_id": blob_id} instead of letting it bubble
to the broad exception handler; apply the same fix to the other similar block
later in this file (the other select_for_update().get usage around the offload
completion code) so both places avoid logger.exception noise and return
not_found on concurrent delete.
In `@src/backend/core/services/tiered_storage.py`:
- Around line 303-310: The current delete_if_orphaned implementation swallows
all exceptions from storage.delete and returns False, preventing
cleanup_orphaned_blob_task from retrying transient errors; change
delete_if_orphaned so that storage.delete failures are not converted into a
silent False: either re-raise the caught exception (allowing
cleanup_orphaned_blob_task to catch and retry) or return a distinct error result
that cleanup_orphaned_blob_task understands as retryable. Locate
compute_storage_key, delete_if_orphaned and the call to self.storage.delete in
tiered_storage.py and update the exception handling to
propagate/storage-delete-error semantics instead of treating every exception as
"still_referenced".
In `@src/backend/core/tests/conftest.py`:
- Around line 39-47: The current bucket bootstrap incorrectly catches
client.exceptions.NoSuchBucket (which doesn’t exist) and falls back to a broad
except that masks real failures; update the try/except around
client.head_bucket() to catch botocore.exceptions.ClientError (import
botocore.exceptions as needed), inspect the error (e.response['Error']['Code']
or HTTPStatus in e.response['ResponseMetadata']) and only call
client.create_bucket(Bucket=bucket_name) when the error indicates the bucket is
missing (404 / NoSuchBucket), otherwise re-raise or log the unexpected exception
so real setup failures aren’t swallowed; replace references to
client.exceptions.NoSuchBucket with this ClientError check and ensure
client.create_bucket remains the recovery path.
In `@src/backend/core/tests/tasks/test_task_send_message.py`:
- Line 3: Remove the module-level "# pylint:
disable=no-value-for-parameter,unused-argument" and instead apply scoped
disables at the exact offending sites: add "# pylint:
disable=no-value-for-parameter" inline on the direct task invocation lines (the
direct task call sites in this test file) and add "# pylint:
disable=unused-argument" either inline on the specific test function signatures
or on the unused fixture parameter names where they are declared; ensure only
those lines reference the disables so future
unused-argument/no-value-for-parameter issues elsewhere in the file are not
globally suppressed.
In `@src/backend/messages/settings.py`:
- Around line 295-320: The "message-blobs" storage entry is always present in
settings.STORAGES which makes TieredStorageService.enabled truthy even when no
object-store is configured; change settings.py so the "message-blobs" key is
only added when real object-storage config is provided (e.g. check relevant env
vars such as STORAGE_MESSAGE_BLOBS_BUCKET_NAME or
STORAGE_MESSAGE_BLOBS_ENDPOINT_URL/ACCESS_KEY/SECRET in os.environ or via
values.Value().environ_name) instead of unconditionally defining the dict;
locate the "message-blobs" dict in src/backend/messages/settings.py and wrap its
creation/assignment in that conditional so
settings.STORAGES.get("message-blobs") is falsy when no blob config is supplied,
preserving the documented "empty = disabled" behavior used by
TieredStorageService.enabled.
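For the first inline comment above (applying `--limit` to deduplicated work-units instead of raw rows), a minimal sketch might look like this. `BlobRow` and `build_worklist` are hypothetical names introduced only for illustration; the real command works on a Django queryset.

```python
from typing import Iterable, List, NamedTuple, Optional, Tuple


class BlobRow(NamedTuple):
    """Hypothetical, simplified view of a Blob row."""

    id: int
    sha256: str
    in_object_storage: bool


WorkUnit = Tuple[str, str]


def build_worklist(
    rows: Iterable[BlobRow], limit: Optional[int] = None
) -> List[WorkUnit]:
    """Collapse rows into unique re-encryption work-units, then limit.

    Object-storage rows sharing a sha256 form one cohort unit;
    Postgres rows are one unit each.
    """
    seen = set()
    units: List[WorkUnit] = []
    for row in rows:
        if row.in_object_storage:
            unit = ("cohort", row.sha256)
        else:
            unit = ("row", str(row.id))
        if unit not in seen:
            seen.add(unit)
            units.append(unit)
    # The limit applies to work-units, not to raw rows.
    return units if limit is None else units[:limit]
```

Counting and the "Blobs to re-encrypt" message would then be based on `len(units)`, so a limit of N never silently re-encrypts more than N units.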
---
Duplicate comments:
In `@src/backend/core/services/tiered_storage.py`:
- Around line 74-88: compute_storage_key currently derives the object key only
from the source SHA256 (sha256_bytes) which ignores the chosen
compression/encryption, causing different storage representations to collide;
update key derivation to include the storage representation (e.g., incorporate
the compression string/enum and any representation-specific metadata into the
input used to derive the key) or alternatively ensure upload_blob canonicalizes
the bytes (compress/encrypt) before computing the SHA used by
compute_storage_key so keys reflect the actual stored bytes; refer to
compute_storage_key and upload_blob when making the change so the key always
encodes the compression parameter (or is computed from post-compression bytes).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 59f4d311-6f3d-406c-bdf2-bf1f1057f13c
📒 Files selected for processing (23)
.github/workflows/messages.yml
Makefile
env.d/development/backend.defaults
src/backend/core/admin.py
src/backend/core/enums.py
src/backend/core/management/commands/verify_tiered_storage.py
src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py
src/backend/core/models.py
src/backend/core/services/search/search.py
src/backend/core/services/tiered_storage.py
src/backend/core/services/tiered_storage_tasks.py
src/backend/core/signals.py
src/backend/core/tests/commands/__init__.py
src/backend/core/tests/commands/test_verify_tiered_storage.py
src/backend/core/tests/conftest.py
src/backend/core/tests/services/__init__.py
src/backend/core/tests/services/test_tiered_storage.py
src/backend/core/tests/tasks/__init__.py
src/backend/core/tests/tasks/test_task_send_message.py
src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/core/utils.py
src/backend/messages/celery_app.py
src/backend/messages/settings.py
migrations.AddField(
    model_name='blob',
    name='encryption_key_id',
    field=models.SmallIntegerField(default=0, help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])', verbose_name='encryption key ID'),
),
Missing index on encryption_key_id will hurt large-scale key-rotation queries.
At the PR’s target scale, filtering blobs by encryption key can become a full-table scan. Please index this field (and ideally add a composite index with storage_location if queries combine both).
🔧 Suggested migration adjustment
 migrations.AddField(
     model_name='blob',
     name='encryption_key_id',
-    field=models.SmallIntegerField(default=0, help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])', verbose_name='encryption key ID'),
+    field=models.SmallIntegerField(
+        default=0,
+        db_index=True,
+        help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])',
+        verbose_name='encryption key ID',
+    ),
 ),
+migrations.AddIndex(
+    model_name='blob',
+    index=models.Index(
+        fields=['storage_location', 'encryption_key_id'],
+        name='core_blob_storage_enc_idx',
+    ),
+),
As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: "Implement database indexing and query optimization (Model Meta indexes, constraints)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`
around lines 13 - 17, The new migrations.AddField for model_name='blob' adding
name='encryption_key_id' should create an indexed column and also add a
composite index with storage_location to support key-rotation queries; update
the migration to set db_index=True on the SmallIntegerField for
encryption_key_id (the migrations.AddField entry) and add a migrations.AddIndex
entry that creates an index on ('encryption_key_id','storage_location') (or
equivalent Index/fields tuple) so queries filtering by encryption_key_id and
storage_location use an index.
# Tiered storage fields
storage_location = models.SmallIntegerField(
    "storage location",
    choices=BlobStorageLocationChoices.choices,
    default=BlobStorageLocationChoices.POSTGRES,
    help_text="Where the blob content is stored",
    db_index=True,
)
encryption_key_id = models.SmallIntegerField(
    "encryption key ID",
    default=0,
    help_text="Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])",
)
Add composite indexes for the new tiered-storage lookups.
These new columns become the hot filter keys for the periodic offload scan (storage_location, created_at, size) and for dedup/orphan checks (sha256, storage_location). At the scale this PR targets, single-column indexes here will still drive very broad scans on messages_blob. Please add matching composite indexes in Blob.Meta and the migration.
Suggested index shape
 class Meta:
     db_table = "messages_blob"
     verbose_name = "blob"
     verbose_name_plural = "blobs"
     ordering = ["-created_at"]
+    indexes = [
+        models.Index(
+            fields=["storage_location", "created_at", "size"],
+            name="msg_blob_offload_scan",
+        ),
+        models.Index(
+            fields=["sha256", "storage_location"],
+            name="msg_blob_sha_loc",
+        ),
+    ]
     constraints = [
As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: Implement database indexing and query optimization (Model Meta indexes, constraints).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/models.py` around lines 2189 - 2201, The new tiered-storage
fields (storage_location, created_at, size, sha256) need composite DB indexes to
avoid broad scans; update the Blob model by adding appropriate Meta.indexes
entries (e.g., an index on ("storage_location","created_at","size") for offload
scans and an index on ("sha256","storage_location") for dedup/orphan checks) and
create a Django migration that adds these same composite indexes to the database
so the ORM and DB stay in sync; reference the Blob class/Blob.Meta and the
existing migration pattern for adding indexes when implementing the change.
"message-blobs": {
    "BACKEND": "storages.backends.s3.S3Storage",
    "OPTIONS": {
        "endpoint_url": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_ENDPOINT_URL",
            environ_prefix=None,
        ),
        "bucket_name": values.Value(
            "msg-blobs",
            environ_name="STORAGE_MESSAGE_BLOBS_BUCKET_NAME",
            environ_prefix=None,
        ),
        "access_key": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_ACCESS_KEY",
            environ_prefix=None,
        ),
        "secret_key": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_SECRET_KEY",
            environ_prefix=None,
        ),
        "region_name": values.Value(
            environ_name="STORAGE_MESSAGE_BLOBS_REGION_NAME",
            environ_prefix=None,
        ),
    },
},
Gate message-blobs registration on real object-storage config.
TieredStorageService.enabled is just bool(settings.STORAGES.get("message-blobs")), so defining this alias unconditionally here makes blob offloading look enabled even when STORAGE_MESSAGE_BLOBS_* is intentionally left empty. That breaks the documented “empty = disabled” contract and can send the offload path into a partially configured S3 backend instead of keeping blobs in Postgres.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/messages/settings.py` around lines 295 - 320, The "message-blobs"
storage entry is always present in settings.STORAGES which makes
TieredStorageService.enabled truthy even when no object-store is configured;
change settings.py so the "message-blobs" key is only added when real
object-storage config is provided (e.g. check relevant env vars such as
STORAGE_MESSAGE_BLOBS_BUCKET_NAME or
STORAGE_MESSAGE_BLOBS_ENDPOINT_URL/ACCESS_KEY/SECRET in os.environ or via
values.Value().environ_name) instead of unconditionally defining the dict;
locate the "message-blobs" dict in src/backend/messages/settings.py and wrap its
creation/assignment in that conditional so
settings.STORAGES.get("message-blobs") is falsy when no blob config is supplied,
preserving the documented "empty = disabled" behavior used by
TieredStorageService.enabled.
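The gating described above could be sketched as follows. This is an illustration under stated assumptions, not the project's settings module: `build_storages`, the `REQUIRED_ENV` tuple (which variables count as "really configured"), and the `default` entry are all hypothetical, and plain strings replace the `values.Value(...)` wrappers.

```python
from typing import Dict, Mapping

# Assumption: these three variables are what "configured" means here.
REQUIRED_ENV = (
    "STORAGE_MESSAGE_BLOBS_ENDPOINT_URL",
    "STORAGE_MESSAGE_BLOBS_ACCESS_KEY",
    "STORAGE_MESSAGE_BLOBS_SECRET_KEY",
)


def build_storages(env: Mapping[str, str]) -> Dict[str, dict]:
    """Return a STORAGES-like dict; add "message-blobs" only when configured."""
    storages: Dict[str, dict] = {
        # Placeholder default alias, included only to make the dict realistic.
        "default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
    }
    if all(env.get(name) for name in REQUIRED_ENV):
        storages["message-blobs"] = {
            "BACKEND": "storages.backends.s3.S3Storage",
            "OPTIONS": {
                "endpoint_url": env["STORAGE_MESSAGE_BLOBS_ENDPOINT_URL"],
                "bucket_name": env.get(
                    "STORAGE_MESSAGE_BLOBS_BUCKET_NAME", "msg-blobs"
                ),
                "access_key": env["STORAGE_MESSAGE_BLOBS_ACCESS_KEY"],
                "secret_key": env["STORAGE_MESSAGE_BLOBS_SECRET_KEY"],
                "region_name": env.get("STORAGE_MESSAGE_BLOBS_REGION_NAME"),
            },
        }
    return storages
```

Because the bucket name has a default, it is deliberately not used as the gate here; with an empty environment `bool(build_storages({}).get("message-blobs"))` is false, which matches the "empty = disabled" contract.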
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/backend/core/models.py (1)
2077-2102: ⚠️ Potential issue | 🟠 Major
This configurable default is still bypassed by `Mailbox.create_blob()`.
`BlobManager.create_blob()` now uses `compression=None` to mean “read `MESSAGES_BLOB_COMPRESS`”, but `src/backend/core/models.py:752-778` still defaults `Mailbox.create_blob(..., compression=CompressionTypeChoices.ZSTD)` and forwards that value unconditionally. Callers using the mailbox helper will never reach this new fallback.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/models.py` around lines 2077 - 2102, Mailbox.create_blob currently supplies CompressionTypeChoices.ZSTD (hard default) and forwards it to BlobManager.create_blob which treats compression=None as "use settings.MESSAGES_BLOB_COMPRESS", so callers of Mailbox.create_blob never hit the new configurable default; change Mailbox.create_blob to accept compression: Optional[CompressionTypeChoices] = None (or if the signature must remain, ensure it passes None when the caller didn't explicitly request a compression) and forward that None to BlobManager.create_blob so BlobManager.parse_compression_spec can apply settings.MESSAGES_BLOB_COMPRESS; update references to the Mailbox.create_blob parameter handling to distinguish "unspecified" vs explicit CompressionTypeChoices.ZSTD and only pass ZSTD when explicitly requested.
♻️ Duplicate comments (6)
src/backend/core/services/tiered_storage.py (3)
291-298: ⚠️ Potential issue | 🟠 Major
Propagate delete failures so cleanup can retry them.
`cleanup_orphaned_blob_task()` only retries when `delete_if_orphaned()` raises. Converting storage delete failures into `False` makes the task report `"still_referenced"` on transient backend errors and leaks the orphaned object until manual repair.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 291 - 298, The current delete_if_orphaned implementation swallows storage.delete failures by logging and returning False, preventing cleanup_orphaned_blob_task from retrying; change the code so storage.delete failures propagate: in delete_if_orphaned (the block that calls self.compute_storage_key and self.storage.delete) remove or narrow the broad except that returns False and instead log the error and re-raise the exception (or allow it to bubble up) so cleanup_orphaned_blob_task sees the exception and will retry the orphaned object; reference functions: delete_if_orphaned, compute_storage_key, self.storage.delete, and cleanup_orphaned_blob_task.
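The difference between "still referenced" and "delete failed" can be illustrated with a small sketch. Everything here is a hypothetical simplification: `FlakyStorage` simulates a backend with transient errors, and `cleanup_task` uses a plain loop where the real task would rely on Celery's retry mechanism.

```python
from typing import List


class FlakyStorage:
    """Hypothetical backend that fails the first `failures` deletes."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.deleted: List[str] = []

    def delete(self, key: str) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("transient backend error")
        self.deleted.append(key)


def delete_if_orphaned(storage: FlakyStorage, key: str, reference_count: int) -> bool:
    """Return False only when the object is still referenced.

    Storage errors are NOT swallowed; they propagate to the caller.
    """
    if reference_count > 0:
        return False
    storage.delete(key)
    return True


def cleanup_task(
    storage: FlakyStorage, key: str, reference_count: int, max_retries: int = 3
) -> str:
    """Simplified stand-in for the retrying cleanup task."""
    for _attempt in range(max_retries + 1):
        try:
            deleted = delete_if_orphaned(storage, key, reference_count)
        except ConnectionError:
            continue  # transient failure: try again
        return "deleted" if deleted else "still_referenced"
    return "failed"
```

Keeping the two outcomes distinct means a transient backend error is retried (or reported as a failure) instead of being misreported as `"still_referenced"`.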
185-192: ⚠️ Potential issue | 🟠 Major
Don't log full storage keys.
These log lines expose the object path, which embeds the blob SHA and key cohort. That is sensitive storage metadata and should not be written to normal application logs; log `blob.id` and the outcome instead.
Based on learnings, `src/backend/**/*.py`: Do not log sensitive information (tokens, passwords, financial/health data, PII).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 185 - 192, The debug/info logs currently emit sensitive storage keys/paths (see logger.debug and logger.info around the dedupe/save flow); change these to avoid logging the full storage key or existing_path returned by compute_storage_key_for_blob/storage.save and instead log only non-sensitive identifiers such as blob.id and the outcome (e.g., "deduped" or "uploaded") and, if needed, existing_key_id (only if it is non-sensitive); update the logger.debug that mentions existing_path and the logger.info after storage.save to remove the path variable and replace with a brief outcome message referencing blob.id and the operation result.
100-115: ⚠️ Potential issue | 🔴 Critical
Dedup still ignores the stored representation.
The storage key and sibling lookup are keyed by `sha256`/`key_id`, but the uploaded bytes are `blob.raw_content` after compression. Two blobs with identical source bytes and different `compression` values will reuse the same object and one of them will later read back bytes that do not match its own compression metadata.
Also applies to: 154-193
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage.py` around lines 100 - 115, compute_storage_key and the sibling lookup currently key on sha256 of the raw bytes and key_id only, but the uploaded object is the compressed/stored representation (blob.raw_content), so different compression values can collide; change the logic to compute the SHA over the actual bytes written (i.e., the compressed/stored bytes) and include the representation/compression identifier in the storage key (and sibling lookup) so keys are unique per stored format; update compute_storage_key to accept the stored-bytes or a representation tag (e.g., compression) along with key_id, and make the sibling lookup code use that same computed key/sha-of-stored-bytes so read/write use the identical namespace.
src/backend/core/services/tiered_storage_tasks.py (1)
91-91: ⚠️ Potential issue | 🟠 Major
Treat the locked re-fetch delete race as `not_found`.
The row can still disappear between the sha256 lookup and the `select_for_update().get(...)` at Line 91. Right now that benign race falls into the catch-all and returns `"error"`. Catch `Blob.DoesNotExist` around the locked lookup and return `{"status": "not_found", "blob_id": blob_id}` instead.
Based on learnings, `logger.exception(...)` in `src/backend/**/*.py` is automatically reported to Sentry as an event.
Also applies to: 121-123
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/tiered_storage_tasks.py` at line 91, The selective row-lock fetch using Blob.objects.select_for_update().get(id=blob_id) can raise Blob.DoesNotExist if the row was deleted after the sha256 lookup; wrap that specific call in a try/except catching Blob.DoesNotExist and return {"status": "not_found", "blob_id": blob_id} instead of falling into the generic error path (do the same for the similar select_for_update().get(...) at the block around lines 121-123); ensure you only catch Blob.DoesNotExist (not broad Exception) and avoid using logger.exception for this benign race.
src/backend/messages/settings.py (1)
295-320: ⚠️ Potential issue | 🟠 Major
Only register `message-blobs` when blob storage is really configured.
This alias is always present, so `TieredStorageService.enabled` becomes truthy even when blob-storage env vars are intentionally unset. That pushes offload and verification down a half-configured backend instead of keeping tiered storage disabled.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/messages/settings.py` around lines 295 - 320, The "message-blobs" storage alias is always registered which makes TieredStorageService.enabled truthy even when blob env vars are unset; change the settings population so the "message-blobs" entry is only added when required blob configuration exists (e.g., STORAGE_MESSAGE_BLOBS_BUCKET_NAME or STORAGE_MESSAGE_BLOBS_ENDPOINT_URL is present). In practice, update the code that builds the storage aliases (the dict containing "message-blobs") to conditionally insert that key only if the relevant values.Value keys (access_key/secret_key/bucket_name or region/endpoint) are set/Truthy, ensuring TieredStorageService.enabled accurately reflects a fully configured blob backend.
src/backend/core/models.py (1)
2198-2243: ⚠️ Potential issue | 🟠 Major
Add composite indexes for the new tiered-storage queries.
These fields are now on the hot paths for offload scans and dedup/orphan checks, but `Blob.Meta` still has no matching composite indexes. At the scale described in this PR, single-column indexes will still leave very broad scans on `messages_blob`.
As per coding guidelines, `src/backend/**/{models.py,migrations/**/*.py}`: Implement database indexing and query optimization (Model Meta indexes, constraints).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/models.py` around lines 2198 - 2243, Add composite DB indexes to the Blob model Meta to support tiered-storage queries: update Blob.Meta to include a list of models.Index entries for the hot-path combinations referencing the actual field names—at minimum add indexes on ("storage_location", "mailbox"), ("storage_location", "maildomain") and ("storage_location", "encryption_key_id") (give each index a descriptive name); this ensures queries in BlobManager and offload/dedup/orphan scans that filter by storage_location plus owner or key use the composite indexes instead of wide single-column scans.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Around line 87-89: The broad except in the task is catching
celery.exceptions.Retry raised by self.retry() (used after sha256_advisory_lock
contention and on transient errors), preventing Celery from requeueing; add an
explicit except celery.exceptions.Retry: raise (or re-raise) immediately before
the existing generic except Exception so Retry can propagate, leaving the rest
of the error handling unchanged — target the block that uses
sha256_advisory_lock and self.retry and the subsequent broad except to insert
this handler.
In `@src/backend/core/services/tiered_storage.py`:
- Around line 230-257: rotate_blob() currently calls self.encrypt(...) which
always uses self.active_key_id, so when a caller passes a different
target_key_id the data is encrypted with the wrong key while paths/DB are
updated for the target; fix by either passing target_key_id into the encryption
routine (add an optional parameter to self.encrypt and call
self.encrypt(decrypted, key_id=target_key_id) in both the DB-blob and
OBJECT_STORAGE branches so encryption, compute_storage_key(sha256,
target_key_id) and DB updates use the same key) or, if you intend only to
support the active key, immediately assert target_key_id == self.active_key_id
at the start of rotate_blob() and remove the misleading parameter; update calls
around encrypt, compute_storage_key, and Blob.objects.filter(...).update(...)
accordingly to keep key IDs consistent.
In `@src/backend/core/tests/commands/test_verify_tiered_storage.py`:
- Around line 322-619: The tests that call the management command with
re_encrypt=True mutate the mocked TieredStorageService but fail to patch the
settings that re_encrypt_blobs() reads (MESSAGES_BLOB_ENCRYPTION_KEYS /
MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID), so the command uses ambient Django
settings instead of the test scenario; fix each re_encrypt=True test by patching
the same settings module that re_encrypt_blobs() uses (e.g., patch
"core.services.tiered_storage.settings" or django.conf.settings) and set
MESSAGES_BLOB_ENCRYPTION_KEYS and MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID to
match the service.encryption_keys and service.active_key_id before you patch
"core.management.commands.verify_tiered_storage.TieredStorageService", ensuring
the command and the mocked TieredStorageService see the same encryption config.
In `@src/backend/core/tests/tasks/test_tiered_storage_tasks.py`:
- Around line 61-159: The age-based offload tests fail when
TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0 because offload_blobs_task filters by
size__gte; update the tests (test_queues_eligible_blobs_by_age and
test_immediate_offload_with_zero_days) to ensure created blobs meet the size
threshold (e.g., make content length >=
settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE) or apply
override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0) for those tests so the
age/immediacy logic is exercised; reference the tests by name and the
offload_blobs_task and TieredStorageService.compute_storage_key_for_blob symbols
when making the change.
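The first inline comment above (letting `celery.exceptions.Retry` escape a broad `except`) follows a general pattern that can be shown without Celery. In this sketch the local `Retry` class is only a stand-in for `celery.exceptions.Retry`, and `run_task` is a hypothetical wrapper, not the PR's task body.

```python
from typing import Any, Callable, Dict


class Retry(Exception):
    """Local stand-in for celery.exceptions.Retry (assumption for this sketch)."""


def run_task(step: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
    """Run one task step, mapping unexpected errors to an error result.

    Retry is re-raised first so the worker can requeue the task;
    only other exceptions reach the broad handler.
    """
    try:
        return step()
    except Retry:
        raise  # must propagate, otherwise the retry is silently dropped
    except Exception as exc:  # broad handler kept for everything else
        return {"status": "error", "error": str(exc)}
```

The order of the `except` clauses matters: because `Retry` subclasses `Exception`, placing its handler after the generic one would make it unreachable.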
---
Outside diff comments:
In `@src/backend/core/models.py`:
- Around line 2077-2102: Mailbox.create_blob currently supplies
CompressionTypeChoices.ZSTD (hard default) and forwards it to
BlobManager.create_blob which treats compression=None as "use
settings.MESSAGES_BLOB_COMPRESS", so callers of Mailbox.create_blob never hit
the new configurable default; change Mailbox.create_blob to accept compression:
Optional[CompressionTypeChoices] = None (or if the signature must remain, ensure
it passes None when the caller didn't explicitly request a compression) and
forward that None to BlobManager.create_blob so
BlobManager.parse_compression_spec can apply settings.MESSAGES_BLOB_COMPRESS;
update references to the Mailbox.create_blob parameter handling to distinguish
"unspecified" vs explicit CompressionTypeChoices.ZSTD and only pass ZSTD when
explicitly requested.
---
Duplicate comments:
In `@src/backend/core/models.py`:
- Around line 2198-2243: Add composite DB indexes to the Blob model Meta to
support tiered-storage queries: update Blob.Meta to include a list of
models.Index entries for the hot-path combinations referencing the actual field
names—at minimum add indexes on ("storage_location", "mailbox"),
("storage_location", "maildomain") and ("storage_location", "encryption_key_id")
(give each index a descriptive name); this ensures queries in BlobManager and
offload/dedup/orphan scans that filter by storage_location plus owner or key use
the composite indexes instead of wide single-column scans.
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Line 91: The selective row-lock fetch using
Blob.objects.select_for_update().get(id=blob_id) can raise Blob.DoesNotExist if
the row was deleted after the sha256 lookup; wrap that specific call in a
try/except catching Blob.DoesNotExist and return {"status": "not_found",
"blob_id": blob_id} instead of falling into the generic error path (do the same
for the similar select_for_update().get(...) at the block around lines 121-123);
ensure you only catch Blob.DoesNotExist (not broad Exception) and avoid using
logger.exception for this benign race.
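The narrow-exception pattern described above could look roughly like this. It is a sketch with stand-ins: `BlobDoesNotExist` plays the role of `Blob.DoesNotExist`, and `fetch_locked` plays the role of `Blob.objects.select_for_update().get(id=blob_id)`; neither is the project's real code:

```python
class BlobDoesNotExist(Exception):
    """Stand-in for Django's Blob.DoesNotExist."""


def fetch_locked(rows: dict, blob_id: str):
    """Stand-in for the row-locking fetch; raises if the row is gone."""
    try:
        return rows[blob_id]
    except KeyError:
        raise BlobDoesNotExist(blob_id) from None


def offload_single_blob(rows: dict, blob_id: str) -> dict:
    try:
        fetch_locked(rows, blob_id)
    except BlobDoesNotExist:
        # Benign race: the row was deleted between lookup and lock.
        # Report it without logging a traceback; other errors still propagate.
        return {"status": "not_found", "blob_id": blob_id}
    # ... the real task would upload and update the row here ...
    return {"status": "success", "blob_id": blob_id}
```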
In `@src/backend/core/services/tiered_storage.py`:
- Around line 291-298: The current delete_if_orphaned implementation swallows
storage.delete failures by logging and returning False, preventing
cleanup_orphaned_blob_task from retrying; change the code so storage.delete
failures propagate: in delete_if_orphaned (the block that calls
self.compute_storage_key and self.storage.delete) remove or narrow the broad
except that returns False and instead log the error and re-raise the exception
(or allow it to bubble up) so cleanup_orphaned_blob_task sees the exception and
will retry the orphaned object; reference functions: delete_if_orphaned,
compute_storage_key, self.storage.delete, and cleanup_orphaned_blob_task.
- Around line 185-192: The debug/info logs currently emit sensitive storage
keys/paths (see logger.debug and logger.info around the dedupe/save flow);
change these to avoid logging the full storage key or existing_path returned by
compute_storage_key_for_blob/storage.save and instead log only non-sensitive
identifiers such as blob.id and the outcome (e.g., "deduped" or "uploaded") and,
if needed, existing_key_id (only if it is non-sensitive); update the
logger.debug that mentions existing_path and the logger.info after storage.save
to remove the path variable and replace with a brief outcome message referencing
blob.id and the operation result.
- Around line 100-115: compute_storage_key and the sibling lookup currently key
on sha256 of the raw bytes and key_id only, but the uploaded object is the
compressed/stored representation (blob.raw_content), so different compression
values can collide; change the logic to compute the SHA over the actual bytes
written (i.e., the compressed/stored bytes) and include the
representation/compression identifier in the storage key (and sibling lookup) so
keys are unique per stored format; update compute_storage_key to accept the
stored-bytes or a representation tag (e.g., compression) along with key_id, and
make the sibling lookup code use that same computed key/sha-of-stored-bytes so
read/write use the identical namespace.
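One way to read the last suggestion (hashing the stored bytes and adding the representation to the key) is sketched below. The path layout extends the `blobs/{key_id}/{shard}/{sha}` shape from the walkthrough; the extra `compression` segment, the two-character shard, and the function signature are assumptions, not the project's actual scheme:

```python
import hashlib


def compute_storage_key(stored_bytes: bytes, key_id: int, compression: str) -> str:
    """Derive an object key from the bytes that are actually written.

    Hashing the stored (compressed) representation and including the
    compression tag keeps two encodings of the same content from
    resolving to the same object.
    """
    digest = hashlib.sha256(stored_bytes).hexdigest()
    shard = digest[:2]  # hypothetical sharding prefix
    return f"blobs/{key_id}/{compression}/{shard}/{digest}"
```

Both the write path and the sibling lookup would need to call the same function with the same arguments so that they agree on the namespace.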
In `@src/backend/messages/settings.py`:
- Around line 295-320: The "message-blobs" storage alias is always registered
which makes TieredStorageService.enabled truthy even when blob env vars are
unset; change the settings population so the "message-blobs" entry is only added
when required blob configuration exists (e.g., STORAGE_MESSAGE_BLOBS_BUCKET_NAME
or STORAGE_MESSAGE_BLOBS_ENDPOINT_URL is present). In practice, update the code
that builds the storage aliases (the dict containing "message-blobs") to
conditionally insert that key only if the relevant values.Value keys
(access_key/secret_key/bucket_name or region/endpoint) are set/Truthy, ensuring
TieredStorageService.enabled accurately reflects a fully configured blob
backend.
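A rough sketch of the conditional registration, assuming a plain dict of environment values rather than the `values.Value` declarations used in the real settings class. The `build_storages` helper and the option names inside `OPTIONS` are illustrative:

```python
def build_storages(env: dict) -> dict:
    """Build a STORAGES-like dict, adding "message-blobs" only when configured."""
    storages = {
        "default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
    }
    bucket = env.get("STORAGE_MESSAGE_BLOBS_BUCKET_NAME")
    endpoint = env.get("STORAGE_MESSAGE_BLOBS_ENDPOINT_URL")
    if bucket or endpoint:
        # Only register the alias when at least one blob setting is present,
        # so a service that checks for the alias is not enabled by accident.
        storages["message-blobs"] = {
            "BACKEND": "storages.backends.s3.S3Storage",
            "OPTIONS": {"bucket_name": bucket, "endpoint_url": endpoint},
        }
    return storages
```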
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9ce7b1d7-830c-4186-bee0-8fdf30b9a6db
📒 Files selected for processing (13)
- env.d/development/backend.defaults
- src/backend/core/apps.py
- src/backend/core/checks.py
- src/backend/core/enums.py
- src/backend/core/management/commands/verify_tiered_storage.py
- src/backend/core/models.py
- src/backend/core/services/tiered_storage.py
- src/backend/core/services/tiered_storage_tasks.py
- src/backend/core/signals.py
- src/backend/core/tests/commands/test_verify_tiered_storage.py
- src/backend/core/tests/services/test_tiered_storage.py
- src/backend/core/tests/tasks/test_tiered_storage_tasks.py
- src/backend/messages/settings.py
| def test_re_encrypt_no_keys_configured(self): | ||
| """Test that re-encrypt fails when no keys are configured.""" | ||
| from unittest.mock import patch | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| with patch("core.services.tiered_storage.settings") as mock_settings: | ||
| mock_settings.STORAGES = {"message-blobs": {"OPTIONS": {}}} | ||
| mock_settings.MESSAGES_BLOB_ENCRYPTION_KEYS = {} | ||
| mock_settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID = 0 | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| assert "No encryption keys configured" in stderr.getvalue() | ||
|
|
||
| def test_all_blobs_already_current_key(self): | ||
| """Test that re-encrypt reports success when all blobs use current key.""" | ||
| key = secrets.token_hex(32) | ||
| service = TieredStorageService() | ||
| service.encryption_keys = {"1": key} | ||
| service.active_key_id = 1 | ||
|
|
||
| mailbox = factories.MailboxFactory() | ||
| blob = mailbox.create_blob(content=b"test", content_type="text/plain") | ||
|
|
||
| # Manually encrypt with key 1 | ||
| compressed = bytes(blob.raw_content) | ||
| encrypted, key_id = service.encrypt(compressed) | ||
| blob.raw_content = encrypted | ||
| blob.encryption_key_id = key_id | ||
| blob.save() | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| # Temporarily modify service in command | ||
| from unittest.mock import patch | ||
|
|
||
| with patch( | ||
| "core.management.commands.verify_tiered_storage.TieredStorageService" | ||
| ) as mock_svc_class: | ||
| mock_svc_class.return_value = service | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| assert "All blobs already use the current encryption key" in stdout.getvalue() | ||
|
|
||
| def test_re_encrypt_postgres_blob(self): | ||
| """Test re-encrypting a PostgreSQL blob with real encryption.""" | ||
| import pyzstd | ||
|
|
||
| service = TieredStorageService() | ||
| old_key = secrets.token_hex(32) | ||
| new_key = secrets.token_hex(32) | ||
|
|
||
| mailbox = factories.MailboxFactory() | ||
| original_content = b"test content for re-encryption" * 20 | ||
|
|
||
| # Create blob and encrypt with old key (key_id=2) | ||
| blob = mailbox.create_blob(content=original_content, content_type="text/plain") | ||
| compressed = bytes(blob.raw_content) | ||
|
|
||
| service.encryption_keys = {"2": old_key} | ||
| service.active_key_id = 2 | ||
| encrypted, key_id = service.encrypt(compressed) | ||
| blob.raw_content = encrypted | ||
| blob.encryption_key_id = key_id | ||
| blob.save() | ||
|
|
||
| # Now configure service for key rotation (new key is "1", old is "2") | ||
| service.encryption_keys = {"1": new_key, "2": old_key} | ||
| service.active_key_id = 1 | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| from unittest.mock import patch | ||
|
|
||
| with patch( | ||
| "core.management.commands.verify_tiered_storage.TieredStorageService" | ||
| ) as mock_svc_class: | ||
| mock_svc_class.return_value = service | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| output = stdout.getvalue() | ||
| assert "Re-encrypted" in output | ||
| assert "Re-encrypted: 1" in output | ||
|
|
||
| # Verify blob was updated | ||
| blob.refresh_from_db() | ||
| assert blob.encryption_key_id == 1 | ||
|
|
||
| # Verify content is still readable | ||
| decrypted = service.decrypt(bytes(blob.raw_content), blob.encryption_key_id) | ||
| assert pyzstd.decompress(decrypted) == original_content | ||
|
|
||
| @pytest.mark.django_db(transaction=True) | ||
| def test_re_encrypt_object_storage_blob(self): | ||
| """Test re-encrypting an object storage blob with real encryption.""" | ||
| import pyzstd | ||
|
|
||
| service = TieredStorageService() | ||
| old_key = secrets.token_hex(32) | ||
| new_key = secrets.token_hex(32) | ||
|
|
||
| mailbox = factories.MailboxFactory() | ||
| original_content = b"test content for object storage re-encryption" * 20 | ||
|
|
||
| # Create blob and encrypt with old key (key_id=2) | ||
| blob = mailbox.create_blob(content=original_content, content_type="text/plain") | ||
| compressed = bytes(blob.raw_content) | ||
|
|
||
| service.encryption_keys = {"2": old_key} | ||
| service.active_key_id = 2 | ||
| encrypted, key_id = service.encrypt(compressed) | ||
| blob.raw_content = encrypted | ||
| blob.encryption_key_id = key_id | ||
| blob.save() | ||
|
|
||
| old_path = TieredStorageService.compute_storage_key_for_blob(blob) | ||
| new_path = TieredStorageService.compute_storage_key(bytes(blob.sha256), 1) | ||
|
|
||
| try: | ||
| # Upload to storage | ||
| service.upload_blob(blob) | ||
| blob.storage_location = BlobStorageLocationChoices.OBJECT_STORAGE | ||
| blob.raw_content = None | ||
| blob.save() | ||
|
|
||
| # Configure for key rotation | ||
| service.encryption_keys = {"1": new_key, "2": old_key} | ||
| service.active_key_id = 1 | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| from unittest.mock import patch | ||
|
|
||
| with patch( | ||
| "core.management.commands.verify_tiered_storage.TieredStorageService" | ||
| ) as mock_svc_class: | ||
| mock_svc_class.return_value = service | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| output = stdout.getvalue() | ||
| assert "Re-encrypted: 1" in output | ||
|
|
||
| # Verify blob was updated | ||
| blob.refresh_from_db() | ||
| assert blob.encryption_key_id == 1 | ||
|
|
||
| # Verify content moved from old path to new path | ||
| assert not service.storage.exists(old_path) | ||
| assert service.storage.exists(new_path) | ||
| downloaded = service.download_blob(blob) | ||
| assert pyzstd.decompress(downloaded) == original_content | ||
| finally: | ||
| for k in (old_path, new_path): | ||
| if service.storage.exists(k): | ||
| service.storage.delete(k) | ||
|
|
||
| def test_dry_run(self): | ||
| """Test that --dry-run shows what would be done without changes.""" | ||
| service = TieredStorageService() | ||
| key = secrets.token_hex(32) | ||
| service.encryption_keys = {"1": key} | ||
| service.active_key_id = 1 | ||
|
|
||
| mailbox = factories.MailboxFactory() | ||
| blob = mailbox.create_blob(content=b"test", content_type="text/plain") | ||
| # key_id=0 means unencrypted, needs re-encryption | ||
| blob.encryption_key_id = 0 | ||
| blob.save() | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| from unittest.mock import patch | ||
|
|
||
| with patch( | ||
| "core.management.commands.verify_tiered_storage.TieredStorageService" | ||
| ) as mock_svc_class: | ||
| mock_svc_class.return_value = service | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| dry_run=True, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| output = stdout.getvalue() | ||
| assert "DRY RUN" in output | ||
| assert "Would re-encrypt" in output | ||
|
|
||
| # Verify blob was NOT modified | ||
| blob.refresh_from_db() | ||
| assert blob.encryption_key_id == 0 | ||
|
|
||
| def test_re_encrypt_with_limit(self): | ||
| """Test that --limit restricts number of blobs re-encrypted.""" | ||
| service = TieredStorageService() | ||
| key = secrets.token_hex(32) | ||
| service.encryption_keys = {"1": key} | ||
| service.active_key_id = 1 | ||
|
|
||
| mailbox = factories.MailboxFactory() | ||
|
|
||
| # Create 3 blobs with key_id=0 | ||
| for i in range(3): | ||
| blob = mailbox.create_blob( | ||
| content=f"test content {i}".encode(), | ||
| content_type="text/plain", | ||
| ) | ||
| blob.encryption_key_id = 0 | ||
| blob.save() | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| from unittest.mock import patch | ||
|
|
||
| with patch( | ||
| "core.management.commands.verify_tiered_storage.TieredStorageService" | ||
| ) as mock_svc_class: | ||
| mock_svc_class.return_value = service | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| limit=2, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| output = stdout.getvalue() | ||
| assert "Blobs to re-encrypt: 2" in output | ||
|
|
||
| def test_re_encrypt_skips_blob_without_content(self): | ||
| """Test that re-encrypt skips PostgreSQL blobs with no content.""" | ||
| service = TieredStorageService() | ||
| key = secrets.token_hex(32) | ||
| service.encryption_keys = {"1": key} | ||
| service.active_key_id = 1 | ||
|
|
||
| mailbox = factories.MailboxFactory() | ||
| blob = mailbox.create_blob(content=b"test", content_type="text/plain") | ||
| blob.encryption_key_id = 0 | ||
| blob.raw_content = None # Simulate missing content | ||
| blob.save() | ||
|
|
||
| stdout = StringIO() | ||
| stderr = StringIO() | ||
|
|
||
| from unittest.mock import patch | ||
|
|
||
| with patch( | ||
| "core.management.commands.verify_tiered_storage.TieredStorageService" | ||
| ) as mock_svc_class: | ||
| mock_svc_class.return_value = service | ||
|
|
||
| call_command( | ||
| "verify_tiered_storage", | ||
| re_encrypt=True, | ||
| stdout=stdout, | ||
| stderr=stderr, | ||
| ) | ||
|
|
||
| output = stdout.getvalue() | ||
| assert "Skipped: 1" in output | ||
| # Blob row left unchanged. | ||
| blob.refresh_from_db() | ||
| assert blob.encryption_key_id == 0 | ||
| assert blob.raw_content is None |
The --re-encrypt tests are configuring the wrong source of truth.
re_encrypt_blobs() branches on django.conf.settings.MESSAGES_BLOB_ENCRYPTION_* before it uses the mocked TieredStorageService. In this block, the tests mostly mutate the returned service instance but never override those Django settings, so they depend on ambient test settings instead of the scenario each test is trying to cover.
🛠️ Suggested pattern
- with patch(
- "core.management.commands.verify_tiered_storage.TieredStorageService"
- ) as mock_svc_class:
+ with override_settings(
+ MESSAGES_BLOB_ENCRYPTION_KEYS={"1": key},
+ MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID=1,
+ ), patch(
+ "core.management.commands.verify_tiered_storage.TieredStorageService"
+ ) as mock_svc_class:
 mock_svc_class.return_value = service
Apply the same idea to the other re_encrypt=True cases so the command and the mocked service see the same encryption configuration.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/tests/commands/test_verify_tiered_storage.py` around lines
322 - 619, The tests that call the management command with re_encrypt=True
mutate the mocked TieredStorageService but fail to patch the settings that
re_encrypt_blobs() reads (MESSAGES_BLOB_ENCRYPTION_KEYS /
MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID), so the command uses ambient Django
settings instead of the test scenario; fix each re_encrypt=True test by patching
the same settings module that re_encrypt_blobs() uses (e.g., patch
"core.services.tiered_storage.settings" or django.conf.settings) and set
MESSAGES_BLOB_ENCRYPTION_KEYS and MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID to
match the service.encryption_keys and service.active_key_id before you patch
"core.management.commands.verify_tiered_storage.TieredStorageService", ensuring
the command and the mocked TieredStorageService see the same encryption config.
| def test_queues_eligible_blobs_by_age(self): | ||
| """Test that task queues blobs older than cutoff date.""" | ||
| mailbox = factories.MailboxFactory() | ||
|
|
||
| # Create an old blob (should be queued) | ||
| old_blob = mailbox.create_blob( | ||
| content=b"old content", content_type="text/plain" | ||
| ) | ||
| Blob.objects.filter(id=old_blob.id).update( | ||
| created_at=now() | ||
| - timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1) | ||
| ) | ||
|
|
||
| # Create a new blob (should not be queued) | ||
| new_blob = mailbox.create_blob( | ||
| content=b"new content", content_type="text/plain" | ||
| ) | ||
|
|
||
| # Mock the delay call to track what gets queued | ||
| queued_ids = [] | ||
| with patch.object( | ||
| offload_single_blob_task, | ||
| "delay", | ||
| side_effect=queued_ids.append, | ||
| ): | ||
| result = offload_blobs_task() | ||
|
|
||
| assert result["status"] == "success" | ||
| assert str(old_blob.id) in queued_ids | ||
| assert str(new_blob.id) not in queued_ids | ||
|
|
||
| def test_queues_eligible_blobs_by_size(self): | ||
| """Test that task respects minimum size threshold.""" | ||
| mailbox = factories.MailboxFactory() | ||
|
|
||
| # Create a small blob (may or may not be queued depending on OFFLOAD_MIN_SIZE) | ||
| small_blob = mailbox.create_blob(content=b"small", content_type="text/plain") | ||
| Blob.objects.filter(id=small_blob.id).update( | ||
| created_at=now() | ||
| - timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1) | ||
| ) | ||
|
|
||
| # Create a large blob (should be queued if old enough) | ||
| large_content = b"x" * (settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE + 1000) | ||
| large_blob = mailbox.create_blob( | ||
| content=large_content, content_type="text/plain" | ||
| ) | ||
| Blob.objects.filter(id=large_blob.id).update( | ||
| created_at=now() | ||
| - timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1) | ||
| ) | ||
|
|
||
| queued_ids = [] | ||
| with patch.object( | ||
| offload_single_blob_task, | ||
| "delay", | ||
| side_effect=queued_ids.append, | ||
| ): | ||
| result = offload_blobs_task() | ||
|
|
||
| assert result["status"] == "success" | ||
| assert str(large_blob.id) in queued_ids | ||
|
|
||
| # Small blob should only be queued if min_size is 0 | ||
| if settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0: | ||
| assert str(small_blob.id) not in queued_ids | ||
|
|
||
| @override_settings(TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0) | ||
| def test_immediate_offload_with_zero_days(self): | ||
| """Test that OFFLOAD_AFTER_DAYS=0 offloads blobs immediately. | ||
|
|
||
| With CELERY_TASK_ALWAYS_EAGER=True (test settings), calling .delay() | ||
| executes the task synchronously, simulating a worker running alongside. | ||
| """ | ||
| service = TieredStorageService() | ||
| mailbox = factories.MailboxFactory() | ||
| content = b"immediate offload test content" * 20 | ||
| blob = mailbox.create_blob(content=content, content_type="text/plain") | ||
| storage_key = TieredStorageService.compute_storage_key_for_blob(blob) | ||
|
|
||
| try: | ||
| # No age manipulation - blob was just created | ||
| result = offload_blobs_task() | ||
|
|
||
| assert result["status"] == "success" | ||
| assert result["queued"] == 1 | ||
|
|
||
| # With CELERY_TASK_ALWAYS_EAGER, the single blob task already ran | ||
| blob.refresh_from_db() | ||
| assert blob.storage_location == BlobStorageLocationChoices.OBJECT_STORAGE | ||
| assert blob.raw_content is None | ||
|
|
||
| # Verify content is still accessible from S3 | ||
| retrieved = blob.get_content() | ||
| assert retrieved == content | ||
| finally: | ||
| if service.storage.exists(storage_key): | ||
| service.storage.delete(storage_key) | ||
|
|
These age-based offload tests still depend on the size filter.
offload_blobs_task() also requires size__gte=settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE. Here the payloads are small, so the tests can fail without ever exercising the age/immediacy logic when the default min size is non-zero.
🛠️ Suggested fix
- def test_queues_eligible_blobs_by_age(self):
+ @override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0)
+ def test_queues_eligible_blobs_by_age(self):
      """Test that task queues blobs older than cutoff date."""

- @override_settings(TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0)
+ @override_settings(
+     TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0,
+     TIERED_STORAGE_OFFLOAD_MIN_SIZE=0,
+ )
  def test_immediate_offload_with_zero_days(self):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/tests/tasks/test_tiered_storage_tasks.py` around lines 61 -
159, The age-based offload tests fail when TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0
because offload_blobs_task filters by size__gte; update the tests
(test_queues_eligible_blobs_by_age and test_immediate_offload_with_zero_days) to
ensure created blobs meet the size threshold (e.g., make content length >=
settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE) or apply
override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0) for those tests so the
age/immediacy logic is exercised; reference the tests by name and the
offload_blobs_task and TieredStorageService.compute_storage_key_for_blob symbols
when making the change.
This makes it possible to offload blobs to S3-compatible object storage, making Postgres much lighter. The design targets storing ~1B emails on a single instance.
…f sensitive information' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
…f sensitive information' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Blobs (raw RFC822 email bodies and attachments) live in PostgreSQL by
default. Once a blob is older than `TIERED_STORAGE_OFFLOAD_AFTER_DAYS`,
a periodic celery task moves its bytes to S3 and clears the PG row's
`raw_content`. Reads transparently fetch from whichever location the
row points at; application code only ever calls `blob.get_content()`.
Have you considered moving the message body to the object store while keeping the headers in the DB for longer?
Headers are generally performance-critical on email list load (no object store hit and no heavy mail parsing), and it is fine to keep them in the DB for much longer (months?).
Hi! We extract enough metadata from the headers to the DB (and to OpenSearch) to be able to have all email lists load without hitting the mime data, so I think it's already the case even if we don't store all headers?
Great, I presume that will do the job.
With all this data, what is the actual DB footprint of a typical message once it has been tiered?
- **Deduplication**: blobs sharing the same SHA-256 share the same S3
  object. The DB is the source of truth; the existence check on S3 is
  a defensive guard against external deletions.
Nice, but how is this GCed if data is deleted?
We removed the existence check on S3; external deletions on S3 are not a risk within this scope.
Actionable comments posted: 17
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
Makefile (1)
361-367: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
`start-e2e` is missing `message-blobs` bucket creation.
`create-buckets` provisions both buckets, but `start-e2e` only creates `message-imports`. Any e2e scenario that exercises tiered-storage blob offload will fail with a no-such-bucket error.
🛠️ Proposed fix
 start-e2e: ## Start e2e services (migrate, seed, etc.)
     @echo "$(BLUE)\n\n| 🔧 Setting up E2E services... \n$(RESET)"
     @$(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage message-imports --expire-days 1
+    @$(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage message-blobs
     @$(COMPOSE_E2E) run --rm backend python manage.py migrate --noinput
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Makefile` around lines 361 - 367, The start-e2e Makefile target omits creating the message-blobs bucket, causing tiered-storage tests to fail; update the start-e2e recipe (target: start-e2e) to run the same create_bucket invocation used for message-imports but for --storage message-blobs (i.e., add a line invoking $(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage message-blobs --expire-days 1 before migrate/search_index_create) so both buckets are provisioned during e2e setup.
src/backend/core/factories.py (1)
372-395: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Make upload reservations opt-in in `BlobFactory`.
`mailbox=` is still the natural argument for “create a blob related to this mailbox”, but this branch now turns every such factory call into a live upload reservation. That leaks into non-upload fixtures in this module and can keep blobs reachable/GC-ineligible until the reservation expires, which makes lifecycle tests less trustworthy. Please keep plain factory creation side-effect free and add an explicit opt-in for upload-style fixtures.
Possible direction
 class BlobFactory(factory.django.DjangoModelFactory):
@@
     @classmethod
     def _create(cls, model_class, *args, **kwargs):
         content = kwargs.pop("content")
         content_type = kwargs.pop("content_type", "application/octet-stream")
         mailbox = kwargs.pop("mailbox", None)
-        if mailbox is not None:
+        reserve_upload = kwargs.pop("reserve_upload", False)
+        if reserve_upload:
+            if mailbox is None:
+                raise ValueError("reserve_upload=True requires mailbox=...")
             return upload_and_reserve_blob(mailbox, content, content_type)
         return models.Blob.objects.create_blob(
             content=content, content_type=content_type
         )
Then only the upload-path fixtures/tests would pass `reserve_upload=True`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/backend/core/factories.py` around lines 372 - 395, BlobFactory currently treats any call with mailbox=... as an upload reservation; change _create to make reservations opt-in by adding and popping a reserve_upload (or reserve) boolean kwarg defaulting to False, keep mailbox as the way to associate a mailbox but only call upload_and_reserve_blob(mailbox, content, content_type) when mailbox is not None AND reserve_upload is True; otherwise call models.Blob.objects.create_blob(content=..., content_type=...) so plain factory creation remains side-effect free; update any tests/fixtures that need the upload path to pass reserve_upload=True when invoking BlobFactory or its helpers.
♻️ Duplicate comments (2)
src/backend/core/services/tiered_storage.py (2)
321-329: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Don't reuse a sibling unless the backing object still exists.
Returning the sibling's `(key_id, compression)` here makes the caller clear `raw_content` and flip the row to `OBJECT_STORAGE` without re-uploading anything. If that sibling row is stale and its bucket object has already been lost, this path throws away the last good copy of the blob. Verify the canonical key exists before short-circuiting; otherwise fall through to `storage.save(...)`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/backend/core/services/tiered_storage.py` around lines 321 - 329, The current dedupe short-circuit uses get_existing_sibling(sha256_bytes) and returns its (key_id, compression) without verifying the backing object exists; instead, after obtaining sibling (existing_key_id, existing_compression) call the storage backend check (e.g., a head/exists method on self.storage or equivalent) for that key_id/canonical object before returning; if the object check fails, do not return the sibling—fall through to the normal upload path (storage.save(...)) so the blob is re-uploaded and the row can be safely flipped to OBJECT_STORAGE. Ensure you reference get_existing_sibling, the sibling tuple, and the storage existence check when implementing this guard.
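The guard this comment asks for can be sketched as follows. `InMemoryStorage` and `upload_or_reuse` are hypothetical stand-ins, not the service's real API, and the sketch only illustrates the reviewer's suggestion; an earlier reply in this thread notes that the S3 existence check was removed because external deletions are out of scope:

```python
from typing import Optional, Tuple


class InMemoryStorage:
    """Tiny stand-in for a Django storage backend (exists/save only)."""

    def __init__(self) -> None:
        self.objects: dict = {}

    def exists(self, key: str) -> bool:
        return key in self.objects

    def save(self, key: str, data: bytes) -> str:
        self.objects[key] = data
        return key


def upload_or_reuse(
    storage: InMemoryStorage,
    sibling_key: Optional[str],
    new_key: str,
    payload: bytes,
) -> Tuple[str, bool]:
    """Return (key, reused). Reuse a sibling only if its object is present."""
    if sibling_key is not None and storage.exists(sibling_key):
        return sibling_key, True
    # Sibling missing or stale: fall through to a real upload so the
    # caller never clears raw_content without a backing object.
    storage.save(new_key, payload)
    return new_key, False
```

The trade-off is one extra existence request per deduplicated offload.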
193-194: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Don't disable IAM-based S3 configurations.
This gate stays false unless `endpoint_url` or `access_key` is set, but a normal AWS S3 setup can rely on instance/role credentials and only provide `bucket_name`/`region`. In that deployment the whole offload path silently stays disabled. Gate on the presence of the blob storage config itself, or at least `bucket_name`, instead.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/backend/core/services/tiered_storage.py` around lines 193 - 194, The feature gate currently sets self.enabled = bool(opts.get("endpoint_url") or opts.get("access_key")), which disables IAM-based S3 because it requires endpoint_url or access_key; update the check to enable the offload when the blob storage config is present (or at least a bucket is configured). Replace the self.enabled assignment (using the existing opts variable) with a check like bool(opts.get("bucket_name")) or fallback to bool(opts) so that instance/role credential setups (no access_key/endpoint_url) are enabled; keep the rest of the code and only change how self.enabled is computed.
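To make the difference concrete, here is a small comparison of the two gates on an IAM-style configuration. The function names are illustrative, and `opts` stands for the `OPTIONS` dict of the `message-blobs` storage entry:

```python
def enabled_current(opts: dict) -> bool:
    """Gate as quoted in the comment: needs endpoint_url or access_key."""
    return bool(opts.get("endpoint_url") or opts.get("access_key"))


def enabled_suggested(opts: dict) -> bool:
    """Suggested gate: a configured bucket is enough."""
    return bool(opts.get("bucket_name"))


# IAM role credentials: only bucket and region are configured.
iam_opts = {"bucket_name": "message-blobs", "region_name": "eu-west-3"}
```

With `iam_opts`, the current gate evaluates to false while the suggested one evaluates to true; an empty `OPTIONS` dict keeps both disabled.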
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/tiered-storage.md`:
- Around line 106-120: The fenced shell code blocks in the tiered-storage doc
(the ```sh blocks showing openssl rand and MESSAGES_BLOBS_ENCRYPT_KEYS) need a
blank line before and after each fenced block to satisfy markdownlint MD031;
update each occurrence of those fenced snippets so there is an empty line
immediately above the opening ```sh and immediately below the closing ``` (apply
the same change to the repeated shell snippets throughout the document).
- Around line 299-300: The runbook step referencing a "same-sha sibling row" is
inconsistent with the deduplicated Blob model; replace that sentence so
operators are instructed to attempt recovery only via supported mechanisms: if
the DB row's storage_location is OBJECT_STORAGE but the S3 object is missing,
first check for a full system backup and restore the blob into S3 and verify
with `verify --mode=db-to-storage`; if no backup exists, attempt to re-ingest
the original content from the upstream source that produced the Blob (or
reconstruct it from application logs), and only if neither is possible mark the
DB row as unrecoverable (add a status/annotation and create an incident) rather
than copying `raw_content` from a non-existent sibling; update references to
`storage_location=POSTGRES`, `raw_content`, and the `Blob` record in the same
paragraph to reflect these supported recovery options.
In `@src/backend/core/admin.py`:
- Around line 1130-1136: The current exception handler for blob.get_content uses
redirect(".") which returns the user to the POST-only /download/ endpoint
causing a 405; update the redirect to send the user back to the blob change form
instead. In the except block that catches exceptions from blob.get_content
(where logging.exception, capture_exception, and messages.error are called),
replace redirect(".") with a redirect to the admin change view for that blob
(use Django's reverse with the appropriate admin change URL name and the blob's
id or the existing helper that builds the blob change URL) so the flashed error
is shown on the blob edit page.
In `@src/backend/core/api/viewsets/blob.py`:
- Around line 128-135: The current response rounds
settings.MAX_OUTGOING_ATTACHMENT_SIZE to 0 MB for small limits, producing
misleading "max 0 MB" messages; change the formatting logic in the blob upload
handler (the block using uploaded_file.size and
settings.MAX_OUTGOING_ATTACHMENT_SIZE that builds the Response) to display the
limit in a sensible unit: compute max_bytes =
settings.MAX_OUTGOING_ATTACHMENT_SIZE, then if max_bytes >= 1024*1024 format as
f"{max_bytes/(1024*1024):.0f} MB", elif max_bytes >= 1024 format as
f"{max_bytes/1024:.1f} KB", else format as f"{max_bytes} bytes"; use that string
in the error message instead of always using max_mb with .0f.
In `@src/backend/core/checks.py`:
- Around line 16-20: This file imports module-private symbols _MIN_KEY_LEN and
_decode_key from core.services.tiered_storage; replace that coupling by either
(A) promoting those symbols in tiered_storage to public names (e.g., MIN_KEY_LEN
and decode_key) and then import the public names, or (B) add a single public
helper validate_key_entry(...) inside core.services.tiered_storage that
encapsulates the checks currently done with _MIN_KEY_LEN, _decode_key and
normalize_key_entry, then here remove imports of _MIN_KEY_LEN and _decode_key
and call validate_key_entry(...) (or import the newly public
MIN_KEY_LEN/decode_key if you chose promotion); update any call sites in this
file that reference _MIN_KEY_LEN or _decode_key to use the new public API
(validate_key_entry or MIN_KEY_LEN/decode_key) so no private
(leading-underscore) symbols are imported.
In `@src/backend/core/management/commands/re_store_blobs.py`:
- Around line 189-232: _build_worklist currently materializes entire querysets
into memory by calling list(...) for postgres_ids and object_storage_ids; change
it to stream IDs instead (use QuerySet.iterator(chunk_size=...) or
values_list(...).iterator()) and build worklist incrementally until self.limit
(respecting restore_to_pg branch and the distinct("sha256","encryption_key_id")
cohort case) so you never load all IDs at once—replace the list(...) +
concatenation with loops that extend a local worklist (or yield IDs) from
postgres_qs.iterator() then object_storage_qs.iterator(), stopping as soon as
self.limit (>0) is reached; alternatively set a safe default for self.limit if
you want an ops-safety guard.
- Around line 151-170: The loop currently does an unlocked per-row fetch using
Blob.objects.get(id=blob_id) before calling _process_one, which then
re-fetches/refreshes the Blob under the advisory lock; remove the wasted
unlocked fetch and pass blob_id into _process_one instead so the method is
responsible for fetching the row under transaction.atomic() (using
Blob.objects.select_for_update().get(id=blob_id) or refresh_from_db inside the
lock) and for all lock-based checks (e.g., sha256 used to acquire the advisory
lock); update the loop to increment counts based on _process_one's return value
and keep the existing exception handling.
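The streaming idea in the `_build_worklist` item above can be sketched in plain Python. This is only an illustration under assumptions: `build_worklist` and its parameters are hypothetical names, and the two iterables stand in for whatever `values_list(...).iterator()` calls the real command would make on the Postgres and object-storage querysets.

```python
from itertools import chain, islice
from typing import Iterable, Iterator


def build_worklist(
    postgres_ids: Iterable[int],
    object_storage_ids: Iterable[int],
    limit: int = 0,
) -> Iterator[int]:
    """Lazily yield blob IDs, Postgres rows first, stopping at ``limit``.

    Hypothetical helper: ``limit <= 0`` is treated as "no limit", mirroring
    the ``self.limit (>0)`` check described in the review comment.
    """
    # chain() walks the first iterable to exhaustion before touching the
    # second, so no full list of IDs is ever materialized.
    combined = chain(postgres_ids, object_storage_ids)
    if limit > 0:
        # islice() stops pulling from the underlying iterators once
        # ``limit`` items have been produced.
        return islice(combined, limit)
    return combined
```

Because the result is an iterator, the caller can loop over it directly and the database cursor is only advanced as far as the limit requires.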
In `@src/backend/core/management/commands/verify_blobs.py`:
- Around line 171-180: The path parser in verify_blobs.py currently only checks
parts length and "blobs" prefix then converts parts[3] to bytes, allowing inputs
like blobs/1/zzz/<valid_sha> to slip through; update the try block that parses
obj_name (variables parts, key_id, sha_bytes) to also validate that parts[3] has
length 64 and that parts[2] == parts[3][:3] before calling bytes.fromhex,
raising ValueError when those checks fail so malformed shard/sha combos
increment invalid_count and produce the INVALID PATH warning via
self.stdout.write.
- Around line 159-163: Replace the silent return in the exception handler for
the storage listing with a raised CommandError so the command exits non-zero: in
the try/except around the call to self._list_storage_objects("blobs/") update
the except block to raise CommandError(f"Failed to list storage objects: {e}")
instead of calling return, and add CommandError to the imports from
django.core.management.base so the symbol is available.
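The stricter path validation requested for `verify_blobs.py` could look roughly like the sketch below. It assumes the `blobs/{key_id}/{shard}/{sha}` layout described in the comment; `parse_blob_key` is a hypothetical name, and keeping `key_id` as a string is an assumption (the real command may convert it).

```python
def parse_blob_key(obj_name: str) -> tuple[str, bytes]:
    """Parse ``blobs/{key_id}/{shard}/{sha}`` into ``(key_id, sha_bytes)``.

    Hypothetical helper. Raises ValueError for any malformed path so the
    caller can count it as invalid and print the INVALID PATH warning.
    """
    parts = obj_name.split("/")
    if len(parts) != 4 or parts[0] != "blobs":
        raise ValueError(f"unexpected layout: {obj_name!r}")
    key_id, shard, sha_hex = parts[1], parts[2], parts[3]
    # A SHA-256 digest is 32 bytes, i.e. 64 hex characters.
    if len(sha_hex) != 64:
        raise ValueError(f"sha must be 64 hex chars: {obj_name!r}")
    # The shard directory must be the first three characters of the sha.
    if shard != sha_hex[:3]:
        raise ValueError(f"shard does not match sha prefix: {obj_name!r}")
    # bytes.fromhex raises ValueError itself on non-hex input.
    return key_id, bytes.fromhex(sha_hex)
```

With these checks in place, a path such as `blobs/1/zzz/<valid_sha>` is rejected before `bytes.fromhex` is ever reached.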
In `@src/backend/core/mda/outbound.py`:
- Around line 411-415: In the MIME composition except block (the one catching
drf.exceptions.ValidationError then a generic except Exception as e in
src/backend/core/mda/outbound.py around the MIME composition code that logs
"Failed to compose MIME for message %s: %s"), replace the current logger.error
call with a logger.exception (or logger.error(..., exc_info=True)) so the full
traceback is preserved and sent to Sentry; keep the existing return False
behavior and the surrounding exception handling (including the
drf.exceptions.ValidationError pass-through) intact.
In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`:
- Around line 272-281: The migration currently uses
core.models._default_upload_expiry as the default for the expires_at
DateTimeField which couples the migration to application code; define a local
callable named _default_upload_expiry at the top of this migration (matching the
pattern used for split_multi_message_attachments) that imports
django.utils.timezone and datetime.timedelta, computes timezone.now() +
timedelta(hours=1) (or the same TTL used elsewhere) and replace
default=core.models._default_upload_expiry with default=_default_upload_expiry
on the expires_at models.DateTimeField so the migration is self-contained and
immutable.
In `@src/backend/core/models.py`:
- Around line 2198-2232: user_can_access performs a deep OR'ed join (Message
blob_id OR draft_blob_id → Thread → ThreadAccess → Mailbox → MailboxAccess) that
will be very slow at scale; replace the final Message-based branch with a
mailbox-scoped EXISTS pattern: first compute a subquery of mailbox ids the user
can access via MailboxAccess/ThreadAccess (i.e. a MailboxAccess-filtered mailbox
id set using the Thread→ThreadAccess→Mailbox link), then check EXISTS against
each referencing table separately (Message with blob_id, Message with
draft_blob_id, Attachment, MessageTemplate, MailboxBlob) using that mailbox id
set — or alternatively implement a covering index on Message(blob_id,
draft_blob_id) if schema change is acceptable; update the Message-related check
in user_can_access accordingly (referencing MailboxBlob, Attachment,
MessageTemplate, Message, ThreadAccess, MailboxAccess symbols).
- Around line 2157-2165: create_blob (called via upload_and_reserve_blob)
forwards unfiltered **kwargs into self.create(...) which can collide with
explicit parameters (sha256, size, content_type, compression, raw_content,
encryption_key_id) and raise TypeError; fix this by filtering/whitelisting the
kwargs before calling self.create — e.g., build a new dict of allowed optional
fields (exclude reserved names like "sha256", "size", "content_type",
"compression", "raw_content", "encryption_key_id") or explicitly pop those keys
from kwargs, then pass only the safe dict into the self.create(...) call
referenced in the diff so no duplicate keyword arguments can occur.
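The kwargs filtering described above can be illustrated with a small standalone helper. This is a sketch under assumptions, not the manager's actual code: `filter_extra_fields` and `RESERVED_FIELDS` are hypothetical names, and the reserved set simply mirrors the field names listed in the comment.

```python
# Field names that create_blob already passes explicitly to self.create(...),
# as listed in the review comment.
RESERVED_FIELDS = frozenset(
    {
        "sha256",
        "size",
        "content_type",
        "compression",
        "raw_content",
        "encryption_key_id",
    }
)


def filter_extra_fields(kwargs: dict) -> dict:
    """Return a copy of ``kwargs`` without keys that would collide.

    Hypothetical helper: dropping reserved keys up front avoids the
    "got multiple values for keyword argument" TypeError.
    """
    return {key: value for key, value in kwargs.items() if key not in RESERVED_FIELDS}
```

Silently dropping the keys is one design choice; raising an explicit error for reserved names would surface caller mistakes earlier.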
In `@src/backend/core/services/importer/eml_tasks.py`:
- Around line 73-81: The code assumes an S3 client by accessing
message_imports_storage.connection.meta.client which raises AttributeError for
non-S3 backends; update the logic to detect whether message_imports_storage
provides an S3 client (e.g., check hasattr(message_imports_storage,
"connection") and hasattr(message_imports_storage.connection, "meta") and
hasattr(message_imports_storage.connection.meta, "client")), and if present use
s3_client.get_object(...) as before to set resp and file_content, otherwise fall
back to backend-agnostic message_imports_storage.open(file_key, "rb") and read
settings.MAX_INCOMING_EMAIL_SIZE+1 bytes (matching the previous Range semantics)
into file_content; keep existing variable names (message_imports_storage,
s3_client, resp, file_content) and preserve the same error handling for genuine
S3 errors.
In `@src/backend/core/services/tiered_storage.py`:
- Around line 500-511: Deferred orphan cleanup callback (_run passed to
transaction.on_commit) calls delete_if_orphaned without any advisory lock,
risking deleting blobs recreated concurrently; modify the callback to reacquire
the per-sha lock and DB transaction before running the check-and-delete: wrap
the body of _run in a new transaction.atomic() and call sha256_advisory_lock(s)
(or alternatively add the same locking inside delete_if_orphaned()), then call
delete_if_orphaned(s, k) inside that locked/atomic block so the exists() →
storage.delete() sequence is protected; keep transaction.on_commit(_run) as the
trigger.
In `@src/backend/core/tests/services/test_tiered_storage.py`:
- Around line 1277-1288: The test currently relies on BlobFactory creating three
distinct DB rows but DB-level dedup means they collapse to one; either rename
the test to reflect the single-row behavior or explicitly seed distinct rows so
the multi-row cohort path is exercised: change the Blob creation loop using
factories.BlobFactory to ensure unique sha per row (e.g., append a small unique
nonce to content when calling BlobFactory) or bypass dedup by creating rows
directly via the Blob model (use Blob.objects.create with distinct sha values)
so the later refresh_from_db() loop operates on multiple DB rows; update
assertions accordingly (refer to BlobFactory, blobs, and refresh_from_db()).
In `@src/backend/messages/celery_app.py`:
- Around line 53-68: The scheduled tasks "offload-blobs-to-object-storage" and
"gc-orphan-blobs" currently use the "default" queue; change their "options":
{"queue": "..."} entries to use a dedicated queue name like "blob-maintenance"
(e.g., options: {"queue": "blob-maintenance"}) so long-running blob maintenance
is isolated, and ensure any operator documentation or deployment configs create
a worker with appropriate concurrency (e.g., --concurrency=1) bound to the
"blob-maintenance" queue; update both task entries referencing
core.services.tiered_storage_tasks.offload_blobs_task and
core.services.blob_gc.gc_orphan_blobs_task.
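The queue change for the two scheduled tasks might look like the following schedule fragment. The task paths and the `blob-maintenance` queue name come from the comment above; the intervals and the `beat_schedule` variable name are assumptions made for illustration, since the actual cadence is not stated here.

```python
# Assumed intervals, in seconds (Celery beat accepts a number of seconds).
OFFLOAD_INTERVAL_SECONDS = 60 * 60  # assumption: hourly offload
GC_INTERVAL_SECONDS = 24 * 60 * 60  # assumption: GC cadence is not given in the review

beat_schedule = {
    "offload-blobs-to-object-storage": {
        "task": "core.services.tiered_storage_tasks.offload_blobs_task",
        "schedule": OFFLOAD_INTERVAL_SECONDS,
        # Dedicated queue instead of "default".
        "options": {"queue": "blob-maintenance"},
    },
    "gc-orphan-blobs": {
        "task": "core.services.blob_gc.gc_orphan_blobs_task",
        "schedule": GC_INTERVAL_SECONDS,
        "options": {"queue": "blob-maintenance"},
    },
}
```

A worker bound only to that queue (for example started with `-Q blob-maintenance --concurrency=1`) would then keep long-running blob maintenance away from the default workers.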
---
Outside diff comments:
In `@Makefile`:
- Around line 361-367: The start-e2e Makefile target omits creating the
message-blobs bucket, causing tiered-storage tests to fail; update the start-e2e
recipe (target: start-e2e) to run the same create_bucket invocation used for
message-imports but for --storage message-blobs (i.e., add a line invoking
$(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage
message-blobs --expire-days 1 before migrate/search_index_create) so both
buckets are provisioned during e2e setup.
In `@src/backend/core/factories.py`:
- Around line 372-395: BlobFactory currently treats any call with mailbox=... as
an upload reservation; change _create to make reservations opt-in by adding and
popping a reserve_upload (or reserve) boolean kwarg defaulting to False, keep
mailbox as the way to associate a mailbox but only call
upload_and_reserve_blob(mailbox, content, content_type) when mailbox is not None
AND reserve_upload is True; otherwise call
models.Blob.objects.create_blob(content=..., content_type=...) so plain factory
creation remains side-effect free; update any tests/fixtures that need the
upload path to pass reserve_upload=True when invoking BlobFactory or its
helpers.
---
Duplicate comments:
In `@src/backend/core/services/tiered_storage.py`:
- Around line 321-329: The current dedupe short-circuit uses
get_existing_sibling(sha256_bytes) and returns its (key_id, compression) without
verifying the backing object exists; instead, after obtaining sibling
(existing_key_id, existing_compression) call the storage backend check (e.g., a
head/exists method on self.storage or equivalent) for that key_id/canonical
object before returning; if the object check fails, do not return the
sibling—fall through to the normal upload path (storage.save(...)) so the blob
is re-uploaded and the row can be safely flipped to OBJECT_STORAGE. Ensure you
reference get_existing_sibling, the sibling tuple, and the storage existence
check when implementing this guard.
- Around line 193-194: The feature gate currently sets self.enabled =
bool(opts.get("endpoint_url") or opts.get("access_key")), which disables
IAM-based S3 because it requires endpoint_url or access_key; update the check to
enable the offload when the blob storage config is present (or at least a bucket
is configured). Replace the self.enabled assignment (using the existing opts
variable) with a check like bool(opts.get("bucket_name")) or fallback to
bool(opts) so that instance/role credential setups (no access_key/endpoint_url)
are enabled; keep the rest of the code and only change how self.enabled is
computed.
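The difference between the two feature gates can be shown with a short sketch. Both functions are hypothetical stand-ins for the `self.enabled` assignment, and the example option values are assumptions.

```python
def enabled_current(opts: dict) -> bool:
    """Gate as quoted in the comment: needs endpoint_url or access_key."""
    return bool(opts.get("endpoint_url") or opts.get("access_key"))


def enabled_proposed(opts: dict) -> bool:
    """Proposed gate: a configured bucket is enough (IAM role setups)."""
    return bool(opts.get("bucket_name"))


# An IAM-role deployment typically configures only the bucket.
iam_opts = {"bucket_name": "message-blobs"}
```

With `iam_opts`, the current gate evaluates to false while the proposed one evaluates to true, which is the behavior change the comment asks for.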
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 26aa2e00-a560-4695-816c-84b0d4ec3879
⛔ Files ignored due to path filters (1)
- `src/frontend/src/features/api/gen/blob/blob.ts` is excluded by `!**/gen/**`
📒 Files selected for processing (60)
- .github/workflows/messages.yml
- Makefile
- docs/architecture.md
- docs/env.md
- docs/tiered-storage.md
- env.d/development/backend.defaults
- src/backend/core/admin.py
- src/backend/core/api/openapi.json
- src/backend/core/api/serializers.py
- src/backend/core/api/viewsets/blob.py
- src/backend/core/api/viewsets/metrics.py
- src/backend/core/apps.py
- src/backend/core/checks.py
- src/backend/core/enums.py
- src/backend/core/factories.py
- src/backend/core/management/commands/delete_orphan_attachments.py
- src/backend/core/management/commands/re_store_blobs.py
- src/backend/core/management/commands/verify_blobs.py
- src/backend/core/mda/autoreply.py
- src/backend/core/mda/draft.py
- src/backend/core/mda/inbound_create.py
- src/backend/core/mda/outbound.py
- src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py
- src/backend/core/models.py
- src/backend/core/services/blob_gc.py
- src/backend/core/services/importer/eml_tasks.py
- src/backend/core/services/search/coalescer.py
- src/backend/core/services/search/search.py
- src/backend/core/services/tiered_storage.py
- src/backend/core/services/tiered_storage_tasks.py
- src/backend/core/signals.py
- src/backend/core/templates/admin/core/blob/change_form.html
- src/backend/core/tests/api/test_attachments.py
- src/backend/core/tests/api/test_draft_attachments.py
- src/backend/core/tests/api/test_inbound_mta.py
- src/backend/core/tests/api/test_mailbox_usage_metrics.py
- src/backend/core/tests/api/test_maildomain_users_metrics.py
- src/backend/core/tests/api/test_messages_delete.py
- src/backend/core/tests/commands/__init__.py
- src/backend/core/tests/commands/test_re_store_blobs.py
- src/backend/core/tests/commands/test_verify_blobs.py
- src/backend/core/tests/conftest.py
- src/backend/core/tests/exporter/test_export_task.py
- src/backend/core/tests/importer/test_file_import.py
- src/backend/core/tests/mda/test_autoreply.py
- src/backend/core/tests/mda/test_outbound.py
- src/backend/core/tests/models/test_blob.py
- src/backend/core/tests/search/test_e2e.py
- src/backend/core/tests/search/test_e2e_modifiers.py
- src/backend/core/tests/services/__init__.py
- src/backend/core/tests/services/test_tiered_storage.py
- src/backend/core/tests/tasks/__init__.py
- src/backend/core/tests/tasks/test_task_importer.py
- src/backend/core/tests/tasks/test_tiered_storage_tasks.py
- src/backend/core/tests/test_checks.py
- src/backend/core/tests/test_signals.py
- src/backend/core/utils.py
- src/backend/messages/celery_app.py
- src/backend/messages/settings.py
- src/backend/pyproject.toml
💤 Files with no reviewable changes (2)
- src/backend/core/tests/exporter/test_export_task.py
- src/backend/core/management/commands/delete_orphan_attachments.py
| 1. Generate a random secret string. Use `openssl rand` or equivalent | ||
| so the input is genuinely high-entropy: | ||
| ```sh | ||
| openssl rand -base64 32 | ||
| ``` | ||
| Startup emits a warning if any configured secret is shorter than | ||
| 32 characters. The warning is a length check, not an entropy | ||
| measurement — `"a" * 32` passes silently. Treat the floor as a | ||
| tripwire for typos, not a security guarantee. | ||
| 2. Add it to `MESSAGES_BLOBS_ENCRYPT_KEYS` as a JSON dict. Every | ||
| entry must spell out `algo` and `secret`. Add `"active": true` | ||
| to the entry whose key new blobs should be encrypted with: | ||
| ```sh | ||
| MESSAGES_BLOBS_ENCRYPT_KEYS='{"1": {"algo": "aes-gcm", "secret": "<the secret>", "active": true}}' | ||
| ``` |
Add blank lines around fenced code blocks here and in the repeated sections below.
This section trips markdownlint MD031, and the same pattern repeats across the other shell snippets in the document. Adding blank lines before and after each fence should keep docs lint green.
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 108-108: Fenced code blocks should be surrounded by blank lines
(MD031, blanks-around-fences)
[warning] 110-110: Fenced code blocks should be surrounded by blank lines
(MD031, blanks-around-fences)
[warning] 118-118: Fenced code blocks should be surrounded by blank lines
(MD031, blanks-around-fences)
[warning] 120-120: Fenced code blocks should be surrounded by blank lines
(MD031, blanks-around-fences)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/tiered-storage.md` around lines 106 - 120, The fenced shell code blocks
in the tiered-storage doc (the ```sh blocks showing openssl rand and
MESSAGES_BLOBS_ENCRYPT_KEYS) need a blank line before and after each fenced
block to satisfy markdownlint MD031; update each occurrence of those fenced
snippets so there is an empty line immediately above the opening ```sh and
immediately below the closing ``` (apply the same change to the repeated shell
snippets throughout the document).
| | DB row says `OBJECT_STORAGE` but S3 object missing | `verify --mode=db-to-storage` reports `MISSING` | The blob is unreadable. If a same-sha sibling row still has `raw_content` in PG, restore by setting the missing row to `storage_location=POSTGRES` with that `raw_content`. Otherwise restore from backup. | | ||
| | S3 object with no DB row | `verify --mode=storage-to-db` reports `ORPHAN` | After confirming, `aws s3 rm` the object. | |
The same-sha sibling row recovery step contradicts the earlier dedup model.
Earlier in the doc, identical content is described as a single Blob row keyed by hash. In that model there is no second DB row to copy raw_content from, so this incident runbook points operators to a recovery path that should never exist. Please replace it with the real supported recovery path here.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/tiered-storage.md` around lines 299 - 300, The runbook step referencing
a "same-sha sibling row" is inconsistent with the deduplicated Blob model;
replace that sentence so operators are instructed to attempt recovery only via
supported mechanisms: if the DB row's storage_location is OBJECT_STORAGE but the
S3 object is missing, first check for a full system backup and restore the blob
into S3 and verify with `verify --mode=db-to-storage`; if no backup exists,
attempt to re-ingest the original content from the upstream source that produced
the Blob (or reconstruct it from application logs), and only if neither is
possible mark the DB row as unrecoverable (add a status/annotation and create an
incident) rather than copying `raw_content` from a non-existent sibling; update
references to `storage_location=POSTGRES`, `raw_content`, and the `Blob` record
in the same paragraph to reflect these supported recovery options.
| try: | ||
| content = blob.get_content() | ||
| except Exception as exc: # pylint: disable=broad-exception-caught | ||
| logging.exception("Failed to fetch blob %s for admin download", blob.id) | ||
| capture_exception() | ||
| messages.error(request, f"Failed to download blob: {exc}") | ||
| return redirect(".") |
Redirect back to the blob change form on download failure.
redirect(".") keeps the browser on the POST-only /download/ URL. After the redirect, the browser will issue a GET there and land on 405 Method Not Allowed instead of returning to the blob page with the flashed error.
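The relative-URL behavior behind this comment can be checked with the standard library, which resolves references the same way a browser resolves a relative `Location` header. The admin path below is a hypothetical example; only the trailing `/download/` segment is taken from the comment.

```python
from urllib.parse import urljoin

# Hypothetical admin URL for the POST-only download endpoint.
download_url = "/admin/core/blob/123/download/"

# "." resolves to the current "directory", i.e. the download URL itself,
# so the follow-up GET hits the POST-only view again.
same_page = urljoin(download_url, ".")

# ".." resolves to the parent, i.e. the blob's own admin URL.
parent_page = urljoin(download_url, "..")
```

Under this assumed layout, `same_page` is `/admin/core/blob/123/download/` and `parent_page` is `/admin/core/blob/123/`, which is why the relative target matters for where the flashed error ends up.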
Suggested fix
  try:
      content = blob.get_content()
  except Exception as exc:  # pylint: disable=broad-exception-caught
      logging.exception("Failed to fetch blob %s for admin download", blob.id)
      capture_exception()
      messages.error(request, f"Failed to download blob: {exc}")
-     return redirect(".")
+     return redirect("..")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/admin.py` around lines 1130 - 1136, The current exception
handler for blob.get_content uses redirect(".") which returns the user to the
POST-only /download/ endpoint causing a 405; update the redirect to send the
user back to the blob change form instead. In the except block that catches
exceptions from blob.get_content (where logging.exception, capture_exception,
and messages.error are called), replace redirect(".") with a redirect to the
admin change view for that blob (use Django's reverse with the appropriate admin
change URL name and the blob's id or the existing helper that builds the blob
change URL) so the flashed error is shown on the blob edit page.
| if uploaded_file.size > settings.MAX_OUTGOING_ATTACHMENT_SIZE: | ||
| max_mb = settings.MAX_OUTGOING_ATTACHMENT_SIZE / (1024 * 1024) | ||
| return Response( | ||
| { | ||
| "error": ( | ||
| f"File too large: {uploaded_file.size} bytes " | ||
| f"(max {max_mb:.0f} MB per attachment)." | ||
| ) |
Don't round sub-MiB limits down to 0 MB.
With small test or self-hosted limits, this formats the cap as max 0 MB per attachment, which makes the 413 response misleading. Prefer bytes/KB for small values, or keep at least one decimal place.
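A unit-aware formatter along the lines suggested here could be sketched as follows. The thresholds follow the ones spelled out in the agent prompt for this comment; `format_size_limit` is a hypothetical helper name, not an existing function in the codebase.

```python
def format_size_limit(max_bytes: int) -> str:
    """Render a byte limit in a unit that never rounds down to zero.

    Hypothetical helper for the 413 error message.
    """
    if max_bytes >= 1024 * 1024:
        return f"{max_bytes / (1024 * 1024):.0f} MB"
    if max_bytes >= 1024:
        return f"{max_bytes / 1024:.1f} KB"
    return f"{max_bytes} bytes"
```

For example, a 512 KiB limit renders as `512.0 KB` instead of `0 MB`, and a 100-byte test limit renders as `100 bytes`.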
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/api/viewsets/blob.py` around lines 128 - 135, The current
response rounds settings.MAX_OUTGOING_ATTACHMENT_SIZE to 0 MB for small limits,
producing misleading "max 0 MB" messages; change the formatting logic in the
blob upload handler (the block using uploaded_file.size and
settings.MAX_OUTGOING_ATTACHMENT_SIZE that builds the Response) to display the
limit in a sensible unit: compute max_bytes =
settings.MAX_OUTGOING_ATTACHMENT_SIZE, then if max_bytes >= 1024*1024 format as
f"{max_bytes/(1024*1024):.0f} MB", elif max_bytes >= 1024 format as
f"{max_bytes/1024:.1f} KB", else format as f"{max_bytes} bytes"; use that string
in the error message instead of always using max_mb with .0f.
| from core.services.tiered_storage import ( | ||
| _MIN_KEY_LEN, | ||
| _decode_key, | ||
| normalize_key_entry, | ||
| ) |
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Avoid importing private symbols across modules.
_MIN_KEY_LEN, _decode_key, and normalize_key_entry are imported from core.services.tiered_storage — but the leading underscores mark the first two as module-private. Reaching across modules for them couples this check file to internal implementation details and silences the convention. Either promote them in the source module (rename to public, e.g., MIN_KEY_LEN/decode_key) or expose a single validate_key_entry(...) helper from tiered_storage and call that here.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/checks.py` around lines 16 - 20, This file imports
module-private symbols _MIN_KEY_LEN and _decode_key from
core.services.tiered_storage; replace that coupling by either (A) promoting
those symbols in tiered_storage to public names (e.g., MIN_KEY_LEN and
decode_key) and then import the public names, or (B) add a single public helper
validate_key_entry(...) inside core.services.tiered_storage that encapsulates
the checks currently done with _MIN_KEY_LEN, _decode_key and
normalize_key_entry, then here remove imports of _MIN_KEY_LEN and _decode_key
and call validate_key_entry(...) (or import the newly public
MIN_KEY_LEN/decode_key if you chose promotion); update any call sites in this
file that reference _MIN_KEY_LEN or _decode_key to use the new public API
(validate_key_entry or MIN_KEY_LEN/decode_key) so no private
(leading-underscore) symbols are imported.
| def user_can_access(self, user, blob_id) -> bool: | ||
| """Authz: can ``user`` legitimately read this blob's content? | ||
| | ||
| A user can access a blob if any of: | ||
| | ||
| - They have an ``Attachment`` mailbox-access for an Attachment | ||
| referencing the blob. | ||
| - They have a ``MessageTemplate`` mailbox-access for a template | ||
| referencing the blob. | ||
| - They have ``ThreadAccess`` to the thread of any Message whose | ||
| ``blob`` or ``draft_blob`` is the blob. | ||
| - The blob has an active upload reservation | ||
| (``MailboxBlob.expires_at > now()``) on a mailbox the user | ||
| can access (covers the JMAP upload-then-attach window). | ||
| """ | ||
| if user is None or not getattr(user, "is_authenticated", False): | ||
| return False | ||
| | ||
| return ( | ||
| MailboxBlob.objects.filter( | ||
| blob_id=blob_id, | ||
| mailbox__accesses__user=user, | ||
| expires_at__gt=timezone.now(), | ||
| ).exists() | ||
| or Attachment.objects.filter( | ||
| blob_id=blob_id, mailbox__accesses__user=user | ||
| ).exists() | ||
| or MessageTemplate.objects.filter( | ||
| blob_id=blob_id, mailbox__accesses__user=user | ||
| ).exists() | ||
| or Message.objects.filter( | ||
| models.Q(blob_id=blob_id) | models.Q(draft_blob_id=blob_id), | ||
| thread__accesses__mailbox__accesses__user=user, | ||
| ).exists() | ||
| ) |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm the call sites + the indexes Django actually creates for Message.blob_id / draft_blob_id.
rg -nP --type=py -C3 '\buser_can_access\s*\('
ast-grep --pattern $'class Message($_):
$$$
blob = $_
$$$
'
Repository: suitenumerique/messages
Length of output: 14554
🏁 Script executed:
# Read the full user_can_access method
sed -n '2198,2232p' src/backend/core/models.py
Repository: suitenumerique/messages
Length of output: 1542
🏁 Script executed:
# Check the Message model's Meta class for any existing indexes
rg -A 20 'class Message\(BaseModel\)' src/backend/core/models.py | grep -A 15 'class Meta'
Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Check if there are any migrations related to Message indexes
fd migration src/backend --type f | xargs grep -l 'Message.*index\|blob.*index' 2>/dev/null | head -5
Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Look for the Blob model definition and its manager
rg -B 5 -A 30 'class BlobManager' src/backend/core/models.py
Repository: suitenumerique/messages
Length of output: 1188
🏁 Script executed:
# Find the exact location and content of Message.Meta
sed -n '1919,1924p' src/backend/core/models.py
Repository: suitenumerique/messages
Length of output: 231
🏁 Script executed:
# Search for all index definitions in the entire models.py to see the pattern used
rg 'indexes\s*=\s*\[' src/backend/core/models.py -A 5
Repository: suitenumerique/messages
Length of output: 1509
🏁 Script executed:
# Check if there are any db_index=True or indexes on blob/draft_blob fields
rg 'blob.*db_index|draft_blob.*db_index' src/backend/core/models.py
Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Look at all Message model-related migrations to see index history
fd -e py src/backend/core/migrations | xargs grep -l "Message" | head -3 | xargs ls -lh
Repository: suitenumerique/messages
Length of output: 1567
🏁 Script executed:
# Count the join chain depth in the query
# Message → Thread → ThreadAccess → Mailbox → MailboxAccess → User
# Let's verify each relation
rg 'class ThreadAccess' src/backend/core/models.py -A 20 | head -30
Repository: suitenumerique/messages
Length of output: 1365
🏁 Script executed:
# Check MailboxAccess structure
rg 'class MailboxAccess' src/backend/core/models.py -A 15 | head -25
Repository: suitenumerique/messages
Length of output: 573
🏁 Script executed:
# Search for any TODO, FIXME, or optimization notes related to user_can_access or blob download
rg 'TODO|FIXME|XXX|optimize|perf' src/backend/core/models.py | grep -i 'user_can_access\|blob\|authz'
Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Check if there are any recent related changes or comments
rg -B 2 'user_can_access' src/backend/core/models.py | head -20
Repository: suitenumerique/messages
Length of output: 439
🏁 Script executed:
# Final verification: check that the OR pattern isn't already handled elsewhere
rg 'Message.*blob_id\|draft_blob_id' src/backend/core/models.py
Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Confirm no recent migration addressing this
ls -lt src/backend/core/migrations/*.py 2>/dev/null | head -5
Repository: suitenumerique/messages
Length of output: 625
user_can_access does a 4-level relational walk on a hot authz path — this warrants optimization before scaling to 1B messages.
The final branch joins Message → Thread → ThreadAccess → Mailbox → MailboxAccess and filters on (blob_id OR draft_blob_id). This runs on every authenticated blob download (blob.py:213) and the planner's behavior on the OR + deep join is sensitive to selectivity:
- For deduplicated blobs shared across many messages (logos, forwarded attachments, common content), the inner `Message` set is large and the join fans out before the user filter narrows it.
- Automatic FK indexes only cover single-column equality; the OR on two columns is not optimized.
- The first three `.exists()` calls short-circuit typical "upload-then-attach" workflows, so this deep branch runs precisely for received-mail downloads, the highest-volume case at 1B-message scale.

Two concrete optimization paths to consider:

- Rewrite as a single subquery against `MailboxAccess` filtered by `user=user`, then `EXISTS` over the four reference tables intersected with that mailbox set, which lets the planner short-circuit per table.
- Add a covering index on `Message(blob_id, draft_blob_id)` (FK indexes alone do not cover the OR; a `UNION ALL` of two equality `EXISTS` typically plans better than OR).
Both require non-trivial work, but this path gates every body fetch and deserves priority before hitting the stated 1B-message target.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/models.py` around lines 2198 - 2232, user_can_access
performs a deep OR'ed join (Message blob_id OR draft_blob_id → Thread →
ThreadAccess → Mailbox → MailboxAccess) that will be very slow at scale; replace
the final Message-based branch with a mailbox-scoped EXISTS pattern: first
compute a subquery of mailbox ids the user can access via
MailboxAccess/ThreadAccess (i.e. a MailboxAccess-filtered mailbox id set using
the Thread→ThreadAccess→Mailbox link), then check EXISTS against each
referencing table separately (Message with blob_id, Message with draft_blob_id,
Attachment, MessageTemplate, MailboxBlob) using that mailbox id set — or
alternatively implement a covering index on Message(blob_id, draft_blob_id) if
schema change is acceptable; update the Message-related check in user_can_access
accordingly (referencing MailboxBlob, Attachment, MessageTemplate, Message,
ThreadAccess, MailboxAccess symbols).
  message_imports_storage = storages["message-imports"]
- with message_imports_storage.open(file_key, "rb") as file:
-     file_content = file.read()
+ s3_client = message_imports_storage.connection.meta.client
+ limit = settings.MAX_INCOMING_EMAIL_SIZE
+ resp = s3_client.get_object(
+     Bucket=message_imports_storage.bucket_name,
+     Key=file_key,
+     Range=f"bytes=0-{limit}", # one byte past the limit
+ )
+ file_content = resp["Body"].read()
Hard AttributeError on non-S3 message-imports storage backends.
message_imports_storage.connection.meta.client is a boto3/S3Boto3Storage-specific attribute chain. Any other backend (e.g., FileSystemStorage used in local dev or CI without a real S3) raises AttributeError, which the outer except Exception swallows as the generic "An error occurred while processing the EML file." message — making the failure very hard to diagnose.
The previous path used storage.open(), which is backend-agnostic. The switch dropped backward compatibility for non-S3 deployments.
🛡️ Proposed fix — graceful fallback to storage.open() for non-S3 backends
- message_imports_storage = storages["message-imports"]
- s3_client = message_imports_storage.connection.meta.client
- limit = settings.MAX_INCOMING_EMAIL_SIZE
- resp = s3_client.get_object(
- Bucket=message_imports_storage.bucket_name,
- Key=file_key,
- Range=f"bytes=0-{limit}", # one byte past the limit
- )
- file_content = resp["Body"].read()
+ message_imports_storage = storages["message-imports"]
+ limit = settings.MAX_INCOMING_EMAIL_SIZE
+ try:
+ # S3-compatible path: use a Range request so an oversized upload
+ # can't OOM the worker before the size check fires.
+ s3_client = message_imports_storage.connection.meta.client
+ resp = s3_client.get_object(
+ Bucket=message_imports_storage.bucket_name,
+ Key=file_key,
+ Range=f"bytes=0-{limit}", # bytes 0-limit inclusive = limit+1 bytes max
+ )
+ file_content = resp["Body"].read()
+ except AttributeError:
+ # Non-S3 backend (e.g. FileSystemStorage in development): fall back to
+ # a bounded read via the standard Django storage API.
+ logger.debug(
+ "message-imports storage is not S3-compatible; "
+ "falling back to storage.open() for %s",
+ file_key,
+ )
+ with message_imports_storage.open(file_key) as f:
+ file_content = f.read(limit + 1)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/services/importer/eml_tasks.py` around lines 73 - 81, The
code assumes an S3 client by accessing
message_imports_storage.connection.meta.client which raises AttributeError for
non-S3 backends; update the logic to detect whether message_imports_storage
provides an S3 client (e.g., check hasattr(message_imports_storage,
"connection") and hasattr(message_imports_storage.connection, "meta") and
hasattr(message_imports_storage.connection.meta, "client")), and if present use
s3_client.get_object(...) as before to set resp and file_content, otherwise fall
back to backend-agnostic message_imports_storage.open(file_key, "rb") and read
settings.MAX_INCOMING_EMAIL_SIZE+1 bytes (matching the previous Range semantics)
into file_content; keep existing variable names (message_imports_storage,
s3_client, resp, file_content) and preserve the same error handling for genuine
S3 errors.
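The capability check described in this prompt could look roughly like the sketch below. `read_bounded` and `InMemoryStorage` are hypothetical names introduced for illustration only; the attribute chain (`connection.meta.client`, `bucket_name`) and the `get_object` call are the ones quoted in the comment above. It is a minimal sketch, assuming any backend without that chain supports the standard `open()` call.

```python
import io


def read_bounded(storage, file_key, limit):
    """Return at most limit + 1 bytes of file_key (hypothetical helper).

    Reading one byte past the limit lets the caller detect oversized files
    without loading them fully into memory.
    """
    # Probe for the boto3 client without raising on non-S3 backends.
    connection = getattr(storage, "connection", None)
    meta = getattr(connection, "meta", None)
    s3_client = getattr(meta, "client", None)

    if s3_client is not None:
        # S3 path: bytes 0..limit inclusive, i.e. limit + 1 bytes at most.
        # Genuine S3 errors propagate to the caller unchanged.
        resp = s3_client.get_object(
            Bucket=storage.bucket_name,
            Key=file_key,
            Range=f"bytes=0-{limit}",
        )
        return resp["Body"].read()

    # Backend-agnostic path: bounded read through the storage API.
    with storage.open(file_key, "rb") as f:
        return f.read(limit + 1)


class InMemoryStorage:
    """Hypothetical stand-in for a non-S3 backend such as FileSystemStorage."""

    def __init__(self, files):
        self._files = files

    def open(self, name, mode="rb"):
        return io.BytesIO(self._files[name])


storage = InMemoryStorage({"mail.eml": b"x" * 100})
content = read_bounded(storage, "mail.eml", limit=10)
too_large = len(content) > 10  # the extra byte signals an oversized file
```

Compared with wrapping the whole S3 call in `except AttributeError`, probing with `getattr` avoids masking an unrelated AttributeError raised inside the S3 request itself.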
def _run(s=sha256_bytes, k=key_id):
    try:
        self.delete_if_orphaned(s, k)
    except Exception:  # pylint: disable=broad-except
        logger.exception(
            "Post-commit S3 cleanup failed for sha=%s key_id=%d; "
            "verify_blobs --mode=storage-to-db will list strays",
            s.hex()[:8],
            k,
        )

transaction.on_commit(_run)
Reacquire the per-sha lock in deferred orphan cleanup.
defer_delete_if_orphaned() runs after commit with no advisory lock, and delete_if_orphaned() does a plain exists() → storage.delete() sequence. A concurrent offload/restore/rotation can recreate the same (sha256, key_id) object between those two operations, and this callback will delete bytes that a fresh DB row now references. Run the orphan check and delete under a new transaction.atomic(), sha256_advisory_lock(...) block inside the callback, or move that locking into delete_if_orphaned() itself.
Also applies to: 513-542
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/services/tiered_storage.py` around lines 500 - 511, Deferred
orphan cleanup callback (_run passed to transaction.on_commit) calls
delete_if_orphaned without any advisory lock, risking deleting blobs recreated
concurrently; modify the callback to reacquire the per-sha lock and DB
transaction before running the check-and-delete: wrap the body of _run in a new
transaction.atomic() and call sha256_advisory_lock(s) (or alternatively add the
same locking inside delete_if_orphaned()), then call delete_if_orphaned(s, k)
inside that locked/atomic block so the exists() → storage.delete() sequence is
protected; keep transaction.on_commit(_run) as the trigger.
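To make the race concrete, here is an in-memory analogue of the check-and-delete sequence. It is only a sketch: `BlobStore` and its attributes are hypothetical, and a `threading.Lock` per sha stands in for the Postgres advisory lock that `sha256_advisory_lock` is described as providing. The point it illustrates is that the reference check and the delete must sit inside the same per-sha critical section that writers use.

```python
import threading
from collections import defaultdict


class BlobStore:
    """In-memory analogue (hypothetical): objects play the role of S3,
    references the role of DB rows pointing at (sha, key_id)."""

    def __init__(self):
        self.objects = {}
        self.references = set()
        self._locks = defaultdict(threading.Lock)
        self._locks_guard = threading.Lock()

    def _lock_for(self, sha):
        # Stand-in for a per-sha advisory lock; the guard makes lookup safe.
        with self._locks_guard:
            return self._locks[sha]

    def store(self, sha, key_id, data):
        # Writers (offload, restore, rotation) take the same per-sha lock.
        with self._lock_for(sha):
            self.objects[(sha, key_id)] = data
            self.references.add((sha, key_id))

    def delete_if_orphaned(self, sha, key_id):
        # Check and delete happen under one lock, so a concurrent store()
        # cannot slip in between the two steps.
        with self._lock_for(sha):
            if (sha, key_id) in self.references:
                return False
            self.objects.pop((sha, key_id), None)
            return True


store = BlobStore()
store.store(b"\x01" * 32, 1, b"ciphertext")
kept = store.delete_if_orphaned(b"\x01" * 32, 1)      # still referenced
store.references.discard((b"\x01" * 32, 1))
removed = store.delete_if_orphaned(b"\x01" * 32, 1)   # now orphaned
```

In the real service the lock would be the database advisory lock taken inside a fresh `transaction.atomic()` block within the on-commit callback, as the comment suggests.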
"offload-blobs-to-object-storage": {
    "task": "core.services.tiered_storage_tasks.offload_blobs_task",
    "schedule": 3600.0,  # Every hour
    "options": {"queue": "default"},
},
"gc-orphan-blobs": {
    # Drains the Redis candidate set populated by reference-source
    # post_delete signals. Hourly with a 55-min in-task budget;
    # see ``core/services/blob_gc.py``. The "full" mode (which
    # walks every Blob row as a safety net) is invoked manually
    # via ``celery call core.services.blob_gc.gc_orphan_blobs_task --kwargs '{"mode": "full"}'``
    # on whatever cadence the operator wants — typically weekly.
    "task": "core.services.blob_gc.gc_orphan_blobs_task",
    "schedule": 3600.0,
    "options": {"queue": "default"},
},
🧹 Nitpick | 🔵 Trivial
Consider isolating long-running blob maintenance tasks from the "default" queue.
Both offload-blobs-to-object-storage and gc-orphan-blobs carry a documented ~55-minute per-tick budget and share the "default" queue with general workloads. In the worst case, a single worker picking up both in one beat cycle can starve other "default" tasks for up to ~110 minutes.
A dedicated queue (e.g., "blob-maintenance") would:
- bound the blast radius to blob ops only,
- allow independent concurrency/worker settings (e.g., --concurrency=1), and
- keep latency predictable for unrelated "default" work.
📋 Suggested queue change
"offload-blobs-to-object-storage": {
"task": "core.services.tiered_storage_tasks.offload_blobs_task",
"schedule": 3600.0,
- "options": {"queue": "default"},
+ "options": {"queue": "blob-maintenance"},
},
"gc-orphan-blobs": {
"task": "core.services.blob_gc.gc_orphan_blobs_task",
"schedule": 3600.0,
- "options": {"queue": "default"},
+ "options": {"queue": "blob-maintenance"},
},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/messages/celery_app.py` around lines 53 - 68, The scheduled tasks
"offload-blobs-to-object-storage" and "gc-orphan-blobs" currently use the
"default" queue; change their "options": {"queue": "..."} entries to use a
dedicated queue name like "blob-maintenance" (e.g., options: {"queue":
"blob-maintenance"}) so long-running blob maintenance is isolated, and ensure
any operator documentation or deployment configs create a worker with
appropriate concurrency (e.g., --concurrency=1) bound to the "blob-maintenance"
queue; update both task entries referencing
core.services.tiered_storage_tasks.offload_blobs_task and
core.services.blob_gc.gc_orphan_blobs_task.
This allows using S3-compatible object storage to offload blobs, making Postgres much lighter. The design targets storing ~1B emails on a single instance.
Fixes #185.
Summary by CodeRabbit
Release Notes
New Features
Documentation